kfaraz commented on code in PR #15699:
URL: https://github.com/apache/druid/pull/15699#discussion_r1460402921
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -26,29 +26,47 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.task.Task;
+import
org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* This TaskAction returns a collection of segments which have data within the
specified intervals and are marked as
* used.
+ * If the task holds REPLACE locks and the datasource being read is also the
one being replaced,
+ * fetch only those segments for the interval that were created before its
REPLACE lock's version.
+ * This change is needed to ensure that the input set of segments is always
consistent for a replacing task
+ * when concurrent appending tasks append segments.
Review Comment:
```suggestion
* If the task holds REPLACE locks and is writing back to the same
datasource,
* only segments that were created before the REPLACE lock was acquired are
returned.
* This ensures that the input set of segments for this replace task remains
consistent
* even when new data is appended by other concurrent tasks.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible
segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String created = segmentAndCreatedDate.rhs;
+ intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
+ .computeIfAbsent(created, c -> new
HashSet<>())
+ .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+ for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
Review Comment:
Add a `break` after this line.
##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -128,14 +126,6 @@ public <RetType> RetType submit(TaskAction<RetType>
taskAction)
.build()
).collect(Collectors.toSet());
}
- } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
Review Comment:
   Won't we need special handling to ensure that the MSQ tests work as
expected?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible
segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String created = segmentAndCreatedDate.rhs;
+ intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
+ .computeIfAbsent(created, c -> new
HashSet<>())
+ .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+ for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
+ }
+ }
+ final Map<String, Set<DataSegment>> createdToSegmentsMap =
entry.getValue();
+ for (Map.Entry<String, Set<DataSegment>> createdAndSegments :
createdToSegmentsMap.entrySet()) {
+ if (lockVersion == null ||
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+ allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+ } else {
+ for (DataSegment segment : createdAndSegments.getValue()) {
+ log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
Review Comment:
Instead of logging all segment IDs separately, add them to a set and just
log once.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
Review Comment:
For later, can we confirm if this logic is really needed? I think the task
action is always fired using the supervisor task ID.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible
segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String created = segmentAndCreatedDate.rhs;
Review Comment:
```suggestion
final String createdDate = segmentAndCreatedDate.rhs;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
Review Comment:
The race condition is always a possibility even when reading from a
different datasource. There are really two possibilities here:
- There is no way to return a consistent set if this action is fired
multiple times during the lifecycle of the task because the task does not have
any lock on the other datasource.
- OR we do not care about the race condition because we read the segments
just once.
Based on which of the above it really is, we should simplify the comment to
something like:
```suggestion
// When fetching segments for a datasource other than the one this task
is writing to,
// just return all visible segments as there is no way to ensure that
the set of returned
// segments is consistent throughout the lifecycle of this task
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -1163,7 +1163,9 @@ static class SegmentProvider
List<DataSegment> findSegments(TaskActionClient actionClient) throws
IOException
{
return new ArrayList<>(
- actionClient.submit(new RetrieveUsedSegmentsAction(dataSource,
interval, null, Segments.ONLY_VISIBLE))
+ actionClient.submit(
+ new RetrieveUsedSegmentsAction(dataSource, null,
ImmutableList.of(interval), Segments.ONLY_VISIBLE)
Review Comment:
Maybe add another constructor (or static method) to
`RetrieveUsedSegmentsAction` which doesn't require passing the null `interval`
and the visibility as it seems to be `ONLY_VISIBLE` in most cases.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1233,10 +1234,17 @@ private DataSegmentTimelineView
makeDataSegmentTimelineView()
// any segment created after the lock was acquired for its interval will
not be considered.
final Collection<DataSegment> publishedUsedSegments;
try {
- publishedUsedSegments = context.taskActionClient().submit(new
RetrieveSegmentsToReplaceAction(
- dataSource,
- intervals
- ));
+ // Additional check as the task action does not accept empty intervals
+ if (!intervals.isEmpty()) {
Review Comment:
   Please invert the condition:
   ```
   if (intervals.isEmpty()) {
     published = empty;
   } else {
     // fire task action here
   }
   ```
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>>
getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write
to another.
+ // In such a case, the race condition described in the class-level docs
cannot occur,
+ // and the action can simply fetch all visible segments for the datasource
and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible
segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String created = segmentAndCreatedDate.rhs;
+ intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
+ .computeIfAbsent(created, c -> new
HashSet<>())
+ .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+ for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
+ }
+ }
+ final Map<String, Set<DataSegment>> createdToSegmentsMap =
entry.getValue();
+ for (Map.Entry<String, Set<DataSegment>> createdAndSegments :
createdToSegmentsMap.entrySet()) {
+ if (lockVersion == null ||
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+ allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+ } else {
+ for (DataSegment segment : createdAndSegments.getValue()) {
+ log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
+ segment.getId(), createdAndSegments.getKey(),
lockVersion);
+ }
+ }
+ }
+ }
+
+ if (visibility == Segments.ONLY_VISIBLE) {
+ return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
+ } else {
+ return allSegmentsToBeReplaced;
+ }
+ }
+
+ private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox
toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, intervals,
visibility);
}
+
Review Comment:
Nit: extra line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]