kfaraz commented on code in PR #16162:
URL: https://github.com/apache/druid/pull/16162#discussion_r1574978556
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java:
##########
@@ -46,7 +45,7 @@
import java.util.Set;
@JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
+public class MSQWorkerTask extends AbstractTask
Review Comment:
Thanks for fixing this!
##########
server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java:
##########
@@ -92,6 +118,12 @@ public String getErrorMsg()
return errorMsg;
}
+ @JsonIgnore
Review Comment:
Probably not needed, since only the properties marked `@JsonProperty`
actually get serialized.
Also, please annotate this method as `@Nullable`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1599,7 +1597,7 @@ public Response registerNewVersionOfPendingSegment(
log.error(
e,
"Could not register new version[%s] of pending segment[%s]",
- pendingSegmentVersions.getNewVersion(),
- pendingSegmentVersions.getBaseSegment()
+ upgradedPendingSegment.getId().getVersion(),
+ upgradedPendingSegment.getUpgradedFromSegmentId()
Review Comment:
```suggestion
upgradedPendingSegment.getId(),
upgradedPendingSegment.getUpgradedFromSegmentId()
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java:
##########
@@ -197,13 +197,12 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsA
@Override
public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
- SegmentIdWithShardSpec basePendingSegment,
- SegmentIdWithShardSpec newVersionOfSegment
+ PendingSegmentRecord pendingSegmentRecord
)
{
final RequestBuilder requestBuilder
= new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
- .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
+ .jsonContent(jsonMapper, pendingSegmentRecord);
Review Comment:
Why do we need to change this API? The task side doesn't seem to need any
info other than the base segment id and the upgraded segment id.
Postponing this refactor until later might simplify this PR a bit.
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -161,8 +163,10 @@ public class StreamAppenderator implements Appenderator
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
- baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedVersions
+ = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedVersionToBaseSegment
Review Comment:
I think the word "version" might cause confusion.
```suggestion
private final ConcurrentHashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedSegmentToBaseSegment
```
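To illustrate the naming, here is a minimal sketch of how a forward map and a reverse map between base and upgraded segments could be kept together. It is not the `StreamAppenderator` code: the class `UpgradedSegmentIndex` and its methods are hypothetical, and plain `String` ids stand in for `SegmentIdWithShardSpec`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; String ids stand in for SegmentIdWithShardSpec.
final class UpgradedSegmentIndex
{
  // base segment -> all segments it has been upgraded to
  private final ConcurrentHashMap<String, Set<String>> baseSegmentToUpgradedSegments =
      new ConcurrentHashMap<>();
  // upgraded segment -> the base segment it was upgraded from
  private final ConcurrentHashMap<String, String> upgradedSegmentToBaseSegment =
      new ConcurrentHashMap<>();

  // Records the mapping in both directions. The two updates are not atomic
  // as a pair, so a reader may briefly observe only the first one.
  void register(String baseSegment, String upgradedSegment)
  {
    baseSegmentToUpgradedSegments
        .computeIfAbsent(baseSegment, id -> ConcurrentHashMap.newKeySet())
        .add(upgradedSegment);
    upgradedSegmentToBaseSegment.put(upgradedSegment, baseSegment);
  }

  Set<String> upgradedSegmentsOf(String baseSegment)
  {
    return baseSegmentToUpgradedSegments.getOrDefault(baseSegment, Collections.emptySet());
  }

  // Returns null if the given segment was never registered as an upgrade.
  String baseSegmentOf(String upgradedSegment)
  {
    return upgradedSegmentToBaseSegment.get(upgradedSegment);
  }

  public static void main(String[] args)
  {
    UpgradedSegmentIndex index = new UpgradedSegmentIndex();
    index.register("base", "upgraded_1");
    index.register("base", "upgraded_2");
    System.out.println(index.upgradedSegmentsOf("base"));
    System.out.println(index.baseSegmentOf("upgraded_2"));
  }
}
```

With names like these, both directions of the lookup read as segment-to-segment mappings rather than segment-to-version mappings.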
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -697,7 +703,7 @@ ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
- return segmentsAndCommitMetadata;
+ return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
Review Comment:
This can be addressed later.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -310,14 +310,14 @@ public boolean checkPointDataSourceMetadata(
*/
public boolean registerNewVersionOfPendingSegmentOnSupervisor(
String supervisorId,
- SegmentIdWithShardSpec basePendingSegment,
- SegmentIdWithShardSpec newSegmentVersion
+ PendingSegmentRecord upgradedPendingSegment
)
{
try {
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
- Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
- Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
+ Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null");
+ Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "replica group cannot be null");
+ Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "root id cannot be null");
Review Comment:
Exception messages should use the correct field names:
```suggestion
Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null");
Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "upgradedFromSegmentId cannot be null");
```
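As a rough sketch of why the message text matters: when a check like this fails, the message is all the caller sees, so it should name the field that was actually null. The example below uses `java.util.Objects.requireNonNull` from the JDK in place of Guava's `Preconditions.checkNotNull`, and the `Record` class is a hypothetical stand-in for `PendingSegmentRecord` with only the two fields under discussion.

```java
import java.util.Objects;

// Hypothetical illustration only; not the SupervisorManager code.
final class PendingSegmentChecks
{
  // Stand-in for PendingSegmentRecord, reduced to the two validated fields.
  static final class Record
  {
    final String taskAllocatorId;
    final String upgradedFromSegmentId;

    Record(String taskAllocatorId, String upgradedFromSegmentId)
    {
      this.taskAllocatorId = taskAllocatorId;
      this.upgradedFromSegmentId = upgradedFromSegmentId;
    }
  }

  // Each message names the exact field that failed the check.
  static void validate(Record upgradedPendingSegment)
  {
    Objects.requireNonNull(upgradedPendingSegment, "upgradedPendingSegment cannot be null");
    Objects.requireNonNull(upgradedPendingSegment.taskAllocatorId, "taskAllocatorId cannot be null");
    Objects.requireNonNull(
        upgradedPendingSegment.upgradedFromSegmentId,
        "upgradedFromSegmentId cannot be null"
    );
  }

  public static void main(String[] args)
  {
    try {
      validate(new Record(null, "baseSegment"));
    }
    catch (NullPointerException e) {
      // The message points directly at the missing field.
      System.out.println(e.getMessage());
    }
  }
}
```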
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -898,7 +898,7 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
*
* @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
*/
- private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
+ public static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
Review Comment:
Why does this need to be public?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -310,14 +310,14 @@ public boolean checkPointDataSourceMetadata(
*/
public boolean registerNewVersionOfPendingSegmentOnSupervisor(
Review Comment:
Rename for clarity, given the new arguments:
```suggestion
public boolean registerUpgradedPendingSegmentOnSupervisor(
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -326,12 +326,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
}
SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
- seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
+ seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment);
return true;
}
catch (Exception e) {
- log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
- basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
+ log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+ upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId);
Review Comment:
```suggestion
log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s]",
upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId(), supervisorId);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1599,7 +1597,7 @@ public Response registerNewVersionOfPendingSegment(
log.error(
e,
"Could not register new version[%s] of pending segment[%s]",
Review Comment:
```suggestion
"Could not register pending segment[%s] upgraded from[%s]",
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -161,8 +163,10 @@ public class StreamAppenderator implements Appenderator
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
- baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedVersions
Review Comment:
```suggestion
private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1576,17 +1577,14 @@ public Response setEndOffsetsHTTP(
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response registerNewVersionOfPendingSegment(
Review Comment:
Rename this method: `registerUpgradedPendingSegment`.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -712,16 +713,15 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment
* those versions.</li>
* </ul>
*
- * @return Map from original pending segment to the new upgraded ID.
+ * @return Inserted pending segment records
*/
- private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
+ private List<PendingSegmentRecord> upgradePendingSegments(
Review Comment:
Thanks for updating this, much simpler now!
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1334,6 +1372,13 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
}
insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+
+ final List<String> appendedSegmentIds
Review Comment:
Please add a comment here.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1263,6 +1269,39 @@ public int hashCode()
}
}
+ private static void bindColumnValuesToQueryWithInCondition(
+ final String columnName,
+ final List<String> values,
+ final Update query
+ )
+ {
+ if (values == null) {
+ return;
+ }
+
+ for (int i = 0; i < values.size(); i++) {
+ query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
+ }
+ }
+
+ private int deletePendingSegmentsById(Handle handle, String datasource, List<String> pendingSegmentIds)
+ {
+ if (pendingSegmentIds.isEmpty()) {
+ return 0;
+ }
+
+ Update query = handle.createStatement(
Review Comment:
Should this operation be broken up into smaller batches in case there are
too many segments to delete? Otherwise we might end up with a large `IN` clause.
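
A minimal sketch of what the batching could look like, independent of JDBI. The class `InClauseBatcher`, its method names, and the `:columnN` placeholder format are assumptions made for illustration (the placeholder names only mirror the `"%s%d"` bind names in the diff above). The idea is to split the ids into fixed-size batches and issue one `DELETE` per batch, so that each `IN` clause stays bounded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Druid.
final class InClauseBatcher
{
  // Splits the values into consecutive batches of at most batchSize elements.
  static <T> List<List<T>> partition(List<T> values, int batchSize)
  {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    final List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < values.size(); start += batchSize) {
      final int end = Math.min(start + batchSize, values.size());
      batches.add(new ArrayList<>(values.subList(start, end)));
    }
    return batches;
  }

  // Builds "column IN (:column0, :column1, ...)" for a single batch.
  // The placeholder naming is an assumption for this sketch.
  static String inCondition(String columnName, int numValues)
  {
    final StringBuilder sb = new StringBuilder(columnName).append(" IN (");
    for (int i = 0; i < numValues; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(':').append(columnName).append(i);
    }
    return sb.append(')').toString();
  }

  public static void main(String[] args)
  {
    final List<String> ids = Arrays.asList("a", "b", "c", "d", "e");
    // One statement per batch; the total deleted count would be the sum of
    // the per-batch update counts.
    for (List<String> batch : partition(ids, 2)) {
      System.out.println(inCondition("id", batch.size()) + " <- " + batch);
    }
  }
}
```

Guava's `Lists.partition` could serve the same purpose as the hand-written `partition` here; the batch size itself would need to be chosen with the metadata store's limits in mind.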