This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/28.0.0 by this push:
     new 0c4c1cf647f A Replacing task must read segments created before it acquired its lock (#15085) (#15208)
0c4c1cf647f is described below

commit 0c4c1cf647fc438bf304335ed4df2bdcf41626bc
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Oct 19 14:39:44 2023 +0530

    A Replacing task must read segments created before it acquired its lock (#15085) (#15208)
    
    * Replacing tasks must read segments created before they acquired their locks
---
 .../MaterializedViewSupervisor.java                |   3 +-
 .../actions/RetrieveSegmentsToReplaceAction.java   | 209 +++++++++++++++++++++
 .../druid/indexing/common/actions/TaskAction.java  |   1 +
 .../task/batch/parallel/AbstractBatchSubtask.java  |  16 +-
 .../parallel/PartialDimensionCardinalityTask.java  |  15 +-
 .../parallel/PartialDimensionDistributionTask.java |  15 +-
 .../parallel/PartialHashSegmentGenerateTask.java   |  12 +-
 .../parallel/PartialRangeSegmentGenerateTask.java  |  12 +-
 .../batch/parallel/PartialSegmentGenerateTask.java |   3 +-
 .../batch/parallel/PartialSegmentMergeTask.java    |  17 +-
 .../batch/parallel/PerfectRollupWorkerTask.java    |   5 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  15 +-
 .../druid/indexing/input/DruidInputSource.java     |  12 +-
 .../parallel/PerfectRollupWorkerTaskTest.java      |   8 +-
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  66 +++++++
 .../TestIndexerMetadataStorageCoordinator.java     |   2 +-
 .../IndexerMetadataStorageCoordinator.java         |   6 +-
 .../IndexerSQLMetadataStorageCoordinator.java      |  25 ++-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   | 115 ++++++++----
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  46 +++++
 20 files changed, 478 insertions(+), 125 deletions(-)

diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index ac2738534da..d0a035be17c 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -40,6 +40,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -364,7 +365,7 @@ public class MaterializedViewSupervisor implements 
Supervisor
     // Pair<interval -> max(created_date), interval -> list<DataSegment>>
     Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
         getMaxCreateDateAndBaseSegments(
-            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource())
+            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
         );
     // baseSegments are used to create HadoopIndexTask
     Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
new file mode 100644
index 00000000000..78e6ada5c1e
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Interval;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This action exists in addition to RetrieveUsedSegmentsAction because that action suffers
+ * from a race condition described by the following sequence of events:
+ *
+ * - Segments S1, S2, S3 exist
+ * - A compaction task acquires a REPLACE lock
+ * - A concurrent appending job publishes a segment S4 which needs to be upgraded to the replace lock's version
+ * - The compaction task processes S1-S4 to create new segments
+ * - The compaction task publishes the new segments and carries S4 forward to the new version
+ *
+ * This can lead to the data in S4 being duplicated.
+ *
+ * This TaskAction returns the collection of used segments that have data within the specified interval
+ * and were created before the REPLACE lock, if any, was acquired.
+ * This ensures that a consistent set of segments is returned each time this action is called.
+ */
+public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
+{
+  private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);
+
+  @JsonIgnore
+  private final String dataSource;
+
+  @JsonIgnore
+  private final Interval interval;
+
+  @JsonCreator
+  public RetrieveSegmentsToReplaceAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval
+  )
+  {
+    this.dataSource = dataSource;
+    this.interval = interval;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Override
+  public TypeReference<Collection<DataSegment>> getReturnTypeReference()
+  {
+    return new TypeReference<Collection<DataSegment>>() {};
+  }
+
+  @Override
+  public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write to another.
+    // In such a case, the race condition described in the class-level docs cannot occur,
+    // and the action can simply fetch all visible segments for the datasource and interval.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveAllVisibleSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveAllVisibleSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
+                                 .computeIfAbsent(created, c -> new HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]",
+                     segment.getId(), createdAndSegments.getKey(), lockVersion);
+          }
+        }
+      }
+    }
+
+    return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+  }
+
+  private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
+  {
+    return toolbox.getIndexerMetadataStorageCoordinator()
+                  .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
+
+    if (!dataSource.equals(that.dataSource)) {
+      return false;
+    }
+    return interval.equals(that.interval);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dataSource, interval);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "dataSource='" + dataSource + '\'' +
+           ", interval=" + interval +
+           '}';
+  }
+}
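
For context, a task obtains this consistent snapshot by submitting the action through its TaskActionClient, exactly as the DruidInputSource change later in this patch does. A minimal sketch, not part of the patch itself, assuming a TaskToolbox, a datasource name, and a Joda Interval are in scope as they are inside a running task:

    // Sketch: fetch the set of used segments that a replacing task may safely read.
    // 'toolbox', 'dataSource' and 'interval' are assumed to be in scope; submit() may throw IOException.
    final Collection<DataSegment> segmentsToRead =
        toolbox.getTaskActionClient()
               .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
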
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 171d53b9cdd..e251626f869 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
     @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = 
SegmentTransactionalInsertAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = 
SegmentTransactionalAppendAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = 
SegmentTransactionalReplaceAction.class),
+    @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = 
RetrieveSegmentsToReplaceAction.class),
     // Type name doesn't correspond to the name of the class for backward 
compatibility.
     @JsonSubTypes.Type(name = "segmentListUsed", value = 
RetrieveUsedSegmentsAction.class),
     // Type name doesn't correspond to the name of the class for backward 
compatibility.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
index 37b70c53ed5..e6de0cf2594 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 
@@ -29,16 +30,20 @@ import java.util.Map;
 public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
 {
 
+  private final String supervisorTaskId;
+
   protected AbstractBatchSubtask(
       String id,
       @Nullable String groupId,
       @Nullable TaskResource taskResource,
       String dataSource,
       @Nullable Map<String, Object> context,
-      @Nonnull IngestionMode ingestionMode
+      @Nonnull IngestionMode ingestionMode,
+      @Nonnull String supervisorTaskId
   )
   {
     super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
+    this.supervisorTaskId = supervisorTaskId;
   }
 
   /**
@@ -46,4 +51,13 @@ public abstract class AbstractBatchSubtask extends 
AbstractBatchIndexTask
    * This ID is used to identify duplicate work of retry tasks for the same spec.
    */
   public abstract String getSubtaskSpecId();
+
+  /**
+   * @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.
+   */
+  @JsonProperty
+  public String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 625b2051783..698f8c9d0e9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -69,7 +69,6 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   private final ObjectMapper jsonMapper;
@@ -95,7 +94,8 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -107,7 +107,6 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.jsonMapper = jsonMapper;
   }
 
@@ -123,12 +122,6 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
     return ingestionSchema;
   }
 
-  @JsonProperty
-  private String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -163,7 +156,7 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
   {
     if 
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
 {
       return tryTimeChunkLock(
-          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          new SurrogateTaskActionClient(getSupervisorTaskId(), 
taskActionClient),
           
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
       );
     } else {
@@ -274,7 +267,7 @@ public class PartialDimensionCardinalityTask extends 
PerfectRollupWorkerTask
   {
     final ParallelIndexSupervisorTaskClient taskClient =
         toolbox.getSupervisorTaskClientProvider().build(
-            supervisorTaskId,
+            getSupervisorTaskId(),
             ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
             ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
         );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index b2ecd3dc269..8f03c3bfa55 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -82,7 +82,6 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   // For testing
@@ -136,7 +135,8 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -148,7 +148,6 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.dedupInputRowFilterSupplier = dedupRowDimValueFilterSupplier;
   }
 
@@ -164,12 +163,6 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
     return ingestionSchema;
   }
 
-  @JsonProperty
-  private String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -204,7 +197,7 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
   {
     if 
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
 {
       return tryTimeChunkLock(
-          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          new SurrogateTaskActionClient(getSupervisorTaskId(), 
taskActionClient),
           
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
       );
     } else {
@@ -326,7 +319,7 @@ public class PartialDimensionDistributionTask extends 
PerfectRollupWorkerTask
   private void sendReport(TaskToolbox toolbox, DimensionDistributionReport 
report)
   {
     final ParallelIndexSupervisorTaskClient taskClient = 
toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
         ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index b91a6ce3a82..49e3591ff18 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -63,7 +63,6 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
   @Nullable
   private final Map<Interval, Integer> intervalToNumShardsOverride;
@@ -96,7 +95,6 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.intervalToNumShardsOverride = intervalToNumShardsOverride;
   }
 
@@ -112,12 +110,6 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -158,7 +150,7 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     return tryTimeChunkLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -175,7 +167,7 @@ public class PartialHashSegmentGenerateTask extends 
PartialSegmentGenerateTask<G
         getDataSource(),
         getSubtaskSpecId(),
         granularitySpec,
-        new SupervisorTaskAccess(supervisorTaskId, taskClient),
+        new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
         createHashPartitionAnalysisFromPartitionsSpec(
             granularitySpec,
             partitionsSpec,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 147a1fbf121..27604eb7e77 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -64,7 +64,6 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
   private static final String PROP_SPEC = "spec";
   private static final boolean SKIP_NULL = true;
 
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -98,7 +97,6 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.intervalToPartitions = intervalToPartitions;
   }
 
@@ -131,12 +129,6 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -176,7 +168,7 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
     return tryTimeChunkLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -194,7 +186,7 @@ public class PartialRangeSegmentGenerateTask extends 
PartialSegmentGenerateTask<
         getDataSource(),
         getSubtaskSpecId(),
         ingestionSchema.getDataSchema().getGranularitySpec(),
-        new SupervisorTaskAccess(supervisorTaskId, taskClient),
+        new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
         partitionAnalysis
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index dc2a7ef5bf9..e20c7bdbe35 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -94,7 +94,8 @@ abstract class PartialSegmentGenerateTask<T extends 
GeneratedPartitionsReport> e
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index bb933169d4b..b59ec65716c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -78,7 +78,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
 
   private final PartialSegmentMergeIOConfig ioConfig;
   private final int numAttempts;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   PartialSegmentMergeTask(
@@ -101,7 +100,8 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
         taskResource,
         dataSchema,
         tuningConfig,
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -111,7 +111,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
     this.subtaskSpecId = subtaskSpecId;
     this.ioConfig = ioConfig;
     this.numAttempts = numAttempts;
-    this.supervisorTaskId = supervisorTaskId;
   }
 
   @JsonProperty
@@ -120,12 +119,6 @@ abstract class PartialSegmentMergeTask<S extends 
ShardSpec> extends PerfectRollu
     return numAttempts;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -151,7 +144,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
     }
 
     final List<TaskLock> locks = toolbox.getTaskActionClient().submit(
-        new SurrogateAction<>(supervisorTaskId, new LockListAction())
+        new SurrogateAction<>(getSupervisorTaskId(), new LockListAction())
     );
     final Map<Interval, String> intervalToVersion = 
Maps.newHashMapWithExpectedSize(locks.size());
     locks.forEach(lock -> {
@@ -179,7 +172,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
     LOG.info("Fetch took [%s] seconds", fetchTime);
 
     final ParallelIndexSupervisorTaskClient taskClient = 
toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         getTuningConfig().getChatHandlerTimeout(),
         getTuningConfig().getChatHandlerNumRetries()
     );
@@ -225,7 +218,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
         );
         FileUtils.mkdirp(partitionDir);
         for (PartitionLocation location : entryPerBucketId.getValue()) {
-          final File unzippedDir = 
toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, 
location);
+          final File unzippedDir = 
toolbox.getShuffleClient().fetchSegmentFile(partitionDir, 
getSupervisorTaskId(), location);
           intervalToUnzippedFiles.computeIfAbsent(interval, k -> new 
Int2ObjectOpenHashMap<>())
               .computeIfAbsent(bucketId, k -> new ArrayList<>())
               .add(unzippedDir);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 4259922b43a..3b00f0fedf6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -49,10 +49,11 @@ abstract class PerfectRollupWorkerTask extends 
AbstractBatchSubtask
       @Nullable TaskResource taskResource,
       DataSchema dataSchema,
       ParallelIndexTuningConfig tuningConfig,
-      @Nullable Map<String, Object> context
+      @Nullable Map<String, Object> context,
+      String supervisorTaskId
   )
   {
-    super(id, groupId, taskResource, dataSchema.getDataSource(), context, 
IngestionMode.NONE);
+    super(id, groupId, taskResource, dataSchema.getDataSource(), context, 
IngestionMode.NONE, supervisorTaskId);
 
     Preconditions.checkArgument(
         tuningConfig.isForceGuaranteedRollup(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 183cde7c66d..a3bd47d2960 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -118,7 +118,6 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   /**
@@ -169,7 +168,8 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
         taskResource,
         ingestionSchema.getDataSchema().getDataSource(),
         context,
-        AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig())
+        AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig()),
+        supervisorTaskId
     );
 
     if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
@@ -179,7 +179,6 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.missingIntervalsInOverwriteMode = 
ingestionSchema.getIOConfig().isAppendToExisting() != true
                                            && ingestionSchema.getDataSchema()
                                                              
.getGranularitySpec()
@@ -217,7 +216,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
     return determineLockGranularityAndTryLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -234,12 +233,6 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @Override
   @JsonProperty
   public String getSubtaskSpecId()
@@ -272,7 +265,7 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
       final InputSource inputSource = 
ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
 
       final ParallelIndexSupervisorTaskClient taskClient = 
toolbox.getSupervisorTaskClientProvider().build(
-          supervisorTaskId,
+          getSupervisorTaskId(),
           ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
           ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
       );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index bf8b4bfb1d5..8056c69901f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -48,10 +48,9 @@ import 
org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -552,14 +551,7 @@ public class DruidInputSource extends AbstractInputSource 
implements SplittableI
     } else {
       try {
         usedSegments = toolbox.getTaskActionClient()
-                              .submit(
-                                  new RetrieveUsedSegmentsAction(
-                                      dataSource,
-                                      null,
-                                      Collections.singletonList(interval),
-                                      Segments.ONLY_VISIBLE
-                                  )
-                              );
+                              .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
       }
       catch (IOException e) {
         LOG.error(e, "Error retrieving the used segments for interval[%s].", 
interval);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index 71b474e7735..b95fd53c74f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -111,7 +111,8 @@ public class PerfectRollupWorkerTaskTest
           null,
           createDataSchema(granularitySpecInputIntervals),
           createTuningConfig(forceGuaranteedRollup, partitionsSpec),
-          null
+          null,
+          "supervisor-id"
       );
     }
 
@@ -149,10 +150,11 @@ public class PerfectRollupWorkerTaskTest
         @Nullable TaskResource taskResource,
         DataSchema dataSchema,
         ParallelIndexTuningConfig tuningConfig,
-        @Nullable Map<String, Object> context
+        @Nullable Map<String, Object> context,
+        String supervisorId
     )
     {
-      super(id, groupId, taskResource, dataSchema, tuningConfig, context);
+      super(id, groupId, taskResource, dataSchema, tuningConfig, context, 
supervisorId);
     }
 
     @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 22f21fb79b6..1c4b6809c38 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -845,6 +846,54 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21, 
segmentV22, segmentV23);
   }
 
+  @Test
+  public void testSegmentsToReplace()
+  {
+    final SegmentIdWithShardSpec pendingSegmentV01
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval());
+    final DataSegment segment1 = asSegment(pendingSegmentV01);
+    appendTask.commitAppendSegments(segment1);
+
+    final SegmentIdWithShardSpec pendingSegmentV02
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV02.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV02.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV02.getInterval());
+
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.acquireReplaceLockOn(JAN_23);
+
+    final DataSegment segment2 = asSegment(pendingSegmentV02);
+    appendTask.commitAppendSegments(segment2);
+
+    // Despite segment2 existing, it is not chosen to be replaced because it was created after the task lock was acquired
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.releaseLock(JAN_23);
+
+    final SegmentIdWithShardSpec pendingSegmentV03
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertNotEquals(pendingSegmentV02.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV03.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV03.getInterval());
+    final DataSegment segment3 = asSegment(pendingSegmentV03);
+    appendTask.commitAppendSegments(segment3);
+    appendTask.releaseLock(JAN_23);
+
+    replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+    // The new lock was acquired before segment3 was created but it doesn't contain the month's interval,
+    // so all three segments are chosen
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+
+    replaceTask.releaseLock(FIRST_OF_JAN_23);
+    // All the segments are chosen when there's no lock
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+  }
+
   @Nullable
   private DataSegment findSegmentWith(String version, Map<String, Object> 
loadSpec, Set<DataSegment> segments)
   {
@@ -901,6 +950,23 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     }
   }
 
+  private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
+  {
+    try {
+      final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
+      Collection<DataSegment> allUsedSegments = taskActionClient.submit(
+          new RetrieveSegmentsToReplaceAction(
+              WIKI,
+              interval
+          )
+      );
+      Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
+    }
+    }
+  }
+
   private TaskToolboxFactory createToolboxFactory(
       TaskConfig taskConfig,
       TaskActionClientFactory taskActionClientFactory
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d1c72485011..108833422c8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -89,7 +89,7 @@ public class TestIndexerMetadataStorageCoordinator implements 
IndexerMetadataSto
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
   {
     return ImmutableList.of();
   }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 2c2a6bc0f77..7c6710048a1 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -82,8 +82,9 @@ public interface IndexerMetadataStorageCoordinator
   Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments 
visibility);
 
   /**
+   *
    * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the
-   * given data source from the metadata store.
+   * given data source and interval from the metadata store.
    *
    * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility"
    * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link
@@ -91,10 +92,11 @@ public interface IndexerMetadataStorageCoordinator
    * if needed.
    *
    * @param dataSource The data source to query
+   * @param interval The interval to query
    *
    * @return The DataSegments and the related created_date of segments
    */
-  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource);
+  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);
 
   /**
    * Retrieve all published segments which may include any data in the given intervals and are marked as used from the
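
The narrowed contract can be exercised directly against the metadata storage coordinator; a brief sketch, assuming a coordinator instance and a hypothetical "wiki" datasource, mirroring the new coordinator tests at the end of this patch:

    // Sketch: only used segments overlapping the interval are returned; ETERNITY returns everything.
    Collection<Pair<DataSegment, String>> januarySegments =
        coordinator.retrieveUsedSegmentsAndCreatedDates("wiki", Intervals.of("2023-01-01/2023-02-01"));
    Collection<Pair<DataSegment, String>> allSegments =
        coordinator.retrieveUsedSegmentsAndCreatedDates("wiki", Intervals.ETERNITY);
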
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 226663c3233..c654d5e229b 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -174,15 +174,34 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
   {
-    String rawQueryString = "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true";
-    final String queryString = StringUtils.format(rawQueryString, dbTables.getSegmentsTable());
+    StringBuilder queryBuilder = new StringBuilder(
+        "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
+    );
+
+    final List<Interval> intervals = new ArrayList<>();
+    // Do not need an interval condition if the interval is ETERNITY
+    if (!Intervals.isEternity(interval)) {
+      intervals.add(interval);
+    }
+
+    SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
+        queryBuilder,
+        intervals,
+        SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS,
+        connector
+    );
+
+    final String queryString = StringUtils.format(queryBuilder.toString(), dbTables.getSegmentsTable());
     return connector.retryWithHandle(
         handle -> {
           Query<Map<String, Object>> query = handle
               .createQuery(queryString)
               .bind("dataSource", dataSource);
+
+          SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
+
           return query
               .map((int index, ResultSet r, StatementContext ctx) ->
                        new Pair<>(
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 20b176c5091..76e4f974576 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -261,6 +261,82 @@ public class SqlSegmentsMetadataQuery
     return null;
   }
 
+  /**
+   * Appends the conditions for the given intervals and match mode to a string builder holding a partial query.
+   * @param sb StringBuilder containing the partial query with the SELECT clause and the WHERE conditions for used and dataSource
+   * @param intervals intervals to fetch the segments for
+   * @param matchMode interval match mode - OVERLAPS or CONTAINS
+   * @param connector SQL connector
+   */
+  public static void appendConditionForIntervalsAndMatchMode(
+      final StringBuilder sb,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final SQLMetadataConnector connector
+  )
+  {
+    if (intervals.isEmpty()) {
+      return;
+    }
+
+    sb.append(" AND (");
+    for (int i = 0; i < intervals.size(); i++) {
+      sb.append(
+          matchMode.makeSqlCondition(
+              connector.getQuoteString(),
+              StringUtils.format(":start%d", i),
+              StringUtils.format(":end%d", i)
+          )
+      );
+
+      // Add a special check for a segment that has one end at eternity and the other at some finite value. Since
+      // we are using string comparison, a segment with this start or end will not be returned otherwise.
+      if (matchMode.equals(IntervalMode.OVERLAPS)) {
+        sb.append(StringUtils.format(
+            " OR (start = '%s' AND \"end\" != '%s' AND \"end\" > :start%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+        sb.append(StringUtils.format(
+            " OR (start != '%s' AND \"end\" = '%s' AND start < :end%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+      }
+
+      if (i != intervals.size() - 1) {
+        sb.append(" OR ");
+      }
+    }
+
+    // Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
+    // this start and end will not be returned otherwise.
+    // Known Issue: https://github.com/apache/druid/issues/12860
+    if (matchMode.equals(IntervalMode.OVERLAPS)) {
+      sb.append(StringUtils.format(
+          " OR (start = '%s' AND \"end\" = '%s')", 
Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()
+      ));
+    }
+    sb.append(")");
+  }
+
+  /**
+   * Given a Query object, binds the input intervals to it.
+   * @param query Query to fetch segments
+   * @param intervals Intervals to fetch segments for
+   */
+  public static void bindQueryIntervals(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
+  {
+    if (intervals.isEmpty()) {
+      return;
+    }
+
+    final Iterator<Interval> iterator = intervals.iterator();
+    for (int i = 0; iterator.hasNext(); i++) {
+      Interval interval = iterator.next();
+      query.bind(StringUtils.format("start%d", i), 
interval.getStart().toString())
+           .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
+    }
+  }
+
   private CloseableIterator<DataSegment> retrieveSegments(
       final String dataSource,
       final Collection<Interval> intervals,
@@ -275,36 +351,8 @@ public class SqlSegmentsMetadataQuery
     final StringBuilder sb = new StringBuilder();
     sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = 
:dataSource");
 
-    if (compareAsString && !intervals.isEmpty()) {
-      sb.append(" AND (");
-      for (int i = 0; i < intervals.size(); i++) {
-        sb.append(
-            matchMode.makeSqlCondition(
-                connector.getQuoteString(),
-                StringUtils.format(":start%d", i),
-                StringUtils.format(":end%d", i)
-            )
-        );
-
-        // Add a special check for a segment which have one end at eternity 
and the other at some finite value. Since
-        // we are using string comparison, a segment with this start or end 
will not be returned otherwise.
-        if (matchMode.equals(IntervalMode.OVERLAPS)) {
-          sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" != '%s' 
AND \"end\" > :start%d)", Intervals.ETERNITY.getStart(), 
Intervals.ETERNITY.getEnd(), i));
-          sb.append(StringUtils.format(" OR (start != '%s' AND \"end\" = '%s' 
AND start < :end%d)", Intervals.ETERNITY.getStart(), 
Intervals.ETERNITY.getEnd(), i));
-        }
-
-        if (i != intervals.size() - 1) {
-          sb.append(" OR ");
-        }
-      }
-
-      // Add a special check for a single segment with eternity. Since we are 
using string comparison, a segment with
-      // this start and end will not be returned otherwise.
-      // Known Issue: https://github.com/apache/druid/issues/12860
-      if (matchMode.equals(IntervalMode.OVERLAPS)) {
-        sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" = '%s')", 
Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()));
-      }
-      sb.append(")");
+    if (compareAsString) {
+      appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
     }
 
     final Query<Map<String, Object>> sql = handle
@@ -317,12 +365,7 @@ public class SqlSegmentsMetadataQuery
     }
 
     if (compareAsString) {
-      final Iterator<Interval> iterator = intervals.iterator();
-      for (int i = 0; iterator.hasNext(); i++) {
-        Interval interval = iterator.next();
-        sql.bind(StringUtils.format("start%d", i), 
interval.getStart().toString())
-           .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
-      }
+      bindQueryIntervals(sql, intervals);
     }
 
     final ResultIterator<DataSegment> resultIterator =
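
The two static helpers extracted above are meant to be composed the way IndexerSQLMetadataStorageCoordinator now uses them: append the interval conditions while building the SQL string, then bind the interval endpoints once the query object exists. A minimal sketch, not part of the patch, assuming a JDBI handle, a connector, the segments table name, a datasource, and a list of intervals are in scope:

    // Sketch of composing the extracted helpers; mirrors the coordinator change earlier in this patch.
    StringBuilder sql = new StringBuilder(
        "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
    );
    SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
        sql, intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector
    );
    Query<Map<String, Object>> query = handle
        .createQuery(StringUtils.format(sql.toString(), segmentsTable))
        .bind("dataSource", dataSource);
    SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
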
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b1b6f3aa16e..0512357ffc1 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2554,6 +2554,52 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  @Test
+  public void testRetrieveUsedSegmentsAndCreatedDates()
+  {
+    insertUsedSegments(ImmutableSet.of(defaultSegment));
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheLeft =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001"));
+    Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheRight =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001"));
+    Assert.assertTrue(resultForIntervalOnTheRight.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForExactInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(1, resultForExactInterval.size());
+    Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithLeftOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithRightOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap);
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY);
+    Assert.assertEquals(resultForExactInterval, resultForEternity);
+  }
+
+  @Test
+  public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval()
+  {
+
+    insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment));
+
+    List<Pair<DataSegment, String>> resultForRandomInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(3, resultForRandomInterval.size());
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval());
+    Assert.assertEquals(3, resultForEternity.size());
+  }
+
   private static class DS
   {
     static final String WIKI = "wiki";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
