AmatyaAvadhanula commented on code in PR #15699:
URL: https://github.com/apache/druid/pull/15699#discussion_r1461370742
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,11 +131,81 @@ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+ // The DruidInputSource can be used to read from one datasource and write to another.
+ // In such a case, the race condition described in the class-level docs cannot occur,
+ // and the action can simply fetch all visible segments for the datasource and interval.
+ // Similarly, an MSQ replace could read from a different datasource.
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+ // If there are no replace locks for the task, simply fetch all visible segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveUsedSegments(toolbox);
+ }
+
+ Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments =
new HashMap<>();
+ for (Pair<DataSegment, String> segmentAndCreatedDate :
+
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource,
intervals)) {
+ final DataSegment segment = segmentAndCreatedDate.lhs;
+ final String created = segmentAndCreatedDate.rhs;
+ intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s ->
new HashMap<>())
+ .computeIfAbsent(created, c -> new
HashSet<>())
+ .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+ for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry :
intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
+ }
+ }
+ final Map<String, Set<DataSegment>> createdToSegmentsMap =
entry.getValue();
+ for (Map.Entry<String, Set<DataSegment>> createdAndSegments :
createdToSegmentsMap.entrySet()) {
+ if (lockVersion == null ||
lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+ allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+ } else {
+ for (DataSegment segment : createdAndSegments.getValue()) {
+ log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
+ segment.getId(), createdAndSegments.getKey(),
lockVersion);
+ }
+ }
+ }
+ }
+
+ if (visibility == Segments.ONLY_VISIBLE) {
+ return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
+ } else {
+ return allSegmentsToBeReplaced;
+ }
+ }
+
+ private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox
toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, intervals,
visibility);
}
+
Review Comment:
Removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]