kfaraz commented on code in PR #16162:
URL: https://github.com/apache/druid/pull/16162#discussion_r1574978556


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java:
##########
@@ -46,7 +45,7 @@
 import java.util.Set;
 
 @JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
+public class MSQWorkerTask extends AbstractTask

Review Comment:
   Thanks for fixing this!



##########
server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java:
##########
@@ -92,6 +118,12 @@ public String getErrorMsg()
     return errorMsg;
   }
 
+  @JsonIgnore

Review Comment:
   Probably not needed, since only the properties marked `@JsonProperty` actually get serialized out.
   Also, please annotate this method as `@Nullable`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1599,7 +1597,7 @@ public Response registerNewVersionOfPendingSegment(
       log.error(
           e,
           "Could not register new version[%s] of pending segment[%s]",
-          pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment()
+          upgradedPendingSegment.getId().getVersion(), upgradedPendingSegment.getUpgradedFromSegmentId()

Review Comment:
   ```suggestion
             upgradedPendingSegment.getId(), upgradedPendingSegment.getUpgradedFromSegmentId()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java:
##########
@@ -197,13 +197,12 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsA
   @Override
   public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
       String taskId,
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newVersionOfSegment
+      PendingSegmentRecord pendingSegmentRecord
   )
   {
     final RequestBuilder requestBuilder
         = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
-        .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
+        .jsonContent(jsonMapper, pendingSegmentRecord);

Review Comment:
   Why do we need to change this API? The task side doesn't seem to need any info other than the base segment id and the upgraded segment id.
   
   Postponing this refactor until later might simplify this PR a bit.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -161,8 +163,10 @@ public class StreamAppenderator implements Appenderator
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
-      baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedVersions
+      = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedVersionToBaseSegment

Review Comment:
   I think the word "version" might cause confusion.
   ```suggestion
     private final ConcurrentHashMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedSegmentToBaseSegment
   ```
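   The two maps in this hunk act as a forward and a reverse index, so every registration has to update both. A minimal sketch under the suggested names, assuming a generic key type `K` standing in for `SegmentIdWithShardSpec`; the class and its methods are hypothetical and not part of the PR:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: K stands in for SegmentIdWithShardSpec. Keeps the
 * forward map (base -> upgraded) and the reverse map (upgraded -> base) in step.
 */
class UpgradedSegmentIndex<K>
{
  private final ConcurrentHashMap<K, Set<K>> baseSegmentToUpgradedSegments = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<K, K> upgradedSegmentToBaseSegment = new ConcurrentHashMap<>();

  // Records in both maps that upgradedSegment was upgraded from baseSegment.
  void register(K baseSegment, K upgradedSegment)
  {
    baseSegmentToUpgradedSegments
        .computeIfAbsent(baseSegment, k -> ConcurrentHashMap.newKeySet())
        .add(upgradedSegment);
    upgradedSegmentToBaseSegment.put(upgradedSegment, baseSegment);
  }

  // Returns the base segment for an upgraded id, or the id itself if it was
  // never registered as an upgrade (an assumed convention).
  K getBaseSegment(K segment)
  {
    return upgradedSegmentToBaseSegment.getOrDefault(segment, segment);
  }

  // Returns a read-only view of the upgraded ids registered for a base segment.
  Set<K> getUpgradedSegments(K baseSegment)
  {
    final Set<K> upgraded = baseSegmentToUpgradedSegments.get(baseSegment);
    return upgraded == null ? Collections.<K>emptySet() : Collections.unmodifiableSet(upgraded);
  }
}
```

   The two writes in `register` are separate operations, so a concurrent reader can briefly see the forward entry without the reverse one; if that matters, both updates would need to happen under a common lock.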



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java:
##########
@@ -697,7 +703,7 @@ ListenableFuture<SegmentsAndCommitMetadata> publishInBackground(
                 Throwables.propagateIfPossible(e);
                 throw new RuntimeException(e);
               }
-              return segmentsAndCommitMetadata;
+              return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);

Review Comment:
   This can be addressed later.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -310,14 +310,14 @@ public boolean checkPointDataSourceMetadata(
    */
   public boolean registerNewVersionOfPendingSegmentOnSupervisor(
       String supervisorId,
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newSegmentVersion
+      PendingSegmentRecord upgradedPendingSegment
   )
   {
     try {
       Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
-      Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
-      Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
+      Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null");
+      Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "replica group cannot be null");
+      Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "root id cannot be null");

Review Comment:
   Exception messages should use the correct field names:
   ```suggestion
         Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null");
         Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "upgradedFromSegmentId cannot be null");
   ```



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -898,7 +898,7 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
    *
    * @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
    */
-  private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
+  public static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)

Review Comment:
   Why does this need to be public?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -310,14 +310,14 @@ public boolean checkPointDataSourceMetadata(
    */
   public boolean registerNewVersionOfPendingSegmentOnSupervisor(

Review Comment:
   Rename for clarity, given the new arguments:
   ```suggestion
     public boolean registerUpgradedPendingSegmentOnSupervisor(
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -326,12 +326,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
       }
 
       SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
-      seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
+      seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment);
       return true;
     }
     catch (Exception e) {
-      log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
-                basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
+      log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+                upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId);

Review Comment:
   ```suggestion
         log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s]",
                   upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId(), supervisorId);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1599,7 +1597,7 @@ public Response registerNewVersionOfPendingSegment(
       log.error(
           e,
           "Could not register new version[%s] of pending segment[%s]",

Review Comment:
   ```suggestion
             "Could not register pending segment[%s] upgraded from[%s]",
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -161,8 +163,10 @@ public class StreamAppenderator implements Appenderator
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
-      baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedVersions

Review Comment:
   ```suggestion
     private final ConcurrentHashMap<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1576,17 +1577,14 @@ public Response setEndOffsetsHTTP(
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
   public Response registerNewVersionOfPendingSegment(

Review Comment:
   Rename this method: `registerUpgradedPendingSegment`.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -712,16 +713,15 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment
    * those versions.</li>
    * </ul>
    *
-   * @return Map from original pending segment to the new upgraded ID.
+   * @return Inserted pending segment records
    */
-  private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
+  private List<PendingSegmentRecord> upgradePendingSegments(

Review Comment:
   Thanks for updating this, much simpler now!



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1334,6 +1372,13 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
             }
 
             insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+
+            final List<String> appendedSegmentIds

Review Comment:
   Please add a comment here.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1263,6 +1269,39 @@ public int hashCode()
     }
   }
 
+  private static void bindColumnValuesToQueryWithInCondition(
+      final String columnName,
+      final List<String> values,
+      final Update query
+  )
+  {
+    if (values == null) {
+      return;
+    }
+
+    for (int i = 0; i < values.size(); i++) {
+      query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
+    }
+  }
+
+  private int deletePendingSegmentsById(Handle handle, String datasource, List<String> pendingSegmentIds)
+  {
+    if (pendingSegmentIds.isEmpty()) {
+      return 0;
+    }
+
+    Update query = handle.createStatement(

Review Comment:
   Should this operation be broken up into smaller batches in case there are too many segments to delete? Otherwise we might end up with a large `IN` clause.
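   One way to bound the `IN` clause is to delete in fixed-size batches, issuing one statement per batch and summing the affected row counts. A minimal, JDBI-free sketch, assuming placeholders of the form `:<column><index>` to match the `%s%d` bind names in `bindColumnValuesToQueryWithInCondition`; the class name, batch size, and table and column names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Druid or this PR) that splits a large
 * IN-clause DELETE into batches of at most batchSize ids each.
 */
class InClauseBatching
{
  // Splits values into consecutive sublists of at most batchSize elements.
  static <T> List<List<T>> partition(List<T> values, int batchSize)
  {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    final List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < values.size(); start += batchSize) {
      final int end = Math.min(start + batchSize, values.size());
      batches.add(new ArrayList<>(values.subList(start, end)));
    }
    return batches;
  }

  // Builds "column IN (:column0, :column1, ...)" with one placeholder per value,
  // assuming the same "<column><index>" naming as the bind calls in the diff.
  static String inCondition(String columnName, int numValues)
  {
    final StringBuilder sb = new StringBuilder(columnName).append(" IN (");
    for (int i = 0; i < numValues; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(':').append(columnName).append(i);
    }
    return sb.append(')').toString();
  }

  // Returns one DELETE per batch (an empty list when there are no ids). The table
  // name and the dataSource column are assumptions; a caller would bind :dataSource
  // and each batch's ids, execute every statement, and sum the returned row counts.
  static List<String> deleteStatements(String table, String columnName, List<String> ids, int batchSize)
  {
    final List<String> statements = new ArrayList<>();
    for (List<String> batch : partition(ids, batchSize)) {
      statements.add(
          "DELETE FROM " + table + " WHERE dataSource = :dataSource AND "
          + inCondition(columnName, batch.size())
      );
    }
    return statements;
  }
}
```

   Since each batch is a separate statement, the placeholder names can restart at `0` for every batch; running all batches inside the same transaction would keep the overall delete atomic.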



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

