kfaraz commented on code in PR #15699:
URL: https://github.com/apache/druid/pull/15699#discussion_r1461625798


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write to another.
+    // In such a case, the race condition described in the class-level docs cannot occur,
+    // and the action can simply fetch all visible segments for the datasource and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
+                                 .computeIfAbsent(created, c -> new HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]",

Review Comment:
   I fear this might end up being too verbose. It would be better to log just
   the replace lock version for each interval and point out that any segment
   newer than that version would not be considered.
   The segment IDs, if needed, can go in a debug log.
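
   A rough sketch of the logging shape suggested above, not the actual Druid
   code. It is plain Java using java.util.logging rather than Druid's own
   Logger; the class name `ReplaceLockLogSketch`, the methods `summarize` and
   `logIgnored`, and the use of strings for intervals and versions are all
   hypothetical simplifications for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: not part of Druid, only illustrates the suggested log layout.
class ReplaceLockLogSketch
{
  private static final Logger LOG = Logger.getLogger(ReplaceLockLogSketch.class.getName());

  // Builds one summary line per locked interval instead of one line per ignored segment.
  static List<String> summarize(Map<String, String> intervalToLockVersion)
  {
    final List<String> lines = new ArrayList<>();
    // Copy into a TreeMap so the output order is stable (sorted by interval string).
    for (Map.Entry<String, String> entry : new TreeMap<>(intervalToLockVersion).entrySet()) {
      lines.add(
          String.format(
              "Interval[%s] has REPLACE lock version[%s]; segments newer than this are not considered.",
              entry.getKey(),
              entry.getValue()
          )
      );
    }
    return lines;
  }

  // Summary goes to INFO; the potentially long list of segment IDs goes to FINE,
  // which is java.util.logging's closest counterpart to a debug level.
  static void logIgnored(Map<String, String> intervalToLockVersion, List<String> ignoredSegmentIds)
  {
    for (String line : summarize(intervalToLockVersion)) {
      LOG.info(line);
    }
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("Ignored segments: " + ignoredSegmentIds);
    }
  }
}
```

   With this layout the info log grows with the number of locked intervals
   rather than the number of ignored segments, and the per-segment detail is
   only emitted when debug-level logging is enabled.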



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

