This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d25caaefa42 Add support for streaming ingestion with concurrent 
replace (#15039)
d25caaefa42 is described below

commit d25caaefa4282a6b61663e8184d3c0b4092f6f35
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Oct 13 09:09:03 2023 +0530

    Add support for streaming ingestion with concurrent replace (#15039)
    
    Add support for streaming ingestion with concurrent replace
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../actions/SegmentTransactionalAppendAction.java  | 100 ++++--
 .../actions/SegmentTransactionalInsertAction.java  |  44 +--
 .../actions/SegmentTransactionalReplaceAction.java |  62 +++-
 .../druid/indexing/common/actions/TaskLocks.java   |   2 +
 .../common/task/AbstractBatchIndexTask.java        |  12 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   3 +-
 .../druid/indexing/common/task/IndexTask.java      |   8 +-
 .../druid/indexing/common/task/IndexTaskUtils.java |  49 ++-
 .../parallel/ParallelIndexSupervisorTask.java      |   7 +-
 .../druid/indexing/overlord/TaskLockbox.java       |  62 +++-
 .../indexing/overlord/TaskStorageQueryAdapter.java |  10 +
 .../indexing/overlord/http/OverlordResource.java   |  16 +
 .../overlord/supervisor/SupervisorManager.java     |  51 +++
 .../seekablestream/PendingSegmentVersions.java     |  56 +++
 .../seekablestream/SeekableStreamIndexTask.java    |   3 +-
 .../SeekableStreamIndexTaskClient.java             |  16 +
 .../SeekableStreamIndexTaskClientAsyncImpl.java    |  18 +
 .../SeekableStreamIndexTaskRunner.java             |  57 +++-
 .../indexing/seekablestream/SequenceMetadata.java  |  41 ++-
 .../supervisor/SeekableStreamSupervisor.java       |  20 ++
 .../common/task/concurrent/ActionsTestTask.java    |  30 +-
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  91 ++++-
 .../druid/indexing/overlord/TaskLockboxTest.java   |  77 +++++
 .../seekablestream/SequenceMetadataTest.java       |  30 +-
 .../TestIndexerMetadataStorageCoordinator.java     |  23 +-
 .../util/emitter/service/SegmentMetadataEvent.java |  13 +
 .../emitter/service/SegmentMetadataEventTest.java  |  31 ++
 .../IndexerMetadataStorageCoordinator.java         |  33 +-
 .../IndexerSQLMetadataStorageCoordinator.java      | 376 +++++++++++++++------
 .../apache/druid/metadata/LockFilterPolicy.java    |  88 +++++
 .../apache/druid/rpc/indexing/OverlordClient.java  |  15 +-
 .../druid/rpc/indexing/OverlordClientImpl.java     |   9 +-
 .../appenderator/SinkQuerySegmentWalker.java       |  18 +-
 .../realtime/appenderator/StreamAppenderator.java  |  69 +++-
 .../server/coordinator/duty/CompactSegments.java   |  20 +-
 .../druid/server/http/DataSourcesResource.java     |  23 +-
 .../druid/client/indexing/NoopOverlordClient.java  |   5 +-
 .../druid/rpc/indexing/OverlordClientImplTest.java |  21 +-
 .../coordinator/duty/CompactSegmentsTest.java      |   6 +-
 39 files changed, 1314 insertions(+), 301 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 171c4f6640f..67b701718ca 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -22,15 +22,20 @@ package org.apache.druid.indexing.common.actions;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -42,18 +47,40 @@ import java.util.stream.Collectors;
 public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPublishResult>
 {
   private final Set<DataSegment> segments;
+  @Nullable
+  private final DataSourceMetadata startMetadata;
+  @Nullable
+  private final DataSourceMetadata endMetadata;
 
-  public static SegmentTransactionalAppendAction create(Set<DataSegment> 
segments)
+  public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> 
segments)
   {
-    return new SegmentTransactionalAppendAction(segments);
+    return new SegmentTransactionalAppendAction(segments, null, null);
+  }
+
+  public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
+      Set<DataSegment> segments,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    return new SegmentTransactionalAppendAction(segments, startMetadata, 
endMetadata);
   }
 
   @JsonCreator
   private SegmentTransactionalAppendAction(
-      @JsonProperty("segments") Set<DataSegment> segments
+      @JsonProperty("segments") Set<DataSegment> segments,
+      @JsonProperty("startMetadata") @Nullable DataSourceMetadata 
startMetadata,
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
   )
   {
     this.segments = segments;
+    this.startMetadata = startMetadata;
+    this.endMetadata = endMetadata;
+
+    if ((startMetadata == null && endMetadata != null)
+        || (startMetadata != null && endMetadata == null)) {
+      throw InvalidInput.exception("startMetadata and endMetadata must either 
be both null or both non-null.");
+    }
   }
 
   @JsonProperty
@@ -62,6 +89,20 @@ public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPubli
     return segments;
   }
 
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getStartMetadata()
+  {
+    return startMetadata;
+  }
+
+  @JsonProperty
+  @Nullable
+  public DataSourceMetadata getEndMetadata()
+  {
+    return endMetadata;
+  }
+
   @Override
   public TypeReference<SegmentPublishResult> getReturnTypeReference()
   {
@@ -70,30 +111,48 @@ public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPubli
     };
   }
 
-  /**
-   * Performs some sanity checks and publishes the given segments.
-   */
   @Override
   public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   {
+    // Verify that all the locks are of expected type
+    final List<TaskLock> locks = 
toolbox.getTaskLockbox().findLocksForTask(task);
+    for (TaskLock lock : locks) {
+      if (lock.getType() != TaskLockType.APPEND) {
+        throw InvalidInput.exception(
+            "Cannot use action[%s] for task[%s] as it is holding a lock of 
type[%s] instead of [APPEND].",
+            "SegmentTransactionalAppendAction", task.getId(), lock.getType()
+        );
+      }
+    }
+
     TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), 
segments);
 
     final String datasource = task.getDataSource();
     final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
         = TaskLocks.findReplaceLocksCoveringSegments(datasource, 
toolbox.getTaskLockbox(), segments);
 
+    final CriticalAction.Action<SegmentPublishResult> publishAction;
+    if (startMetadata == null) {
+      publishAction = () -> 
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
+          segments,
+          segmentToReplaceLock
+      );
+    } else {
+      publishAction = () -> 
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
+          segments,
+          segmentToReplaceLock,
+          startMetadata,
+          endMetadata
+      );
+    }
+
     final SegmentPublishResult retVal;
     try {
       retVal = toolbox.getTaskLockbox().doInCriticalSection(
           task,
           
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
           CriticalAction.<SegmentPublishResult>builder()
-              .onValidLocks(
-                  () -> 
toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
-                      segments,
-                      segmentToReplaceLock
-                  )
-              )
+              .onValidLocks(publishAction)
               .onInvalidLocks(
                   () -> SegmentPublishResult.fail(
                       "Invalid task locks. Maybe they are revoked by a higher 
priority task."
@@ -107,20 +166,7 @@ public class SegmentTransactionalAppendAction implements 
TaskAction<SegmentPubli
       throw new RuntimeException(e);
     }
 
-    // Emit metrics
-    final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
-    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
-    if (retVal.isSuccess()) {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 
1));
-      for (DataSegment segment : retVal.getSegments()) {
-        IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
-        
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", 
segment.getSize()));
-      }
-    } else {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 
1));
-    }
-
+    IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
     return retVal;
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 9b23db71d46..5a9ca0cacdf 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -33,13 +33,8 @@ import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -222,47 +217,10 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
       throw new RuntimeException(e);
     }
 
-    // Emit metrics
-    final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
-    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
-    if (retVal.isSuccess()) {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 
1));
-    } else {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 
1));
-    }
-
-    // getSegments() should return an empty set if 
announceHistoricalSegments() failed
-    for (DataSegment segment : retVal.getSegments()) {
-      metricBuilder.setDimension(DruidMetrics.INTERVAL, 
segment.getInterval().toString());
-      metricBuilder.setDimension(
-          DruidMetrics.PARTITIONING_TYPE,
-          segment.getShardSpec() == null ? null : 
segment.getShardSpec().getType()
-      );
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", 
segment.getSize()));
-      // Emit the segment related metadata using the configured emitters.
-      // There is a possibility that some segments' metadata event might get 
missed if the
-      // server crashes after commiting segment but before emitting the event.
-      this.emitSegmentMetadata(segment, toolbox);
-    }
-
+    IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
     return retVal;
   }
 
-  private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox 
toolbox)
-  {
-    SegmentMetadataEvent event = new SegmentMetadataEvent(
-        segment.getDataSource(),
-        DateTime.now(DateTimeZone.UTC),
-        segment.getInterval().getStart(),
-        segment.getInterval().getEnd(),
-        segment.getVersion(),
-        segment.getLastCompactionState() != null
-    );
-
-    toolbox.getEmitter().emit(event);
-  }
-
   private void checkWithSegmentLock()
   {
     final Map<Interval, List<DataSegment>> oldSegmentsMap = 
groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index 5a1228e1dd1..5a2b3ceec8f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -22,17 +22,20 @@ package org.apache.druid.indexing.common.actions;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.ReplaceTaskLock;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -42,6 +45,8 @@ import java.util.stream.Collectors;
  */
 public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPublishResult>
 {
+  private static final Logger log = new 
Logger(SegmentTransactionalReplaceAction.class);
+
   /**
    * Set of segments to be inserted into metadata storage
    */
@@ -88,9 +93,9 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
     final Set<ReplaceTaskLock> replaceLocksForTask
         = toolbox.getTaskLockbox().findReplaceLocksForTask(task);
 
-    final SegmentPublishResult retVal;
+    final SegmentPublishResult publishResult;
     try {
-      retVal = toolbox.getTaskLockbox().doInCriticalSection(
+      publishResult = toolbox.getTaskLockbox().doInCriticalSection(
           task,
           
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
           CriticalAction.<SegmentPublishResult>builder()
@@ -111,24 +116,45 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
       throw new RuntimeException(e);
     }
 
-    // Emit metrics
-    final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
-    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
-
-    if (retVal.isSuccess()) {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 
1));
+    IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
 
-      for (DataSegment segment : retVal.getSegments()) {
-        final String partitionType = segment.getShardSpec() == null ? null : 
segment.getShardSpec().getType();
-        metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, 
partitionType);
-        metricBuilder.setDimension(DruidMetrics.INTERVAL, 
segment.getInterval().toString());
-        
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", 
segment.getSize()));
+    // Upgrade any overlapping pending segments
+    // Do not perform upgrade in the same transaction as replace commit so that
+    // failure to upgrade pending segments does not affect success of the 
commit
+    if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
+      try {
+        tryUpgradeOverlappingPendingSegments(task, toolbox);
+      }
+      catch (Exception e) {
+        log.error(e, "Error while upgrading pending segments for task[%s]", 
task.getId());
       }
-    } else {
-      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 
1));
     }
 
-    return retVal;
+    return publishResult;
+  }
+
+  /**
+   * Tries to upgrade any pending segments that overlap with the committed 
segments.
+   */
+  private void tryUpgradeOverlappingPendingSegments(Task task, 
TaskActionToolbox toolbox)
+  {
+    final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
+    final Optional<String> activeSupervisorId = 
supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
+    if (!activeSupervisorId.isPresent()) {
+      return;
+    }
+
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradedPendingSegments =
+        
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
+    log.info(
+        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
+        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
+    );
+
+    upgradedPendingSegments.forEach(
+        (oldId, newId) -> toolbox.getSupervisorManager()
+                                 
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), 
oldId, newId)
+    );
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index bb835997801..d6064935925 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -125,6 +125,8 @@ public class TaskLocks
                          && 
timeChunkLock.getDataSource().equals(segment.getDataSource())
                          && 
(timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
                              || 
TaskLockType.APPEND.equals(timeChunkLock.getType()));
+                  // APPEND locks always have the version DateTimes.EPOCH 
(1970-01-01)
+                  // and cover the segments irrespective of the segment version
                 } else {
                   final SegmentLock segmentLock = (SegmentLock) lock;
                   return 
segmentLock.getInterval().contains(segment.getInterval())
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index ea61f37c7e9..fe19b35391e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -401,21 +401,21 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
 
   /**
    * Builds a TaskAction to publish segments based on the type of locks that 
this
-   * task acquires (determined by context property {@link 
Tasks#TASK_LOCK_TYPE}).
+   * task acquires.
+   *
+   * @see #determineLockType
    */
   protected TaskAction<SegmentPublishResult> buildPublishAction(
       Set<DataSegment> segmentsToBeOverwritten,
-      Set<DataSegment> segmentsToPublish
+      Set<DataSegment> segmentsToPublish,
+      TaskLockType lockType
   )
   {
-    TaskLockType lockType = TaskLockType.valueOf(
-        getContextValue(Tasks.TASK_LOCK_TYPE, 
Tasks.DEFAULT_TASK_LOCK_TYPE.name())
-    );
     switch (lockType) {
       case REPLACE:
         return SegmentTransactionalReplaceAction.create(segmentsToPublish);
       case APPEND:
-        return SegmentTransactionalAppendAction.create(segmentsToPublish);
+        return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
       default:
         return 
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, 
segmentsToPublish);
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index dfa1f85fde7..3a599dd485b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -27,7 +27,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -696,7 +695,7 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
     );
     pendingHandoffs.add(Futures.transformAsync(
         publishFuture,
-        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) 
driver::registerHandoff,
+        driver::registerHandoff,
         MoreExecutors.directExecutor()
     ));
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a2ca4f869ea..d880f3eb86a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -50,6 +50,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -910,10 +911,11 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
         throw new UOE("[%s] secondary partition type is not supported", 
partitionsSpec.getType());
     }
 
-
+    final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
     final TransactionalSegmentPublisher publisher =
-        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
-            
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten,
 segmentsToPublish));
+        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> 
toolbox.getTaskActionClient().submit(
+            buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, 
taskLockType)
+        );
 
     String effectiveId = 
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
     if (effectiveId == null) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 20f7584c8eb..79a3e8993a8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -20,8 +20,10 @@
 package org.apache.druid.indexing.common.task;
 
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.actions.TaskActionToolbox;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -35,7 +37,6 @@ import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CircularBuffer;
-import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
@@ -45,29 +46,6 @@ import java.util.Map;
 
 public class IndexTaskUtils
 {
-  @Nullable
-  public static List<String> getMessagesFromSavedParseExceptions(
-      CircularBuffer<ParseException> savedParseExceptions,
-      boolean includeTimeOfException
-  )
-  {
-    if (savedParseExceptions == null) {
-      return null;
-    }
-
-    List<String> events = new ArrayList<>();
-    for (int i = 0; i < savedParseExceptions.size(); i++) {
-      if (includeTimeOfException) {
-        DateTime timeOfException = 
DateTimes.utc(savedParseExceptions.getLatest(i).getTimeOfExceptionMillis());
-        events.add(timeOfException + ", " + 
savedParseExceptions.getLatest(i).getMessage());
-      } else {
-        events.add(savedParseExceptions.getLatest(i).getMessage());
-      }
-    }
-
-    return events;
-  }
-
   @Nullable
   public static List<ParseExceptionReport> 
getReportListFromSavedParseExceptions(
       CircularBuffer<ParseExceptionReport> savedParseExceptionReports
@@ -152,4 +130,25 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
     metricBuilder.setDimension(DruidMetrics.INTERVAL, 
segment.getInterval().toString());
   }
+
+  public static void emitSegmentPublishMetrics(
+      SegmentPublishResult publishResult,
+      Task task,
+      TaskActionToolbox toolbox
+  )
+  {
+    final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+
+    if (publishResult.isSuccess()) {
+      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 
1));
+      for (DataSegment segment : publishResult.getSegments()) {
+        IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
+        
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", 
segment.getSize()));
+        toolbox.getEmitter().emit(SegmentMetadataEvent.create(segment, 
DateTimes.nowUtc()));
+      }
+    } else {
+      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 
1));
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index d3e218623cd..e99ef35d942 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -42,6 +42,7 @@ import 
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -1167,9 +1168,11 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
       }
     }
 
+    final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
     final TransactionalSegmentPublisher publisher =
-        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
-            
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten,
 segmentsToPublish));
+        (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> 
toolbox.getTaskActionClient().submit(
+            buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, 
taskLockType)
+        );
 
     final boolean published =
         newSegments.isEmpty()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 761c0b59160..54191adf05d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLock;
 import org.apache.druid.indexing.common.TaskLock;
@@ -38,12 +39,14 @@ import 
org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
 import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
@@ -941,7 +944,7 @@ public class TaskLockbox
 
       // Replace locks are always held by the supervisor task
       if (posse.taskIds.size() > 1) {
-        throw new ISE(
+        throw DruidException.defensive(
             "Replace lock[%s] for datasource[%s] is held by multiple 
tasks[%s]",
             lock, datasource, posse.taskIds
         );
@@ -956,6 +959,63 @@ public class TaskLockbox
     return replaceLocks;
   }
 
+  /**
+   * @param lockFilterPolicies Lock filters for the given datasources
+   * @return Map from datasource to intervals locked by tasks satisfying the 
lock filter condititions
+   */
+  public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> 
lockFilterPolicies)
+  {
+    final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+    // Take a lock and populate the maps
+    giant.lock();
+
+    try {
+      lockFilterPolicies.forEach(
+          lockFilter -> {
+            final String datasource = lockFilter.getDatasource();
+            if (!running.containsKey(datasource)) {
+              return;
+            }
+
+            final int priority = lockFilter.getPriority();
+            final boolean ignoreAppendLocks =
+                
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
+
+            running.get(datasource).forEach(
+                (startTime, startTimeLocks) -> startTimeLocks.forEach(
+                    (interval, taskLockPosses) -> taskLockPosses.forEach(
+                        taskLockPosse -> {
+                          if (taskLockPosse.getTaskLock().isRevoked()) {
+                            // do nothing
+                          } else if (ignoreAppendLocks
+                                     && 
TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) {
+                            // do nothing
+                          } else if (taskLockPosse.getTaskLock().getPriority() 
== null
+                                     || 
taskLockPosse.getTaskLock().getPriority() < priority) {
+                            // do nothing
+                          } else {
+                            datasourceToIntervals.computeIfAbsent(datasource, 
k -> new HashSet<>())
+                                                 .add(interval);
+                          }
+                        }
+                    )
+                )
+            );
+          }
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+
+    return datasourceToIntervals.entrySet().stream()
+                                .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry -> new ArrayList<>(entry.getValue())
+                                ));
+  }
+
   /**
    * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
    * Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 3fa570ccb32..140d9b7ac40 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -28,6 +28,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentInsertAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -60,6 +61,15 @@ public class TaskStorageQueryAdapter
     return storage.getActiveTasks();
   }
 
+  /**
+   * @param lockFilterPolicies Requests for conflicing lock intervals for 
various datasources
+   * @return Map from datasource to intervals locked by tasks that have a 
conflicting lock type that cannot be revoked
+   */
+  public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> 
lockFilterPolicies)
+  {
+    return taskLockbox.getLockedIntervals(lockFilterPolicies);
+  }
+
   /**
    * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
    *
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index fa61f796154..f9604492dd4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -63,6 +63,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@@ -260,6 +261,7 @@ public class OverlordResource
     }
   }
 
+  @Deprecated
   @POST
   @Path("/lockedIntervals")
   @Produces(MediaType.APPLICATION_JSON)
@@ -274,6 +276,20 @@ public class OverlordResource
     return 
Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
   }
 
+  @POST
+  @Path("/lockedIntervals/v2")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> 
lockFilterPolicies)
+  {
+    if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST).entity("No filter 
provided").build();
+    }
+
+    // Build the response
+    return 
Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build();
+  }
+
   @GET
   @Path("/task/{taskid}")
   @Produces(MediaType.APPLICATION_JSON)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 2cd926bae90..d55f3cc8bd0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -24,12 +24,14 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 
 import javax.annotation.Nullable;
 
@@ -69,6 +71,22 @@ public class SupervisorManager
     return supervisors.keySet();
   }
 
+  public Optional<String> getActiveSupervisorIdForDatasource(String datasource)
+  {
+    for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry : 
supervisors.entrySet()) {
+      final String supervisorId = entry.getKey();
+      final Supervisor supervisor = entry.getValue().lhs;
+      final SupervisorSpec supervisorSpec = entry.getValue().rhs;
+      if (supervisor instanceof SeekableStreamSupervisor
+          && !supervisorSpec.isSuspended()
+          && supervisorSpec.getDataSources().contains(datasource)) {
+        return Optional.of(supervisorId);
+      }
+    }
+
+    return Optional.absent();
+  }
+
   public Optional<SupervisorSpec> getSupervisorSpec(String id)
   {
     Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
@@ -247,6 +265,39 @@ public class SupervisorManager
     return false;
   }
 
+  /**
+   * Registers a new version of the given pending segment on a supervisor. This
+   * allows the supervisor to include the pending segment in queries fired 
against
+   * that segment version.
+   */
+  public boolean registerNewVersionOfPendingSegmentOnSupervisor(
+      String supervisorId,
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  )
+  {
+    try {
+      Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
+      Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment 
cannot be null");
+      Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot 
be null");
+
+      Pair<Supervisor, SupervisorSpec> supervisor = 
supervisors.get(supervisorId);
+      Preconditions.checkNotNull(supervisor, "supervisor could not be found");
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+        return false;
+      }
+
+      SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = 
(SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
+      
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, 
newSegmentVersion);
+      return true;
+    }
+    catch (Exception e) {
+      log.error(e, "PendingSegment[%s] mapping update request to version[%s] 
on Supervisor[%s] failed",
+                basePendingSegment.asSegmentId(), 
newSegmentVersion.getVersion(), supervisorId);
+    }
+    return false;
+  }
+
 
   /**
    * Stops a supervisor with a given id and then removes it from the list.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
new file mode 100644
index 00000000000..146b0afc4b9
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+
+/**
+ * Contains a new version of an existing base pending segment. Used by realtime
+ * tasks to serve queries against multiple versions of the same pending 
segment.
+ */
+public class PendingSegmentVersions
+{
+  private final SegmentIdWithShardSpec baseSegment;
+  private final SegmentIdWithShardSpec newVersion;
+
+  @JsonCreator
+  public PendingSegmentVersions(
+      @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment,
+      @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion
+  )
+  {
+    this.baseSegment = baseSegment;
+    this.newVersion = newVersion;
+  }
+
+  @JsonProperty
+  public SegmentIdWithShardSpec getBaseSegment()
+  {
+    return baseSegment;
+  }
+
+  @JsonProperty
+  public SegmentIdWithShardSpec getNewVersion()
+  {
+    return newVersion;
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 3aca46fbfae..d74ee5c0be2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -106,7 +107,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     this.lockGranularityToUse = 
getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, 
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                                 ? LockGranularity.TIME_CHUNK
                                 : LockGranularity.SEGMENT;
-    this.lockTypeToUse = getContextValue(Tasks.USE_SHARED_LOCK, false) ? 
TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
+    this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
   }
 
   protected static String getFormattedGroupId(String dataSource, String type)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
index 18631626d0f..5e592424960 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
 
 import java.util.List;
@@ -153,6 +154,21 @@ public interface 
SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetTy
    */
   ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String 
id);
 
+  /**
+   * Update the task state to redirect queries for later versions to the root 
pending segment.
+   * The task also announces that it is serving the segments belonging to the 
subsequent versions.
+   * The update is processed only if the task is serving the original pending 
segment.
+   * @param taskId - task id
+   * @param basePendingSegment - the pending segment that was originally 
allocated
+   * @param newVersionOfSegment - the ids belonging to the versions to which 
the root segment needs to be updated
+   * @return true if the update succeeds
+   */
+  ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
+      String taskId,
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newVersionOfSegment
+  );
+
   Class<PartitionIdType> getPartitionType();
 
   Class<SequenceOffsetType> getSequenceType();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
index 9d6d49e00bf..40d475909e6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
@@ -57,6 +57,7 @@ import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -193,6 +194,23 @@ public abstract class 
SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
         .go();
   }
 
+  @Override
+  public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
+      String taskId,
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newVersionOfSegment
+  )
+  {
+    final RequestBuilder requestBuilder
+        = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
+        .jsonContent(jsonMapper, new 
PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
+
+    return makeRequest(taskId, requestBuilder)
+        .handler(IgnoreHttpResponseHandler.INSTANCE)
+        .onSuccess(r -> true)
+        .go();
+  }
+
   @Override
   public ListenableFuture<Boolean> setEndOffsetsAsync(
       final String id,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 27909aea83c..769413d6ffc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -46,6 +46,8 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.ErrorResponse;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -60,6 +62,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import 
org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
 import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
 import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
+import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -83,6 +86,7 @@ import 
org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import 
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
 import 
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.security.Access;
@@ -319,7 +323,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                   previous.getValue(),
                   current.getValue(),
                   true,
-                  exclusiveStartPartitions
+                  exclusiveStartPartitions,
+                  getTaskLockType()
               )
           );
           previous = current;
@@ -334,7 +339,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 previous.getValue(),
                 endOffsets,
                 false,
-                exclusiveStartPartitions
+                exclusiveStartPartitions,
+                getTaskLockType()
             )
         );
       } else {
@@ -345,7 +351,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
                 endOffsets,
                 false,
-                ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+                ioConfig.getStartSequenceNumbers().getExclusivePartitions(),
+                getTaskLockType()
             )
         );
       }
@@ -444,7 +451,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               } else {
                 final TaskLock lock = toolbox.getTaskActionClient().submit(
                     new TimeChunkLockAcquireAction(
-                        TaskLockType.EXCLUSIVE,
+                        
TaskLocks.determineLockTypeForAppend(task.getContext()),
                         segmentId.getInterval(),
                         1000L
                     )
@@ -925,6 +932,11 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return TaskStatus.success(task.getId());
   }
 
+  private TaskLockType getTaskLockType()
+  {
+    return TaskLocks.determineLockTypeForAppend(task.getContext());
+  }
+
   private void checkPublishAndHandoffFailure() throws ExecutionException, 
InterruptedException
   {
     // Check if any publishFuture failed.
@@ -1541,6 +1553,40 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return setEndOffsets(sequences, finish);
   }
 
+  @POST
+  @Path("/pendingSegmentVersion")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response registerNewVersionOfPendingSegment(
+      PendingSegmentVersions pendingSegmentVersions,
+      // this field is only for internal purposes, shouldn't be usually set by 
users
+      @Context final HttpServletRequest req
+  )
+  {
+    authorizationCheck(req, Action.WRITE);
+    try {
+      ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
+          pendingSegmentVersions.getBaseSegment(),
+          pendingSegmentVersions.getNewVersion()
+      );
+      return Response.ok().build();
+    }
+    catch (DruidException e) {
+      return Response
+          .status(e.getStatusCode())
+          .entity(new ErrorResponse(e))
+          .build();
+    }
+    catch (Exception e) {
+      log.error(
+          e,
+          "Could not register new version[%s] of pending segment[%s]",
+          pendingSegmentVersions.getNewVersion(), 
pendingSegmentVersions.getBaseSegment()
+      );
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
+    }
+  }
+
   public Map<String, Object> doGetRowStats()
   {
     Map<String, Object> returnMap = new HashMap<>();
@@ -1712,7 +1758,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               sequenceNumbers,
               endOffsets,
               false,
-              exclusiveStartPartitions
+              exclusiveStartPartitions,
+              getTaskLockType()
           );
 
           log.info(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 161a36de2fd..b5a65e99462 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -25,8 +25,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.Committer;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
+import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -54,6 +58,7 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
   private final String sequenceName;
   private final Set<PartitionIdType> exclusiveStartPartitions;
   private final Set<PartitionIdType> assignments;
+  private final TaskLockType taskLockType;
   private final boolean sentinel;
   /**
    * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This 
lock is required because
@@ -73,7 +78,8 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
       @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> 
startOffsets,
       @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> 
endOffsets,
       @JsonProperty("checkpointed") boolean checkpointed,
-      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> 
exclusiveStartPartitions
+      @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> 
exclusiveStartPartitions,
+      @JsonProperty("taskLockType") @Nullable TaskLockType taskLockType
   )
   {
     Preconditions.checkNotNull(sequenceName);
@@ -86,6 +92,7 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
     this.assignments = new HashSet<>(startOffsets.keySet());
     this.checkpointed = checkpointed;
     this.sentinel = false;
+    this.taskLockType = taskLockType;
     this.exclusiveStartPartitions = exclusiveStartPartitions == null
                                     ? Collections.emptySet()
                                     : exclusiveStartPartitions;
@@ -139,6 +146,12 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
     }
   }
 
+  @JsonProperty
+  public TaskLockType getTaskLockType()
+  {
+    return taskLockType;
+  }
+
   @JsonProperty
   public boolean isSentinel()
   {
@@ -363,7 +376,7 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
         );
       }
 
-      final SegmentTransactionalInsertAction action;
+      final TaskAction<SegmentPublishResult> action;
 
       if (segmentsToPush.isEmpty()) {
         // If a task ingested no data but made progress reading through its 
assigned partitions,
@@ -395,19 +408,21 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
           );
         }
       } else if (useTransaction) {
-        action = SegmentTransactionalInsertAction.appendAction(
-            segmentsToPush,
-            runner.createDataSourceMetadata(
-                new SeekableStreamStartSequenceNumbers<>(
-                    finalPartitions.getStream(),
-                    getStartOffsets(),
-                    exclusiveStartPartitions
-                )
-            ),
-            runner.createDataSourceMetadata(finalPartitions)
+        final DataSourceMetadata startMetadata = 
runner.createDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>(
+                finalPartitions.getStream(),
+                getStartOffsets(),
+                exclusiveStartPartitions
+            )
         );
+        final DataSourceMetadata endMetadata = 
runner.createDataSourceMetadata(finalPartitions);
+        action = taskLockType == TaskLockType.APPEND
+                 ? 
SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, 
startMetadata, endMetadata)
+                 : 
SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, 
endMetadata);
       } else {
-        action = SegmentTransactionalInsertAction.appendAction(segmentsToPush, 
null, null);
+        action = taskLockType == TaskLockType.APPEND
+                 ? SegmentTransactionalAppendAction.forSegments(segmentsToPush)
+                 : 
SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
       }
 
       return toolbox.getTaskActionClient().submit(action);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2f6cb008b84..1d05169e3fb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -97,6 +97,7 @@ import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nonnull;
@@ -1092,6 +1093,25 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
   }
 
+  public void registerNewVersionOfPendingSegment(
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  )
+  {
+    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+      for (String taskId : taskGroup.taskIds()) {
+        taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
basePendingSegment, newSegmentVersion);
+      }
+    }
+    for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) 
{
+      for (TaskGroup taskGroup : taskGroupList) {
+        for (String taskId : taskGroup.taskIds()) {
+          taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
basePendingSegment, newSegmentVersion);
+        }
+      }
+    }
+  }
+
   public ReentrantLock getRecordSupplierLock()
   {
     return recordSupplierLock;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index b78efcbc346..69a8b6cc103 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
@@ -60,6 +61,11 @@ public class ActionsTestTask extends CommandQueueTask
     return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.REPLACE, 
interval));
   }
 
+  public Void releaseLock(Interval interval)
+  {
+    return runAction(new LockReleaseAction(interval));
+  }
+
   public TaskLock acquireAppendLockOn(Interval interval)
   {
     return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.APPEND, 
interval));
@@ -75,7 +81,7 @@ public class ActionsTestTask extends CommandQueueTask
   public SegmentPublishResult commitAppendSegments(DataSegment... segments)
   {
     return runAction(
-        SegmentTransactionalAppendAction.create(Sets.newHashSet(segments))
+        SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
     );
   }
 
@@ -97,6 +103,28 @@ public class ActionsTestTask extends CommandQueueTask
     );
   }
 
+  public SegmentIdWithShardSpec allocateSegmentForTimestamp(
+      DateTime timestamp,
+      Granularity preferredSegmentGranularity,
+      String sequenceName
+  )
+  {
+    return runAction(
+        new SegmentAllocateAction(
+            getDataSource(),
+            timestamp,
+            Granularities.SECOND,
+            preferredSegmentGranularity,
+            getId() + "__" + sequenceName,
+            null,
+            false,
+            NumberedPartialShardSpec.instance(),
+            LockGranularity.TIME_CHUNK,
+            TaskLockType.APPEND
+        )
+    );
+  }
+
   private <T> T runAction(TaskAction<T> action)
   {
     return execute(() -> client.submit(action));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 293503b1c72..22f21fb79b6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -66,11 +66,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -626,10 +629,10 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
 
     // Allocate an append segment for v1
     final ActionsTestTask appendTask1 = createAndStartTask();
-    appendTask1.acquireAppendLockOn(YEAR_23);
     final SegmentIdWithShardSpec pendingSegmentV11
         = appendTask1.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
-    Assert.assertEquals(segmentV10.getVersion(), 
pendingSegmentV11.getVersion());
+    Assert.assertEquals(v1, pendingSegmentV11.getVersion());
+    Assert.assertEquals(YEAR_23, pendingSegmentV11.getInterval());
 
     // Commit replace segment for v2
     final ActionsTestTask replaceTask2 = createAndStartTask();
@@ -771,6 +774,90 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11, 
segmentV13);
   }
 
+  @Test
+  public void testSegmentIsAllocatedAtLatestVersion()
+  {
+    final SegmentIdWithShardSpec pendingSegmentV01
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), 
Granularities.MONTH);
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval());
+
+    final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+    final DataSegment segmentV10 = createSegment(JAN_23, v1);
+    replaceTask.commitReplaceSegments(segmentV10);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV10);
+
+    final SegmentIdWithShardSpec pendingSegmentV12
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), 
Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), 
pendingSegmentV12.asSegmentId());
+    Assert.assertEquals(v1, pendingSegmentV12.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV12.getInterval());
+
+    replaceTask.releaseLock(JAN_23);
+    final ActionsTestTask replaceTask2 = createAndStartTask();
+    final String v2 = replaceTask2.acquireReplaceLockOn(JAN_23).getVersion();
+    final DataSegment segmentV20 = createSegment(JAN_23, v2);
+    replaceTask2.commitReplaceSegments(segmentV20);
+    verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV20);
+    verifyIntervalHasVisibleSegments(JAN_23, segmentV20);
+
+    final SegmentIdWithShardSpec pendingSegmentV23
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), 
Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), 
pendingSegmentV23.asSegmentId());
+    Assert.assertEquals(v2, pendingSegmentV23.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV23.getInterval());
+
+    // Commit the append segments
+    final DataSegment segmentV01 = asSegment(pendingSegmentV01);
+    final DataSegment segmentV12 = asSegment(pendingSegmentV12);
+    final DataSegment segmentV23 = asSegment(pendingSegmentV23);
+
+    Set<DataSegment> appendedSegments
+        = appendTask.commitAppendSegments(segmentV01, segmentV12, 
segmentV23).getSegments();
+    Assert.assertEquals(3 + 3, appendedSegments.size());
+
+    // Verify that the original append segments have been committed
+    Assert.assertTrue(appendedSegments.remove(segmentV01));
+    Assert.assertTrue(appendedSegments.remove(segmentV12));
+    Assert.assertTrue(appendedSegments.remove(segmentV23));
+
+    // Verify that segmentV01 has been upgraded to both v1 and v2
+    final DataSegment segmentV11 = findSegmentWith(v1, 
segmentV01.getLoadSpec(), appendedSegments);
+    Assert.assertNotNull(segmentV11);
+    final DataSegment segmentV21 = findSegmentWith(v2, 
segmentV01.getLoadSpec(), appendedSegments);
+    Assert.assertNotNull(segmentV21);
+
+    // Verify that segmentV12 has been upgraded to v2
+    final DataSegment segmentV22 = findSegmentWith(v2, 
segmentV12.getLoadSpec(), appendedSegments);
+    Assert.assertNotNull(segmentV22);
+
+    // Verify that segmentV23 is not downgraded to v1
+    final DataSegment segmentV13 = findSegmentWith(v1, 
segmentV23.getLoadSpec(), appendedSegments);
+    Assert.assertNull(segmentV13);
+
+    verifyIntervalHasUsedSegments(
+        YEAR_23,
+        segmentV01,
+        segmentV10, segmentV11, segmentV12,
+        segmentV20, segmentV21, segmentV22, segmentV23
+    );
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21, 
segmentV22, segmentV23);
+  }
+
+  @Nullable
+  private DataSegment findSegmentWith(String version, Map<String, Object> 
loadSpec, Set<DataSegment> segments)
+  {
+    for (DataSegment segment : segments) {
+      if (version.equals(segment.getVersion())
+          && Objects.equals(segment.getLoadSpec(), loadSpec)) {
+        return segment;
+      }
+    }
+
+    return null;
+  }
+
   private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
   {
     final SegmentId id = pendingSegment.asSegmentId();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 4c761a6f71b..01dfc0f5d67 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
@@ -43,6 +44,7 @@ import 
org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.segment.TestHelper;
@@ -1248,6 +1251,80 @@ public class TaskLockboxTest
     );
   }
 
+  @Test
+  public void testGetLockedIntervalsForHigherPriorityExclusiveLock()
+  {
+    final Task task = NoopTask.ofPriority(50);
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    tryTimeChunkLock(
+        TaskLockType.APPEND,
+        task,
+        Intervals.of("2017/2018")
+    );
+
+    LockFilterPolicy requestForExclusiveLowerPriorityLock = new 
LockFilterPolicy(
+        task.getDataSource(),
+        75,
+        null
+    );
+
+    Map<String, List<Interval>> conflictingIntervals =
+        
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+    Assert.assertTrue(conflictingIntervals.isEmpty());
+  }
+
+  @Test
+  public void testGetLockedIntervalsForLowerPriorityExclusiveLock()
+  {
+    final Task task = NoopTask.ofPriority(50);
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    tryTimeChunkLock(
+        TaskLockType.APPEND,
+        task,
+        Intervals.of("2017/2018")
+    );
+
+    LockFilterPolicy requestForExclusiveLowerPriorityLock = new 
LockFilterPolicy(
+        task.getDataSource(),
+        25,
+        null
+    );
+
+    Map<String, List<Interval>> conflictingIntervals =
+        
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+    Assert.assertEquals(1, conflictingIntervals.size());
+    Assert.assertEquals(
+        Collections.singletonList(Intervals.of("2017/2018")),
+        conflictingIntervals.get(task.getDataSource())
+    );
+  }
+
+  @Test
+  public void testGetLockedIntervalsForLowerPriorityReplaceLock()
+  {
+    final Task task = NoopTask.ofPriority(50);
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    tryTimeChunkLock(
+        TaskLockType.APPEND,
+        task,
+        Intervals.of("2017/2018")
+    );
+
+    LockFilterPolicy requestForExclusiveLowerPriorityLock = new 
LockFilterPolicy(
+        task.getDataSource(),
+        25,
+        ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
+    );
+
+    Map<String, List<Interval>> conflictingIntervals =
+        
lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock));
+    Assert.assertTrue(conflictingIntervals.isEmpty());
+  }
+
+
   @Test
   public void testExclusiveLockCompatibility()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
index aae07194bb9..fbe63ffe268 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
@@ -29,9 +29,8 @@ import org.apache.druid.segment.SegmentUtils;
 import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Rule;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -43,9 +42,6 @@ import java.util.Set;
 @RunWith(MockitoJUnitRunner.class)
 public class SequenceMetadataTest
 {
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   @Mock
   private SeekableStreamIndexTaskRunner mockSeekableStreamIndexTaskRunner;
 
@@ -59,7 +55,7 @@ public class SequenceMetadataTest
   private TaskToolbox mockTaskToolbox;
 
   @Test
-  public void 
testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty()
 throws Exception
+  public void 
testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty()
   {
     DataSegment dataSegment = DataSegment.builder()
                                          .dataSource("foo")
@@ -76,16 +72,21 @@ public class SequenceMetadataTest
         ImmutableMap.of(),
         ImmutableMap.of(),
         true,
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
-    TransactionalSegmentPublisher transactionalSegmentPublisher = 
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, 
mockTaskToolbox, true);
+    TransactionalSegmentPublisher transactionalSegmentPublisher
+        = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, 
mockTaskToolbox, true);
 
-    expectedException.expect(ISE.class);
-    expectedException.expectMessage(
-        "Stream ingestion task unexpectedly attempted to overwrite segments: " 
+ SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
+    ISE exception = Assert.assertThrows(
+        ISE.class,
+        () -> 
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, 
ImmutableSet.of(), null)
+    );
+    Assert.assertEquals(
+        "Stream ingestion task unexpectedly attempted to overwrite segments: "
+        + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment),
+        exception.getMessage()
     );
-
-    
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, 
ImmutableSet.of(), null);
   }
 
   @Test
@@ -109,7 +110,8 @@ public class SequenceMetadataTest
         ImmutableMap.of(),
         ImmutableMap.of(),
         true,
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     TransactionalSegmentPublisher transactionalSegmentPublisher = 
sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, 
mockTaskToolbox, false);
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 34d2e44552a..d1c72485011 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -113,7 +113,11 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public List<DataSegment> retrieveUnusedSegmentsForInterval(String 
dataSource, Interval interval, @Nullable Integer limit)
+  public List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer limit
+  )
   {
     synchronized (unusedSegments) {
       Stream<DataSegment> resultStream = unusedSegments.stream();
@@ -175,6 +179,17 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     return SegmentPublishResult.ok(commitSegments(appendSegments));
   }
 
+  @Override
+  public SegmentPublishResult commitAppendSegmentsAndMetadata(
+      Set<DataSegment> appendSegments,
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    return SegmentPublishResult.ok(commitSegments(appendSegments));
+  }
+
   @Override
   public SegmentPublishResult commitSegmentsAndMetadata(
       Set<DataSegment> segments,
@@ -222,6 +237,12 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     );
   }
 
+  @Override
+  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
+  {
+    return Collections.emptyMap();
+  }
+
   @Override
   public int deletePendingSegmentsCreatedInInterval(String dataSource, 
Interval deleteInterval)
   {
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
index bc3769b6236..7e249f72d0a 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.emitter.service;
 import com.fasterxml.jackson.annotation.JsonValue;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 
 /**
@@ -62,6 +63,18 @@ public class SegmentMetadataEvent implements Event
    */
   private final boolean isCompacted;
 
+  public static SegmentMetadataEvent create(DataSegment segment, DateTime 
eventTime)
+  {
+    return new SegmentMetadataEvent(
+        segment.getDataSource(),
+        eventTime,
+        segment.getInterval().getStart(),
+        segment.getInterval().getEnd(),
+        segment.getVersion(),
+        segment.getLastCompactionState() != null
+    );
+  }
+
   public SegmentMetadataEvent(
       String dataSource,
       DateTime createdTime,
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
index 83a4fcba7dc..a926b004c0a 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -21,6 +21,11 @@ package org.apache.druid.java.util.emitter.service;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,4 +56,30 @@ public class SegmentMetadataEventTest
         event.toMap()
     );
   }
+
+  @Test
+  public void testCreate()
+  {
+    final DataSegment segment = DataSegment.builder()
+                                           .dataSource("wiki")
+                                           .interval(Intervals.of("2023/2024"))
+                                           .shardSpec(new NumberedShardSpec(1, 
1))
+                                           .version("v1")
+                                           .size(100)
+                                           .build();
+    final DateTime eventTime = DateTimes.nowUtc();
+    SegmentMetadataEvent event = SegmentMetadataEvent.create(segment, 
eventTime);
+    Assert.assertEquals(
+        EventMap.builder()
+                .put(SegmentMetadataEvent.FEED, "segment_metadata")
+                .put(SegmentMetadataEvent.DATASOURCE, segment.getDataSource())
+                .put(SegmentMetadataEvent.CREATED_TIME, eventTime)
+                .put(SegmentMetadataEvent.START_TIME, 
segment.getInterval().getStart())
+                .put(SegmentMetadataEvent.END_TIME, 
segment.getInterval().getEnd())
+                .put(SegmentMetadataEvent.VERSION, segment.getVersion())
+                .put(SegmentMetadataEvent.IS_COMPACTED, false)
+                .build(),
+        event.toMap()
+    );
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 3cbabea78fa..2c2a6bc0f77 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -300,6 +300,21 @@ public interface IndexerMetadataStorageCoordinator
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
   );
 
+  /**
+   * Commits segments created by an APPEND task. This method also handles 
segment
+   * upgrade scenarios that may result from concurrent append and replace. Also
+   * commits start and end {@link DataSourceMetadata}.
+   *
+   * @see #commitAppendSegments
+   * @see #commitSegmentsAndMetadata
+   */
+  SegmentPublishResult commitAppendSegmentsAndMetadata(
+      Set<DataSegment> appendSegments,
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  );
+
   /**
    * Commits segments created by a REPLACE task. This method also handles the
    * segment upgrade scenarios that may result from concurrent append and 
replace.
@@ -319,6 +334,23 @@ public interface IndexerMetadataStorageCoordinator
       Set<ReplaceTaskLock> locksHeldByReplaceTask
   );
 
+  /**
+   * Creates and inserts new IDs for the pending segments hat overlap with the 
given
+   * replace segments being committed. The newly created pending segment IDs:
+   * <ul>
+   * <li>Have the same interval and version as that of an overlapping segment
+   * committed by the REPLACE task.</li>
+   * <li>Cannot be committed but are only used to serve realtime queries 
against
+   * those versions.</li>
+   * </ul>
+   *
+   * @param replaceSegments Segments being committed by a REPLACE task
+   * @return Map from originally allocated pending segment to its new upgraded 
ID.
+   */
+  Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+      Set<DataSegment> replaceSegments
+  );
+
   /**
    * Retrieves data source's metadata from the metadata store. Returns null if 
there is no metadata.
    */
@@ -405,5 +437,4 @@ public interface IndexerMetadataStorageCoordinator
    * @return DataSegment used segment corresponding to given id
    */
   DataSegment retrieveSegmentForId(String id, boolean includeUnused);
-
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 7eaac692f7c..226663c3233 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -105,6 +105,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private static final Logger log = new 
Logger(IndexerSQLMetadataStorageCoordinator.class);
   private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100;
 
+  private static final String UPGRADED_PENDING_SEGMENT_PREFIX = 
"upgraded_to_version__";
+
   private final ObjectMapper jsonMapper;
   private final MetadataStorageTablesConfig dbTables;
   private final SQLMetadataConnector connector;
@@ -237,44 +239,45 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store.
+   * search interval from the metadata store. Returns a Map from the
+   * pending segment ID to the sequence name.
    */
-  private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
+  private Map<SegmentIdWithShardSpec, String> 
getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
       final Interval interval
   ) throws IOException
   {
-    final Set<SegmentIdWithShardSpec> identifiers = new HashSet<>();
-
-    final ResultIterator<byte[]> dbSegments =
+    final ResultIterator<PendingSegmentsRecord> dbSegments =
         handle.createQuery(
             StringUtils.format(
                 // This query might fail if the year has a different number of 
digits
                 // See https://github.com/apache/druid/pull/11582 for a 
similar issue
                 // Using long for these timestamps instead of varchar would 
give correct time comparisons
-                "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND 
start < :end and %2$send%2$s > :start",
+                "SELECT sequence_name, payload FROM %1$s"
+                + " WHERE dataSource = :dataSource AND start < :end and 
%2$send%2$s > :start",
                 dbTables.getPendingSegmentsTable(), connector.getQuoteString()
             )
         )
               .bind("dataSource", dataSource)
               .bind("start", interval.getStart().toString())
               .bind("end", interval.getEnd().toString())
-              .map(ByteArrayMapper.FIRST)
+              .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
               .iterator();
 
+    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = 
new HashMap<>();
     while (dbSegments.hasNext()) {
-      final byte[] payload = dbSegments.next();
-      final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload, 
SegmentIdWithShardSpec.class);
+      PendingSegmentsRecord record = dbSegments.next();
+      final SegmentIdWithShardSpec identifier = 
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
 
       if (interval.overlaps(identifier.getInterval())) {
-        identifiers.add(identifier);
+        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
       }
     }
 
     dbSegments.close();
 
-    return identifiers;
+    return pendingSegmentToSequenceName;
   }
 
   private SegmentTimeline getTimelineForIntervalsWithHandle(
@@ -417,7 +420,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           (handle, transactionStatus) -> {
             final Set<DataSegment> segmentsToInsert = new 
HashSet<>(replaceSegments);
             segmentsToInsert.addAll(
-                getSegmentsToUpgradeOnReplace(handle, replaceSegments, 
locksHeldByReplaceTask)
+                createNewIdsOfAppendSegmentsAfterReplace(handle, 
replaceSegments, locksHeldByReplaceTask)
             );
             return SegmentPublishResult.ok(
                 insertSegments(handle, segmentsToInsert)
@@ -438,33 +441,28 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       final Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
   )
   {
-    verifySegmentsToCommit(appendSegments);
-
-    final String dataSource = appendSegments.iterator().next().getDataSource();
-    final Set<DataSegment> upgradedSegments = connector.retryTransaction(
-        (handle, transactionStatus)
-            -> getSegmentsToUpgradeOnAppend(handle, dataSource, 
appendSegments),
-        0,
-        SQLMetadataConnector.DEFAULT_MAX_TRIES
+    return commitAppendSegmentsAndMetadataInTransaction(
+        appendSegments,
+        appendSegmentToReplaceLock,
+        null,
+        null
     );
+  }
 
-    // Create entries for all required versions of the append segments
-    final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
-    allSegmentsToInsert.addAll(upgradedSegments);
-
-    try {
-      return connector.retryTransaction(
-          (handle, transactionStatus) -> {
-            insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
-            return SegmentPublishResult.ok(insertSegments(handle, 
allSegmentsToInsert));
-          },
-          3,
-          getSqlMetadataMaxRetry()
-      );
-    }
-    catch (CallbackFailedException e) {
-      return SegmentPublishResult.fail(e.getMessage());
-    }
+  @Override
+  public SegmentPublishResult commitAppendSegmentsAndMetadata(
+      Set<DataSegment> appendSegments,
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    return commitAppendSegmentsAndMetadataInTransaction(
+        appendSegments,
+        appendSegmentToReplaceLock,
+        startMetadata,
+        endMetadata
+    );
   }
 
   @Override
@@ -601,6 +599,125 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
   }
 
+  @Override
+  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+      Set<DataSegment> replaceSegments
+  )
+  {
+    if (replaceSegments.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Any replace interval has exactly one version of segments
+    final Map<Interval, DataSegment> replaceIntervalToMaxId = new HashMap<>();
+    for (DataSegment segment : replaceSegments) {
+      DataSegment committedMaxId = 
replaceIntervalToMaxId.get(segment.getInterval());
+      if (committedMaxId == null
+          || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+        replaceIntervalToMaxId.put(segment.getInterval(), segment);
+      }
+    }
+
+    final String datasource = 
replaceSegments.iterator().next().getDataSource();
+    return connector.retryWithHandle(
+        handle -> upgradePendingSegments(handle, datasource, 
replaceIntervalToMaxId)
+    );
+  }
+
+  /**
+   * Creates and inserts new IDs for the pending segments contained in each 
replace
+   * interval. The newly created pending segment IDs
+   * <ul>
+   * <li>Have the same interval and version as that of an overlapping segment
+   * committed by the REPLACE task.</li>
+   * <li>Cannot be committed but are only used to serve realtime queries 
against
+   * those versions.</li>
+   * </ul>
+   *
+   * @return Map from original pending segment to the new upgraded ID.
+   */
+  private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegments(
+      Handle handle,
+      String datasource,
+      Map<Interval, DataSegment> replaceIntervalToMaxId
+  ) throws IOException
+  {
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> 
newPendingSegmentVersions = new HashMap<>();
+    final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
pendingSegmentToNewId = new HashMap<>();
+
+    for (Map.Entry<Interval, DataSegment> entry : 
replaceIntervalToMaxId.entrySet()) {
+      final Interval replaceInterval = entry.getKey();
+      final DataSegment maxSegmentId = entry.getValue();
+      final String replaceVersion = maxSegmentId.getVersion();
+
+      final int numCorePartitions = 
maxSegmentId.getShardSpec().getNumCorePartitions();
+      int currentPartitionNumber = 
maxSegmentId.getShardSpec().getPartitionNum();
+
+      final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval);
+
+      for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
+          : overlappingPendingSegments.entrySet()) {
+        final SegmentIdWithShardSpec pendingSegmentId = 
overlappingPendingSegment.getKey();
+        final String pendingSegmentSequence = 
overlappingPendingSegment.getValue();
+        if (shouldUpgradePendingSegment(pendingSegmentId, 
pendingSegmentSequence, replaceInterval, replaceVersion)) {
+          // Ensure unique sequence_name_prev_id_sha1 by setting
+          // sequence_prev_id -> pendingSegmentId
+          // sequence_name -> prefix + replaceVersion
+          SegmentIdWithShardSpec newId = new SegmentIdWithShardSpec(
+              datasource,
+              replaceInterval,
+              replaceVersion,
+              new NumberedShardSpec(++currentPartitionNumber, 
numCorePartitions)
+          );
+          newPendingSegmentVersions.put(
+              new SegmentCreateRequest(
+                  UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion,
+                  pendingSegmentId.toString(),
+                  replaceVersion,
+                  NumberedPartialShardSpec.instance()
+              ),
+              newId
+          );
+          pendingSegmentToNewId.put(pendingSegmentId, newId);
+        }
+      }
+    }
+
+    // Do not skip lineage check so that the sequence_name_prev_id_sha1
+    // includes hash of both sequence_name and prev_segment_id
+    int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore(
+        handle,
+        newPendingSegmentVersions,
+        datasource,
+        false
+    );
+    log.info(
+        "Inserted total [%d] new versions for [%d] pending segments.",
+        numInsertedPendingSegments, newPendingSegmentVersions.size()
+    );
+
+    return pendingSegmentToNewId;
+  }
+
+  private boolean shouldUpgradePendingSegment(
+      SegmentIdWithShardSpec pendingSegmentId,
+      String pendingSegmentSequenceName,
+      Interval replaceInterval,
+      String replaceVersion
+  )
+  {
+    if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) {
+      return false;
+    } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) {
+      return false;
+    } else {
+      // Do not upgrade already upgraded pending segment
+      return pendingSegmentSequenceName == null
+             || 
!pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX);
+    }
+  }
+
   @Nullable
   private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
       final Handle handle,
@@ -721,7 +838,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         handle,
         createdSegments,
         dataSource,
-        interval,
         skipSegmentLineageCheck
     );
 
@@ -971,11 +1087,74 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
   }
 
-  private void insertPendingSegmentsIntoMetastore(
+  private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
+      Set<DataSegment> appendSegments,
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      @Nullable DataSourceMetadata startMetadata,
+      @Nullable DataSourceMetadata endMetadata
+  )
+  {
+    verifySegmentsToCommit(appendSegments);
+    if ((startMetadata == null && endMetadata != null)
+        || (startMetadata != null && endMetadata == null)) {
+      throw new IllegalArgumentException("start/end metadata pair must be 
either null or non-null");
+    }
+
+    final String dataSource = appendSegments.iterator().next().getDataSource();
+    final Set<DataSegment> segmentIdsForNewVersions = 
connector.retryTransaction(
+        (handle, transactionStatus)
+            -> createNewIdsForAppendSegments(handle, dataSource, 
appendSegments),
+        0,
+        SQLMetadataConnector.DEFAULT_MAX_TRIES
+    );
+
+    // Create entries for all required versions of the append segments
+    final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
+    allSegmentsToInsert.addAll(segmentIdsForNewVersions);
+
+    final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false);
+    try {
+      return connector.retryTransaction(
+          (handle, transactionStatus) -> {
+            metadataNotUpdated.set(false);
+
+            if (startMetadata != null) {
+              final DataStoreMetadataUpdateResult metadataUpdateResult
+                  = updateDataSourceMetadataWithHandle(handle, dataSource, 
startMetadata, endMetadata);
+
+              if (metadataUpdateResult.isFailed()) {
+                transactionStatus.setRollbackOnly();
+                metadataNotUpdated.set(true);
+
+                if (metadataUpdateResult.canRetry()) {
+                  throw new 
RetryTransactionException(metadataUpdateResult.getErrorMsg());
+                } else {
+                  throw new 
RuntimeException(metadataUpdateResult.getErrorMsg());
+                }
+              }
+            }
+
+            insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+            return SegmentPublishResult.ok(insertSegments(handle, 
allSegmentsToInsert));
+          },
+          3,
+          getSqlMetadataMaxRetry()
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (metadataNotUpdated.get()) {
+        // Return failed result if metadata was definitely not updated
+        return SegmentPublishResult.fail(e.getMessage());
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  private int insertPendingSegmentsIntoMetastore(
       Handle handle,
       Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
       String dataSource,
-      Interval interval,
       boolean skipSegmentLineageCheck
   ) throws JsonProcessingException
   {
@@ -996,6 +1175,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : 
segmentIdToRequest.entrySet()) {
       final SegmentCreateRequest request = entry.getValue();
       final SegmentIdWithShardSpec segmentId = entry.getKey();
+      final Interval interval = segmentId.getInterval();
+
       insertBatch.add()
                  .bind("id", segmentId.toString())
                  .bind("dataSource", dataSource)
@@ -1010,7 +1191,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                  )
                  .bind("payload", jsonMapper.writeValueAsBytes(segmentId));
     }
-    insertBatch.execute();
+    int[] updated = insertBatch.execute();
+    return Arrays.stream(updated).sum();
   }
 
   private void insertPendingSegmentIntoMetastore(
@@ -1046,15 +1228,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   /**
-   * Allocates and returns any extra versions that need to be committed for the
-   * given append segments.
-   * <p>
-   * This is typically needed when a REPLACE task started and finished after
-   * these append segments had already been allocated. As such,
-   * there would be some used segments in the DB with versions higher than 
these
-   * append segments.
+   * Creates new IDs for the given append segments if a REPLACE task started 
and
+   * finished after these append segments had already been allocated. The newly
+   * created IDs belong to the same interval and version as the segments 
committed
+   * by the REPLACE task.
    */
-  private Set<DataSegment> getSegmentsToUpgradeOnAppend(
+  private Set<DataSegment> createNewIdsForAppendSegments(
       Handle handle,
       String dataSource,
       Set<DataSegment> segmentsToAppend
@@ -1079,17 +1258,17 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         Segments.INCLUDING_OVERSHADOWED
     );
 
-    final Map<String, Set<Interval>> committedVersionToIntervals = new 
HashMap<>();
-    final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new 
HashMap<>();
+    final Map<String, Set<Interval>> overlappingVersionToIntervals = new 
HashMap<>();
+    final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new 
HashMap<>();
     for (DataSegment segment : overlappingSegments) {
-      committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> 
new HashSet<>())
+      overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> 
new HashSet<>())
                                  .add(segment.getInterval());
-      committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> 
new HashSet<>())
+      overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i 
-> new HashSet<>())
                                  .add(segment);
     }
 
     final Set<DataSegment> upgradedSegments = new HashSet<>();
-    for (Map.Entry<String, Set<Interval>> entry : 
committedVersionToIntervals.entrySet()) {
+    for (Map.Entry<String, Set<Interval>> entry : 
overlappingVersionToIntervals.entrySet()) {
       final String upgradeVersion = entry.getKey();
       Map<Interval, Set<DataSegment>> segmentsToUpgrade = 
getSegmentsWithVersionLowerThan(
           upgradeVersion,
@@ -1097,12 +1276,18 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           appendVersionToSegments
       );
       for (Map.Entry<Interval, Set<DataSegment>> upgradeEntry : 
segmentsToUpgrade.entrySet()) {
-        Set<DataSegment> segmentsUpgradedToVersion = upgradeSegmentsToVersion(
+        final Interval upgradeInterval = upgradeEntry.getKey();
+        final Set<DataSegment> segmentsAlreadyOnVersion
+            = overlappingIntervalToSegments.getOrDefault(upgradeInterval, 
Collections.emptySet())
+                                           .stream()
+                                           .filter(s -> 
s.getVersion().equals(upgradeVersion))
+                                           .collect(Collectors.toSet());
+        Set<DataSegment> segmentsUpgradedToVersion = 
createNewIdsForAppendSegmentsWithVersion(
             handle,
             upgradeVersion,
-            upgradeEntry.getKey(),
+            upgradeInterval,
             upgradeEntry.getValue(),
-            committedIntervalToSegments
+            segmentsAlreadyOnVersion
         );
         log.info("Upgraded [%d] segments to version[%s].", 
segmentsUpgradedToVersion.size(), upgradeVersion);
         upgradedSegments.addAll(segmentsUpgradedToVersion);
@@ -1150,23 +1335,20 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   /**
-   * Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
-   * to the given {@code upgradeVersion}.
+   * Computes new segment IDs that belong to the upgradeInterval and 
upgradeVersion.
+   *
+   * @param committedSegments Segments that already exist in the 
upgradeInterval
+   *                          at upgradeVersion.
    */
-  private Set<DataSegment> upgradeSegmentsToVersion(
+  private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
       Handle handle,
       String upgradeVersion,
-      Interval interval,
+      Interval upgradeInterval,
       Set<DataSegment> segmentsToUpgrade,
-      Map<Interval, Set<DataSegment>> committedSegmentsByInterval
+      Set<DataSegment> committedSegments
   ) throws IOException
   {
-    final Set<DataSegment> committedSegments
-        = committedSegmentsByInterval.getOrDefault(interval, 
Collections.emptySet())
-                                     .stream()
-                                     .filter(s -> 
s.getVersion().equals(upgradeVersion))
-                                     .collect(Collectors.toSet());
-
+    // Find the committed segments with the higest partition number
     SegmentIdWithShardSpec committedMaxId = null;
     for (DataSegment committedSegment : committedSegments) {
       if (committedMaxId == null
@@ -1175,14 +1357,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       }
     }
 
-    // Get pending segments for the new version, if any
+    // Get pending segments for the new version to determine the next 
partition number to allocate
     final String dataSource = 
segmentsToUpgrade.iterator().next().getDataSource();
-    final Set<SegmentIdWithShardSpec> pendingSegments
-        = getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval);
+    final Set<SegmentIdWithShardSpec> pendingSegmentIds
+        = getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
upgradeInterval).keySet();
+    final Set<SegmentIdWithShardSpec> allAllocatedIds = new 
HashSet<>(pendingSegmentIds);
 
-    // Determine new IDs for each append segment by taking into account both
-    // committed and pending segments for this version
-    final Set<DataSegment> upgradedSegments = new HashSet<>();
+    // Create new IDs for each append segment
+    final Set<DataSegment> newSegmentIds = new HashSet<>();
     for (DataSegment segment : segmentsToUpgrade) {
       SegmentCreateRequest request = new SegmentCreateRequest(
           segment.getId() + "__" + upgradeVersion,
@@ -1190,19 +1372,21 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           upgradeVersion,
           NumberedPartialShardSpec.instance()
       );
-      // allocate new segment id
+
+      // Create new segment ID based on committed segments, allocated pending 
segments
+      // and new IDs created so far in this method
       final SegmentIdWithShardSpec newId = createNewSegment(
           request,
           dataSource,
-          interval,
+          upgradeInterval,
           upgradeVersion,
           committedMaxId,
-          pendingSegments
+          allAllocatedIds
       );
 
-      // Add to set of pending segments so that shard specs are computed 
taking the new id into account
-      pendingSegments.add(newId);
-      upgradedSegments.add(
+      // Update the set so that subsequent segment IDs use a higher partition 
number
+      allAllocatedIds.add(newId);
+      newSegmentIds.add(
           DataSegment.builder(segment)
                      .interval(newId.getInterval())
                      .version(newId.getVersion())
@@ -1211,7 +1395,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       );
     }
 
-    return upgradedSegments;
+    return newSegmentIds;
   }
 
   private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
@@ -1278,7 +1462,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
     final Set<SegmentIdWithShardSpec> pendingSegments =
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
+        new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, 
dataSource, interval).keySet());
 
     final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = 
new HashMap<>();
     final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new 
HashMap<>();
@@ -1328,23 +1512,24 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     final PartialShardSpec partialShardSpec = request.getPartialShardSpec();
     final String existingVersion = request.getVersion();
+    final Set<SegmentIdWithShardSpec> mutablePendingSegments = new 
HashSet<>(pendingSegments);
 
     // Include the committedMaxId while computing the overallMaxId
     if (committedMaxId != null) {
-      pendingSegments.add(committedMaxId);
+      mutablePendingSegments.add(committedMaxId);
     }
 
     // If there is an existing chunk, find the max id with the same version as 
the existing chunk.
     // There may still be a pending segment with a higher version (but no 
corresponding used segments)
     // which may generate a clash with an existing segment once the new id is 
generated
     final SegmentIdWithShardSpec overallMaxId =
-        pendingSegments.stream()
-                       .filter(id -> 
id.getShardSpec().sharePartitionSpace(partialShardSpec))
-                       .filter(id -> versionOfExistingChunk == null
-                                     || 
id.getVersion().equals(versionOfExistingChunk))
-                       
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
-                                      .thenComparing(id -> 
id.getShardSpec().getPartitionNum()))
-                       .orElse(null);
+        mutablePendingSegments.stream()
+                              .filter(id -> 
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+                              .filter(id -> versionOfExistingChunk == null
+                                            || 
id.getVersion().equals(versionOfExistingChunk))
+                              
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
+                                             .thenComparing(id -> 
id.getShardSpec().getPartitionNum()))
+                              .orElse(null);
 
     // Determine the version of the new segment
     final String newSegmentVersion;
@@ -1484,10 +1669,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       // across all shard specs (published + pending).
       // A pending segment having a higher partitionId must also be considered
       // to avoid clashes when inserting the pending segment created here.
-      final Set<SegmentIdWithShardSpec> pendings = 
getPendingSegmentsForIntervalWithHandle(
-          handle,
-          dataSource,
-          interval
+      final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
+          getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval).keySet()
       );
       if (committedMaxId != null) {
         pendings.add(committedMaxId);
@@ -1666,7 +1849,10 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return toInsertSegments;
   }
 
-  private Set<DataSegment> getSegmentsToUpgradeOnReplace(
+  /**
+   * Creates new versions of segments appended while a REPLACE task was in 
progress.
+   */
+  private Set<DataSegment> createNewIdsOfAppendSegmentsAfterReplace(
       final Handle handle,
       final Set<DataSegment> replaceSegments,
       final Set<ReplaceTaskLock> locksHeldByReplaceTask
diff --git 
a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java 
b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
new file mode 100644
index 00000000000..88ab4673aa8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specifies a policy to filter active locks held by a datasource
+ */
+public class LockFilterPolicy
+{
+  private final String datasource;
+  private final int priority;
+  private final Map<String, Object> context;
+
+  @JsonCreator
+  public LockFilterPolicy(
+      @JsonProperty("datasource") String datasource,
+      @JsonProperty("priority") int priority,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    this.datasource = datasource;
+    this.priority = priority;
+    this.context = context == null ? Collections.emptyMap() : context;
+  }
+
+  @JsonProperty
+  public String getDatasource()
+  {
+    return datasource;
+  }
+
+  @JsonProperty
+  public int getPriority()
+  {
+    return priority;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getContext()
+  {
+    return context;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LockFilterPolicy that = (LockFilterPolicy) o;
+    return Objects.equals(datasource, that.datasource)
+           && priority == that.priority
+           && Objects.equals(context, that.context);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(datasource, priority, context);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index b4391dc0a8e..6a8e515b327 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.joda.time.Interval;
 
@@ -185,15 +186,15 @@ public interface OverlordClient
   ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses();
 
   /**
-   * Returns a list of intervals locked by higher priority tasks for each 
datasource.
+   * Returns a list of intervals locked by higher priority conflicting lock 
types
    *
-   * @param minTaskPriority Minimum task priority for each datasource. Only 
the intervals that are locked by tasks with
-   *                        equal or higher priority than this are returned.
-   *
-   * @return Map from dtasource name to list of intervals locked by tasks that 
have priority greater than or equal to
-   * the {@code minTaskPriority} for that datasource.
+   * @param lockFilterPolicies List of all filters for different datasources
+   * @return Map from datasource name to list of intervals locked by tasks 
that have a conflicting lock type with
+   * priority greater than or equal to the {@code minTaskPriority} for that 
datasource.
    */
-  ListenableFuture<Map<String, List<Interval>>> 
findLockedIntervals(Map<String, Integer> minTaskPriority);
+  ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+      List<LockFilterPolicy> lockFilterPolicies
+  );
 
   /**
    * Deletes pending segment records from the metadata store for a particular 
datasource. Records with
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 306c909e404..d7fab4b75fa 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.java.util.common.parsers.CloseableIterator;
 import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
 import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.IgnoreHttpResponseHandler;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
@@ -188,14 +189,16 @@ public class OverlordClientImpl implements OverlordClient
   }
 
   @Override
-  public ListenableFuture<Map<String, List<Interval>>> 
findLockedIntervals(Map<String, Integer> minTaskPriority)
+  public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+      List<LockFilterPolicy> lockFilterPolicies
+  )
   {
-    final String path = "/druid/indexer/v1/lockedIntervals";
+    final String path = "/druid/indexer/v1/lockedIntervals/v2";
 
     return FutureUtils.transform(
         client.asyncRequest(
             new RequestBuilder(HttpMethod.POST, path)
-                .jsonContent(jsonMapper, minTaskPriority),
+                .jsonContent(jsonMapper, lockFilterPolicies),
             new BytesFullResponseHandler()
         ),
         holder -> {
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 46315dbc0de..81cd4db2556 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -70,6 +70,8 @@ import org.joda.time.Interval;
 
 import java.io.Closeable;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
@@ -92,6 +94,8 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
+  private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> 
newIdToBasePendingSegment
+      = new ConcurrentHashMap<>();
 
   public SinkQuerySegmentWalker(
       String dataSource,
@@ -182,7 +186,8 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
 
     Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
         specs,
-        descriptor -> {
+        newDescriptor -> {
+          final SegmentDescriptor descriptor = 
newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
           final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
               descriptor.getInterval(),
               descriptor.getVersion(),
@@ -297,6 +302,17 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     );
   }
 
+  public void registerNewVersionOfPendingSegment(
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  )
+  {
+    newIdToBasePendingSegment.put(
+        newSegmentVersion.asSegmentId().toDescriptor(),
+        basePendingSegment.asSegmentId().toDescriptor()
+    );
+  }
+
   @VisibleForTesting
   String getDataSource()
   {
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index e95852bfddb..f21f67ed504 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -72,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;
 
@@ -86,6 +87,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -153,6 +155,9 @@ public class StreamAppenderator implements Appenderator
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
+  private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
+      baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+
   private volatile ListeningExecutorService persistExecutor = null;
   private volatile ListeningExecutorService pushExecutor = null;
   // use intermediate executor so that deadlock conditions can be prevented
@@ -998,7 +1003,7 @@ public class StreamAppenderator implements Appenderator
     log.debug("Shutting down immediately...");
     for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
       try {
-        segmentAnnouncer.unannounceSegment(entry.getValue().getSegment());
+        unannounceAllVersionsOfSegment(entry.getValue().getSegment());
       }
       catch (Exception e) {
         log.makeAlert(e, "Failed to unannounce segment[%s]", 
schema.getDataSource())
@@ -1026,6 +1031,66 @@ public class StreamAppenderator implements Appenderator
     }
   }
 
+  /**
+   * Unannounces the given base segment and all its upgraded versions.
+   */
+  private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws 
IOException
+  {
+    segmentAnnouncer.unannounceSegment(baseSegment);
+
+    final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
+        = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
+    if (upgradedVersionsOfSegment == null || 
upgradedVersionsOfSegment.isEmpty()) {
+      return;
+    }
+
+    for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
+      final DataSegment newSegment = new DataSegment(
+          newId.getDataSource(),
+          newId.getInterval(),
+          newId.getVersion(),
+          baseSegment.getLoadSpec(),
+          baseSegment.getDimensions(),
+          baseSegment.getMetrics(),
+          newId.getShardSpec(),
+          baseSegment.getBinaryVersion(),
+          baseSegment.getSize()
+      );
+      segmentAnnouncer.unannounceSegment(newSegment);
+    }
+  }
+
+  public void registerNewVersionOfPendingSegment(
+      SegmentIdWithShardSpec basePendingSegment,
+      SegmentIdWithShardSpec newSegmentVersion
+  ) throws IOException
+  {
+    if (!sinks.containsKey(basePendingSegment) || 
droppingSinks.contains(basePendingSegment)) {
+      return;
+    }
+
+    // Update query mapping with SinkQuerySegmentWalker
+    ((SinkQuerySegmentWalker) 
texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, 
newSegmentVersion);
+
+    // Announce segments
+    final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+
+    final DataSegment newSegment = new DataSegment(
+        newSegmentVersion.getDataSource(),
+        newSegmentVersion.getInterval(),
+        newSegmentVersion.getVersion(),
+        baseSegment.getLoadSpec(),
+        baseSegment.getDimensions(),
+        baseSegment.getMetrics(),
+        newSegmentVersion.getShardSpec(),
+        baseSegment.getBinaryVersion(),
+        baseSegment.getSize()
+    );
+    segmentAnnouncer.announceSegment(newSegment);
+    
baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), 
id -> new HashSet<>())
+                                 .add(newSegmentVersion);
+  }
+
   private void lockBasePersistDirectory()
   {
     if (basePersistDirLock == null) {
@@ -1327,7 +1392,7 @@ public class StreamAppenderator implements Appenderator
 
             // Unannounce the segment.
             try {
-              segmentAnnouncer.unannounceSegment(sink.getSegment());
+              unannounceAllVersionsOfSegment(sink.getSegment());
             }
             catch (Exception e) {
               log.makeAlert(e, "Failed to unannounce segment[%s]", 
schema.getDataSource())
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 0c08da7c8dd..ab92f180149 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -175,7 +176,7 @@ public class CompactSegments implements 
CoordinatorCustomDuty
     // Skip all the intervals locked by higher priority tasks for each 
datasource
     // This must be done after the invalid compaction tasks are cancelled
     // in the loop above so that their intervals are not considered locked
-    getLockedIntervalsToSkip(compactionConfigList).forEach(
+    getLockedIntervals(compactionConfigList).forEach(
         (dataSource, intervals) ->
             intervalsToSkipCompaction
                 .computeIfAbsent(dataSource, ds -> new ArrayList<>())
@@ -247,6 +248,7 @@ public class CompactSegments implements 
CoordinatorCustomDuty
 
   /**
    * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
+   * However, when using a REPLACE lock for compaction, intervals locked with 
any APPEND lock will not be returned
    * Since compaction tasks submitted for these Intervals would have to wait 
anyway,
    * we skip these Intervals until the next compaction run.
    * <p>
@@ -254,25 +256,21 @@ public class CompactSegments implements 
CoordinatorCustomDuty
    * though they lock only a Segment and not the entire Interval. Thus,
    * a compaction task will not be submitted for an Interval if
    * <ul>
-   *   <li>either the whole Interval is locked by a higher priority Task</li>
+   *   <li>either the whole Interval is locked by a higher priority Task with 
an incompatible lock type</li>
    *   <li>or there is atleast one Segment in the Interval that is locked by a
    *   higher priority Task</li>
    * </ul>
    */
-  private Map<String, List<Interval>> getLockedIntervalsToSkip(
+  private Map<String, List<Interval>> getLockedIntervals(
       List<DataSourceCompactionConfig> compactionConfigs
   )
   {
-    final Map<String, Integer> minTaskPriority = compactionConfigs
+    final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
         .stream()
-        .collect(
-            Collectors.toMap(
-                DataSourceCompactionConfig::getDataSource,
-                DataSourceCompactionConfig::getTaskPriority
-            )
-        );
+        .map(config -> new LockFilterPolicy(config.getDataSource(), 
config.getTaskPriority(), config.getTaskContext()))
+        .collect(Collectors.toList());
     final Map<String, List<Interval>> datasourceToLockedIntervals =
-        new 
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(minTaskPriority),
 true));
+        new 
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies),
 true));
     LOG.debug(
         "Skipping the following intervals for Compaction as they are currently 
locked: %s",
         datasourceToLockedIntervals
diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 1e146736da1..301d9631b7d 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -61,6 +61,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -870,29 +871,37 @@ public class DataSourcesResource
       final Interval theInterval = Intervals.of(interval);
       final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, 
version, partitionNumber);
       final DateTime now = DateTimes.nowUtc();
-      // dropped means a segment will never be handed off, i.e it completed 
hand off
-      // init to true, reset to false only if this segment can be loaded by 
rules
-      boolean dropped = true;
+
+      // A segment that is not eligible for load will never be handed off
+      boolean notEligibleForLoad = true;
       for (Rule rule : rules) {
         if (rule.appliesTo(theInterval, now)) {
           if (rule instanceof LoadRule) {
-            dropped = false;
+            notEligibleForLoad = false;
           }
           break;
         }
       }
-      if (dropped) {
+      if (notEligibleForLoad) {
         return Response.ok(true).build();
       }
 
-      TimelineLookup<String, SegmentLoadInfo> timeline = 
serverInventoryView.getTimeline(
+      VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = 
serverInventoryView.getTimeline(
           new TableDataSource(dataSourceName)
       );
       if (timeline == null) {
-        log.debug("No timeline found for datasource[%s]", dataSourceName);
+        log.error("No timeline found for datasource[%s]", dataSourceName);
         return Response.ok(false).build();
       }
 
+      // A segment with version lower than that of the latest chunk might 
never get handed off
+      // If there are multiple versions of this segment (due to a concurrent 
replace task),
+      // only the latest version would get handed off
+      List<TimelineObjectHolder<String, SegmentLoadInfo>> timelineObjects = 
timeline.lookup(Intervals.of(interval));
+      if (!timelineObjects.isEmpty() && 
timelineObjects.get(0).getVersion().compareTo(version) > 0) {
+        return Response.ok(true).build();
+      }
+
       Iterable<ImmutableSegmentLoadInfo> servedSegmentsInInterval =
           prepareServedSegmentsInInterval(timeline, theInterval);
       if (isSegmentLoaded(servedSegmentsInInterval, descriptor)) {
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java 
b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
index 6882c6b762d..42ca59ffee7 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java
@@ -24,6 +24,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.joda.time.Interval;
@@ -95,7 +96,9 @@ public class NoopOverlordClient implements OverlordClient
   }
 
   @Override
-  public ListenableFuture<Map<String, List<Interval>>> 
findLockedIntervals(Map<String, Integer> minTaskPriority)
+  public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+      List<LockFilterPolicy> lockFilterPolicies
+  )
   {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
 
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 5b9a88d5841..7ba5916a771 100644
--- 
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ 
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.MockServiceClient;
 import org.apache.druid.rpc.RequestBuilder;
@@ -219,13 +220,15 @@ public class OverlordClientImplTest
   @Test
   public void test_findLockedIntervals() throws Exception
   {
-    final Map<String, Integer> priorityMap = ImmutableMap.of("foo", 3);
     final Map<String, List<Interval>> lockMap =
         ImmutableMap.of("foo", 
Collections.singletonList(Intervals.of("2000/2001")));
+    final List<LockFilterPolicy> requests = ImmutableList.of(
+        new LockFilterPolicy("foo", 3, null)
+    );
 
     serviceClient.expectAndRespond(
-        new RequestBuilder(HttpMethod.POST, 
"/druid/indexer/v1/lockedIntervals")
-            .jsonContent(jsonMapper, priorityMap),
+        new RequestBuilder(HttpMethod.POST, 
"/druid/indexer/v1/lockedIntervals/v2")
+            .jsonContent(jsonMapper, requests),
         HttpResponseStatus.OK,
         ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
         jsonMapper.writeValueAsBytes(lockMap)
@@ -233,18 +236,20 @@ public class OverlordClientImplTest
 
     Assert.assertEquals(
         lockMap,
-        overlordClient.findLockedIntervals(priorityMap).get()
+        overlordClient.findLockedIntervals(requests).get()
     );
   }
 
   @Test
   public void test_findLockedIntervals_nullReturn() throws Exception
   {
-    final Map<String, Integer> priorityMap = ImmutableMap.of("foo", 3);
+    final List<LockFilterPolicy> requests = ImmutableList.of(
+        new LockFilterPolicy("foo", 3, null)
+    );
 
     serviceClient.expectAndRespond(
-        new RequestBuilder(HttpMethod.POST, 
"/druid/indexer/v1/lockedIntervals")
-            .jsonContent(jsonMapper, priorityMap),
+        new RequestBuilder(HttpMethod.POST, 
"/druid/indexer/v1/lockedIntervals/v2")
+            .jsonContent(jsonMapper, requests),
         HttpResponseStatus.OK,
         ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
         jsonMapper.writeValueAsBytes(null)
@@ -252,7 +257,7 @@ public class OverlordClientImplTest
 
     Assert.assertEquals(
         Collections.emptyMap(),
-        overlordClient.findLockedIntervals(priorityMap).get()
+        overlordClient.findLockedIntervals(requests).get()
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2926dbd6d70..43b1c50c969 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -58,6 +58,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -1994,8 +1995,11 @@ public class CompactSegmentsTest
       return Futures.immediateFuture(null);
     }
 
+
     @Override
-    public ListenableFuture<Map<String, List<Interval>>> 
findLockedIntervals(Map<String, Integer> minTaskPriority)
+    public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
+        List<LockFilterPolicy> lockFilterPolicies
+    )
     {
       return Futures.immediateFuture(lockedIntervals);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to