AmatyaAvadhanula commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1556805639


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments
+        = toolbox.getIndexerMetadataStorageCoordinator().getAllPendingSegments(task.getDataSource());
+    Map<String, SegmentIdWithShardSpec> pendingSegmentIdMap = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> pendingSegmentIdMap.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> {
+      if (pendingSegment.getParentId() != null
+          && !pendingSegment.getParentId().equals(pendingSegment.getId().asSegmentId().toString())) {
+        upgradedPendingSegments.put(
+            pendingSegment.getId(),
+            pendingSegmentIdMap.get(pendingSegment.getParentId())
+        );
+      }
+    });
 
     upgradedPendingSegments.forEach(
-        (oldId, newId) -> toolbox.getSupervisorManager()
+        (newId, oldId) -> toolbox.getSupervisorManager()

Review Comment:
   Thanks, done



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
   // this set should be accessed under the giant lock.
   private final Set<String> activeTasks = new HashSet<>();
 
+  // Stores map of pending task group of tasks to the set of their ids.
+  // Useful for task replicas. Clean up pending segments only when the set is empty.
+  // this map should be accessed under the giant lock.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

