kfaraz commented on code in PR #15699:
URL: https://github.com/apache/druid/pull/15699#discussion_r1460402921


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -26,29 +26,47 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.task.Task;
+import 
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
 import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This TaskAction returns a collection of segments which have data within the 
specified intervals and are marked as
  * used.
+ * If the task holds REPLACE locks and the datasource being read is also the 
one being replaced,
+ * fetch only those segments for the interval that were created before its 
REPLACE lock's version.
+ * This change is needed to ensure that the input set of segments is always 
consistent for a replacing task
+ * when concurrent appending tasks append segments.

Review Comment:
   ```suggestion
    * If the task holds REPLACE locks and is writing back to the same datasource,
    * only segments that were created before the REPLACE lock was acquired are returned.
    * This ensures that the input set of segments for this replace task remains consistent
    * even when new data is appended by other concurrent tasks.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> 
new HashMap<>())
+                                 .computeIfAbsent(created, c -> new 
HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : 
intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();

Review Comment:
   Add a `break` after this line.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -128,14 +126,6 @@ public <RetType> RetType submit(TaskAction<RetType> 
taskAction)
                                                                  .build()
                                      ).collect(Collectors.toSet());
       }
-    } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {

Review Comment:
   Won't we need any special handling to ensure that the MSQ tests still work as expected?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> 
new HashMap<>())
+                                 .computeIfAbsent(created, c -> new 
HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : 
intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = 
entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : 
createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || 
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info("Ignoring segment[%s] as it has created_date[%s] greater 
than the REPLACE lock version[%s]",

Review Comment:
   Instead of logging each segment ID separately, collect them in a set and log the set once.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }

Review Comment:
   For later: can we confirm whether this logic is really needed? I think the task action is always fired using the supervisor task ID.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;

Review Comment:
   ```suggestion
         final String createdDate = segmentAndCreatedDate.rhs;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.

Review Comment:
   The race condition is always a possibility, even when reading from a different datasource. There are really two possibilities here:
   - There is no way to return a consistent set if this action is fired multiple times during the lifecycle of the task, because the task does not hold any lock on the other datasource.
   - OR we do not care about the race condition because we read the segments just once.
   
   Depending on which of the above is actually the case, we should simplify the comment to something like:
   ```suggestion
       // When fetching segments for a datasource other than the one this task is writing to,
       // just return all visible segments as there is no way to ensure that the set of returned
       // segments is consistent throughout the lifecycle of this task.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1163,7 +1163,9 @@ static class SegmentProvider
     List<DataSegment> findSegments(TaskActionClient actionClient) throws 
IOException
     {
       return new ArrayList<>(
-          actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, 
interval, null, Segments.ONLY_VISIBLE))
+          actionClient.submit(
+              new RetrieveUsedSegmentsAction(dataSource, null, 
ImmutableList.of(interval), Segments.ONLY_VISIBLE)

Review Comment:
   Maybe add another constructor (or static factory method) to `RetrieveUsedSegmentsAction` that doesn't require passing the null `interval` or the visibility, since the visibility seems to be `ONLY_VISIBLE` in most cases.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1233,10 +1234,17 @@ private DataSegmentTimelineView 
makeDataSegmentTimelineView()
       // any segment created after the lock was acquired for its interval will 
not be considered.
       final Collection<DataSegment> publishedUsedSegments;
       try {
-        publishedUsedSegments = context.taskActionClient().submit(new 
RetrieveSegmentsToReplaceAction(
-            dataSource,
-            intervals
-        ));
+        // Additional check as the task action does not accept empty intervals
+        if (!intervals.isEmpty()) {

Review Comment:
   Please invert the condition:
   ```
   if (intervals.isEmpty()) {
      published = empty;
   } else {
      // fire task action here
   }
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> 
getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write 
to another.
+    // In such a case, the race condition described in the class-level docs 
cannot occur,
+    // and the action can simply fetch all visible segments for the datasource 
and interval.
+    // Similarly, an MSQ replace could read from a different datasource.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible 
segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = 
new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
 intervals)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> 
new HashMap<>())
+                                 .computeIfAbsent(created, c -> new 
HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : 
intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = 
entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : 
createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || 
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info("Ignoring segment[%s] as it has created_date[%s] greater 
than the REPLACE lock version[%s]",
+                     segment.getId(), createdAndSegments.getKey(), 
lockVersion);
+          }
+        }
+      }
+    }
+
+    if (visibility == Segments.ONLY_VISIBLE) {
+      return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+                            
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, 
Partitions.ONLY_COMPLETE);
+    } else {
+      return allSegmentsToBeReplaced;
+    }
+  }
+
+  private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox 
toolbox)
   {
     return toolbox.getIndexerMetadataStorageCoordinator()
                   .retrieveUsedSegmentsForIntervals(dataSource, intervals, 
visibility);
   }
 
+

Review Comment:
   Nit: remove the extra blank line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to