This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new b18d836624f [Backport] Check for handoff of upgraded segments (#16162) (#16344)
b18d836624f is described below

commit b18d836624f6925453cbd77200801d8d0c299269
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Apr 30 15:09:17 2024 +0530

    [Backport] Check for handoff of upgraded segments (#16162) (#16344)
    
    Changes:
    1) Check for handoff of upgraded realtime segments.
    2) Drop sink only when all associated realtime segments have been abandoned.
    3) Delete pending segments upon commit to prevent unnecessary upgrades and
    partition space exhaustion when a concurrent replace happens. This also prevents
    potential data duplication.
    4) Register pending segment upgrade only on those tasks to which the segment is associated.
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../MaterializedViewSupervisor.java                |   6 -
 .../MaterializedViewSupervisorSpecTest.java        |   2 -
 .../apache/druid/msq/indexing/MSQWorkerTask.java   |  10 +-
 .../druid/msq/indexing/MSQWorkerTaskTest.java      |   8 -
 .../actions/SegmentTransactionalReplaceAction.java |  51 ++----
 .../task/batch/parallel/SinglePhaseSubTask.java    |   9 +-
 .../druid/indexing/overlord/TaskLockbox.java       |   2 +-
 .../overlord/supervisor/SupervisorManager.java     |  21 ++-
 .../seekablestream/PendingSegmentVersions.java     |  56 ------
 .../SeekableStreamIndexTaskClient.java             |  11 +-
 .../SeekableStreamIndexTaskClientAsyncImpl.java    |   7 +-
 .../SeekableStreamIndexTaskRunner.java             |  14 +-
 .../supervisor/SeekableStreamSupervisor.java       |  57 +++----
 .../common/task/concurrent/ActionsTestTask.java    |   6 +-
 .../ConcurrentReplaceAndStreamingAppendTest.java   |  63 ++-----
 .../druid/indexing/overlord/TaskLockboxTest.java   |   4 +-
 .../overlord/supervisor/SupervisorManagerTest.java |  51 ++++++
 .../SeekableStreamSupervisorStateTest.java         |  64 +++++--
 .../TestIndexerMetadataStorageCoordinator.java     |   6 +-
 .../IndexerMetadataStorageCoordinator.java         |   6 +-
 .../indexing/overlord/SegmentPublishResult.java    |  31 ++++
 .../overlord/supervisor/NoopSupervisorSpec.java    |   7 -
 .../indexing/overlord/supervisor/Supervisor.java   |   6 -
 .../IndexerSQLMetadataStorageCoordinator.java      |  78 +++++++--
 .../druid/metadata/PendingSegmentRecord.java       |  18 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |   2 +-
 .../appenderator/BaseAppenderatorDriver.java       |  21 ++-
 .../appenderator/SegmentsAndCommitMetadata.java    |  53 +++++-
 .../appenderator/SinkQuerySegmentWalker.java       |   6 +-
 .../realtime/appenderator/StreamAppenderator.java  | 189 +++++++++++++--------
 .../appenderator/StreamAppenderatorDriver.java     |  25 ++-
 .../StreamAppenderatorDriverFailTest.java          |   8 +-
 .../appenderator/StreamAppenderatorDriverTest.java |  75 ++++++++
 33 files changed, 581 insertions(+), 392 deletions(-)

diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 7e0eaf60d83..9da665adde4 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -298,12 +298,6 @@ public class MaterializedViewSupervisor implements 
Supervisor
     throw new UnsupportedOperationException("Compute Lag Stats not supported 
in MaterializedViewSupervisor");
   }
 
-  @Override
-  public Set<String> getActiveRealtimeSequencePrefixes()
-  {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public int getActiveTaskGroupsCount()
   {
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index 365fb1751ea..14bd5987125 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -207,8 +207,6 @@ public class MaterializedViewSupervisorSpecTest
 
       Assert.assertThrows(UnsupportedOperationException.class, () -> 
supervisor.getActiveTaskGroupsCount());
 
-      Assert.assertThrows(UnsupportedOperationException.class, () -> 
supervisor.getActiveRealtimeSequencePrefixes());
-
       Callable<Integer> noop = new Callable<Integer>() {
         @Override
         public Integer call()
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index c0494840207..b4d18ea390e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
-import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.Worker;
@@ -46,7 +45,7 @@ import java.util.Objects;
 import java.util.Set;
 
 @JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask implements 
PendingSegmentAllocatingTask
+public class MSQWorkerTask extends AbstractTask
 {
   public static final String TYPE = "query_worker";
 
@@ -126,13 +125,6 @@ public class MSQWorkerTask extends AbstractTask implements 
PendingSegmentAllocat
     return ImmutableSet.of();
   }
 
-  @Override
-  public String getTaskAllocatorId()
-  {
-    return getControllerTaskId();
-  }
-
-
   @Override
   public boolean isReady(final TaskActionClient taskActionClient)
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 482d67d81ab..6eff77184ea 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -108,12 +108,4 @@ public class MSQWorkerTaskTest
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
   }
-
-  @Test
-  public void testGetTaskAllocatorId()
-  {
-    MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
-    Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
-  }
-
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index f2b080cff6e..df188ac8153 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -34,13 +34,10 @@ import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -155,7 +152,7 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
     // failure to upgrade pending segments does not affect success of the 
commit
     if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
       try {
-        registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
+        registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, 
publishResult.getUpgradedPendingSegments());
       }
       catch (Exception e) {
         log.error(e, "Error while upgrading pending segments for task[%s]", 
task.getId());
@@ -168,7 +165,11 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
   /**
    * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, 
TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(
+      Task task,
+      TaskActionToolbox toolbox,
+      List<PendingSegmentRecord> upgradedPendingSegments
+  )
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
@@ -178,42 +179,10 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
       return;
     }
 
-    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
-        .getTaskLockbox()
-        .getAllReplaceLocksForDatasource(task.getDataSource())
-        .stream()
-        .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
-        .collect(Collectors.toSet());
-
-
-    Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
-    for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
-      pendingSegments.addAll(
-          toolbox.getIndexerMetadataStorageCoordinator()
-                 .getPendingSegments(task.getDataSource(), 
replaceLock.getInterval())
-      );
-    }
-    Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
-    pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
-        pendingSegment.getId().asSegmentId().toString(),
-        pendingSegment.getId()
-    ));
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new 
HashMap<>();
-    pendingSegments.forEach(pendingSegment -> {
-      if (pendingSegment.getUpgradedFromSegmentId() != null
-          && 
!pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString()))
 {
-        segmentToParent.put(
-            pendingSegment.getId(),
-            idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
-        );
-      }
-    });
-
-    segmentToParent.forEach(
-        (newId, oldId) -> 
supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+    upgradedPendingSegments.forEach(
+        upgradedPendingSegment -> 
supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
             activeSupervisorIdWithAppendLock.get(),
-            oldId,
-            newId
+            upgradedPendingSegment
         )
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 0a1f00f9025..b8027fcc5ea 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
-import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -109,7 +108,7 @@ import java.util.stream.Collectors;
  * generates and pushes segments, and reports them to the {@link 
SinglePhaseParallelIndexTaskRunner} instead of
  * publishing on its own.
  */
-public class SinglePhaseSubTask extends AbstractBatchSubtask implements 
ChatHandler, PendingSegmentAllocatingTask
+public class SinglePhaseSubTask extends AbstractBatchSubtask implements 
ChatHandler
 {
   public static final String TYPE = "single_phase_sub_task";
   public static final String OLD_TYPE_NAME = "index_sub";
@@ -240,12 +239,6 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
     return subtaskSpecId;
   }
 
-  @Override
-  public String getTaskAllocatorId()
-  {
-    return getGroupId();
-  }
-
   @Override
   public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 7248fcab865..5d71940d470 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1242,7 +1242,7 @@ public class TaskLockbox
               idsInSameGroup.remove(task.getId());
               if (idsInSameGroup.isEmpty()) {
                 final int pendingSegmentsDeleted
-                    = 
metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
+                    = 
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
                 log.info(
                     "Deleted [%d] entries from pendingSegments table for 
pending segments group [%s] with APPEND locks.",
                     pendingSegmentsDeleted, taskAllocatorId
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index dd57b560660..288b2a14156 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -33,9 +33,9 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 
 import javax.annotation.Nullable;
 
@@ -308,16 +308,19 @@ public class SupervisorManager
    * allows the supervisor to include the pending segment in queries fired 
against
    * that segment version.
    */
-  public boolean registerNewVersionOfPendingSegmentOnSupervisor(
+  public boolean registerUpgradedPendingSegmentOnSupervisor(
       String supervisorId,
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newSegmentVersion
+      PendingSegmentRecord upgradedPendingSegment
   )
   {
     try {
       Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
-      Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment 
cannot be null");
-      Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot 
be null");
+      Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending 
segment cannot be null");
+      Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), 
"taskAllocatorId cannot be null");
+      Preconditions.checkNotNull(
+          upgradedPendingSegment.getUpgradedFromSegmentId(),
+          "upgradedFromSegmentId cannot be null"
+      );
 
       Pair<Supervisor, SupervisorSpec> supervisor = 
supervisors.get(supervisorId);
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
@@ -326,12 +329,12 @@ public class SupervisorManager
       }
 
       SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = 
(SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
-      
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, 
newSegmentVersion);
+      
seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment);
       return true;
     }
     catch (Exception e) {
-      log.error(e, "PendingSegmentRecord[%s] mapping update request to 
version[%s] on Supervisor[%s] failed",
-                basePendingSegment.asSegmentId(), 
newSegmentVersion.getVersion(), supervisorId);
+      log.error(e, "Failed to upgrade pending segment[%s] to new pending 
segment[%s] on Supervisor[%s].",
+                upgradedPendingSegment.getUpgradedFromSegmentId(), 
upgradedPendingSegment.getId().getVersion(), supervisorId);
     }
     return false;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
deleted file mode 100644
index 146b0afc4b9..00000000000
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.seekablestream;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-
-/**
- * Contains a new version of an existing base pending segment. Used by realtime
- * tasks to serve queries against multiple versions of the same pending 
segment.
- */
-public class PendingSegmentVersions
-{
-  private final SegmentIdWithShardSpec baseSegment;
-  private final SegmentIdWithShardSpec newVersion;
-
-  @JsonCreator
-  public PendingSegmentVersions(
-      @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment,
-      @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion
-  )
-  {
-    this.baseSegment = baseSegment;
-    this.newVersion = newVersion;
-  }
-
-  @JsonProperty
-  public SegmentIdWithShardSpec getBaseSegment()
-  {
-    return baseSegment;
-  }
-
-  @JsonProperty
-  public SegmentIdWithShardSpec getNewVersion()
-  {
-    return newVersion;
-  }
-}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
index 5e592424960..7fd282e44ce 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java
@@ -21,8 +21,8 @@ package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
 
 import java.util.List;
@@ -158,15 +158,14 @@ public interface 
SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetTy
    * Update the task state to redirect queries for later versions to the root 
pending segment.
    * The task also announces that it is serving the segments belonging to the 
subsequent versions.
    * The update is processed only if the task is serving the original pending 
segment.
-   * @param taskId - task id
-   * @param basePendingSegment - the pending segment that was originally 
allocated
-   * @param newVersionOfSegment - the ids belonging to the versions to which 
the root segment needs to be updated
+   *
+   * @param taskId               - task id
+   * @param pendingSegmentRecord - the ids belonging to the versions to which 
the root segment needs to be updated
    * @return true if the update succeeds
    */
   ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
       String taskId,
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newVersionOfSegment
+      PendingSegmentRecord pendingSegmentRecord
   );
 
   Class<PartitionIdType> getPartitionType();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
index 40d475909e6..5de1cb50a97 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.IgnoreHttpResponseHandler;
 import org.apache.druid.rpc.RequestBuilder;
@@ -57,7 +58,6 @@ import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -197,13 +197,12 @@ public abstract class 
SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
   @Override
   public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
       String taskId,
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newVersionOfSegment
+      PendingSegmentRecord pendingSegmentRecord
   )
   {
     final RequestBuilder requestBuilder
         = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
-        .jsonContent(jsonMapper, new 
PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
+        .jsonContent(jsonMapper, pendingSegmentRecord);
 
     return makeRequest(taskId, requestBuilder)
         .handler(IgnoreHttpResponseHandler.INSTANCE)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 96e1dd40145..94ce367fc84 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -1575,18 +1576,15 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   @Path("/pendingSegmentVersion")
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
-  public Response registerNewVersionOfPendingSegment(
-      PendingSegmentVersions pendingSegmentVersions,
+  public Response registerUpgradedPendingSegment(
+      PendingSegmentRecord upgradedPendingSegment,
       // this field is only for internal purposes, shouldn't be usually set by 
users
       @Context final HttpServletRequest req
   )
   {
     authorizationCheck(req, Action.WRITE);
     try {
-      ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
-          pendingSegmentVersions.getBaseSegment(),
-          pendingSegmentVersions.getNewVersion()
-      );
+      ((StreamAppenderator) 
appenderator).registerUpgradedPendingSegment(upgradedPendingSegment);
       return Response.ok().build();
     }
     catch (DruidException e) {
@@ -1598,8 +1596,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     catch (Exception e) {
       log.error(
           e,
-          "Could not register new version[%s] of pending segment[%s]",
-          pendingSegmentVersions.getNewVersion(), 
pendingSegmentVersions.getBaseSegment()
+          "Could not register pending segment[%s] upgraded from[%s]",
+          upgradedPendingSegment.getId().asSegmentId(), 
upgradedPendingSegment.getUpgradedFromSegmentId()
       );
       return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f15a975694f..58a433325a3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -89,12 +89,12 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nonnull;
@@ -178,7 +178,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * time, there should only be up to a maximum of [taskCount] 
actively-reading task groups (tracked in the [activelyReadingTaskGroups]
    * map) + zero or more pending-completion task groups (tracked in 
[pendingCompletionTaskGroups]).
    */
-  private class TaskGroup
+  @VisibleForTesting
+  public class TaskGroup
   {
     final int groupId;
 
@@ -265,6 +266,11 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return tasks.keySet();
     }
 
+    @VisibleForTesting
+    public String getBaseSequenceName()
+    {
+      return baseSequenceName;
+    }
   }
 
   private class TaskData
@@ -1096,42 +1102,23 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
   }
 
-  /**
-   * The base sequence name of a seekable stream task group is used as a 
prefix of the sequence names
-   * of pending segments published by it.
-   * This method can be used to identify the active pending segments for a 
datasource
-   * by checking if the sequence name begins with any of the active realtime 
sequence prefix returned by this method
-   * @return the set of base sequence names of both active and pending 
completion task gruops.
-   */
-  @Override
-  public Set<String> getActiveRealtimeSequencePrefixes()
-  {
-    final Set<String> activeBaseSequences = new HashSet<>();
-    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
-      activeBaseSequences.add(taskGroup.baseSequenceName);
-    }
-    for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) 
{
-      for (TaskGroup taskGroup : taskGroupList) {
-        activeBaseSequences.add(taskGroup.baseSequenceName);
-      }
-    }
-    return activeBaseSequences;
-  }
-
   public void registerNewVersionOfPendingSegment(
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newSegmentVersion
+      PendingSegmentRecord pendingSegmentRecord
   )
   {
     for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
-      for (String taskId : taskGroup.taskIds()) {
-        taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
basePendingSegment, newSegmentVersion);
+      if 
(taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
+        for (String taskId : taskGroup.taskIds()) {
+          taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
pendingSegmentRecord);
+        }
       }
     }
     for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) 
{
       for (TaskGroup taskGroup : taskGroupList) {
-        for (String taskId : taskGroup.taskIds()) {
-          taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
basePendingSegment, newSegmentVersion);
+        if 
(taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) {
+          for (String taskId : taskGroup.taskIds()) {
+            taskClient.registerNewVersionOfPendingSegmentAsync(taskId, 
pendingSegmentRecord);
+          }
         }
       }
     }
@@ -1548,7 +1535,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   @VisibleForTesting
-  public void addTaskGroupToActivelyReadingTaskGroup(
+  public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
       int taskGroupId,
       ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
       Optional<DateTime> minMsgTime,
@@ -1572,10 +1559,11 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           taskGroupId
       );
     }
+    return group;
   }
 
   @VisibleForTesting
-  public void addTaskGroupToPendingCompletionTaskGroup(
+  public TaskGroup addTaskGroupToPendingCompletionTaskGroup(
       int taskGroupId,
       ImmutableMap<PartitionIdType, SequenceOffsetType> partitionOffsets,
       Optional<DateTime> minMsgTime,
@@ -1595,6 +1583,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> 
new TaskData())));
     pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new 
CopyOnWriteArrayList<>())
                                .add(group);
+    return group;
   }
 
   @VisibleForTesting
@@ -3202,9 +3191,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         // If we received invalid endOffset values, we clear the known offset 
to refetch the last committed offset
         // from metadata. If any endOffset values are invalid, we treat the 
entire set as invalid as a safety measure.
         if (!endOffsetsAreInvalid) {
-          for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
-            partitionOffsets.put(entry.getKey(), entry.getValue());
-          }
+          partitionOffsets.putAll(endOffsets);
         } else {
           for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
             partitionOffsets.put(entry.getKey(), getNotSetMarker());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index b80641fe94b..62b5e48e00b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -52,7 +52,7 @@ public class ActionsTestTask extends CommandQueueTask
 {
   private final TaskActionClient client;
   private final AtomicInteger sequenceId = new AtomicInteger(0);
-  private final Map<SegmentId, SegmentId> announcedSegmentsToParentSegments = 
new HashMap<>();
+  private final Map<SegmentId, String> announcedSegmentsToParentSegments = new 
HashMap<>();
 
   public ActionsTestTask(String datasource, String groupId, 
TaskActionClientFactory factory)
   {
@@ -82,7 +82,7 @@ public class ActionsTestTask extends CommandQueueTask
     );
   }
 
-  public Map<SegmentId, SegmentId> getAnnouncedSegmentsToParentSegments()
+  public Map<SegmentId, String> getAnnouncedSegmentsToParentSegments()
   {
     return announcedSegmentsToParentSegments;
   }
@@ -114,7 +114,7 @@ public class ActionsTestTask extends CommandQueueTask
             TaskLockType.APPEND
         )
     );
-    announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), 
pendingSegment.asSegmentId());
+    announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), 
pendingSegment.asSegmentId().toString());
     return pendingSegment;
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
index 50c318683e8..7da5a3d19fe 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -74,7 +75,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -83,7 +83,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -122,10 +121,9 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
   private final AtomicInteger groupId = new AtomicInteger(0);
   private final SupervisorManager supervisorManager = 
EasyMock.mock(SupervisorManager.class);
   private Capture<String> supervisorId;
-  private Capture<SegmentIdWithShardSpec> oldPendingSegment;
-  private Capture<SegmentIdWithShardSpec> newPendingSegment;
+  private Capture<PendingSegmentRecord> pendingSegment;
   private Map<String, Map<Interval, Set<Object>>> versionToIntervalToLoadSpecs;
-  private Map<SegmentId, Object> parentSegmentToLoadSpec;
+  private Map<String, Object> parentSegmentToLoadSpec;
 
   @Override
   @Before
@@ -169,12 +167,10 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
     groupId.set(0);
     appendTask = createAndStartTask();
     supervisorId = Capture.newInstance(CaptureType.ALL);
-    oldPendingSegment = Capture.newInstance(CaptureType.ALL);
-    newPendingSegment = Capture.newInstance(CaptureType.ALL);
-    
EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
+    pendingSegment = Capture.newInstance(CaptureType.ALL);
+    
EasyMock.expect(supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
         EasyMock.capture(supervisorId),
-        EasyMock.capture(oldPendingSegment),
-        EasyMock.capture(newPendingSegment)
+        EasyMock.capture(pendingSegment)
     )).andReturn(true).anyTimes();
     replaceTask = createAndStartTask();
     EasyMock.replay(supervisorManager);
@@ -682,20 +678,6 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
     verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, 
segmentV12);
   }
 
-
-  @Nullable
-  private DataSegment findSegmentWith(String version, Map<String, Object> 
loadSpec, Set<DataSegment> segments)
-  {
-    for (DataSegment segment : segments) {
-      if (version.equals(segment.getVersion())
-          && Objects.equals(segment.getLoadSpec(), loadSpec)) {
-        return segment;
-      }
-    }
-
-    return null;
-  }
-
   private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
   {
     final SegmentId id = pendingSegment.asSegmentId();
@@ -739,23 +721,6 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
     }
   }
 
-  private void verifyInputSegments(Task task, Interval interval, 
DataSegment... expectedSegments)
-  {
-    try {
-      final TaskActionClient taskActionClient = 
taskActionClientFactory.create(task);
-      Collection<DataSegment> allUsedSegments = taskActionClient.submit(
-          new RetrieveUsedSegmentsAction(
-              WIKI,
-              Collections.singletonList(interval)
-          )
-      );
-      Assert.assertEquals(Sets.newHashSet(expectedSegments), 
Sets.newHashSet(allUsedSegments));
-    }
-    catch (IOException e) {
-      throw new ISE(e, "Error while fetching segments to replace in 
interval[%s]", interval);
-    }
-  }
-
   private TaskToolboxFactory createToolboxFactory(
       TaskConfig taskConfig,
       TaskActionClientFactory taskActionClientFactory
@@ -799,11 +764,10 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
   {
     replaceTask.commitReplaceSegments(dataSegments);
     for (int i = 0; i < supervisorId.getValues().size(); i++) {
-      announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), 
newPendingSegment.getValues().get(i));
+      announceUpgradedPendingSegment(pendingSegment.getValues().get(i));
     }
     supervisorId.reset();
-    oldPendingSegment.reset();
-    newPendingSegment.reset();
+    pendingSegment.reset();
     replaceTask.finishRunAndGetStatus();
   }
 
@@ -812,19 +776,16 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
     SegmentPublishResult result = 
appendTask.commitAppendSegments(dataSegments);
     result.getSegments().forEach(this::unannounceUpgradedPendingSegment);
     for (DataSegment segment : dataSegments) {
-      parentSegmentToLoadSpec.put(segment.getId(), 
Iterables.getOnlyElement(segment.getLoadSpec().values()));
+      parentSegmentToLoadSpec.put(segment.getId().toString(), 
Iterables.getOnlyElement(segment.getLoadSpec().values()));
     }
     appendTask.finishRunAndGetStatus();
     return result;
   }
 
-  private void announceUpgradedPendingSegment(
-      SegmentIdWithShardSpec oldPendingSegment,
-      SegmentIdWithShardSpec newPendingSegment
-  )
+  private void announceUpgradedPendingSegment(PendingSegmentRecord 
pendingSegment)
   {
     appendTask.getAnnouncedSegmentsToParentSegments()
-              .put(newPendingSegment.asSegmentId(), 
oldPendingSegment.asSegmentId());
+              .put(pendingSegment.getId().asSegmentId(), 
pendingSegment.getUpgradedFromSegmentId());
   }
 
   private void unannounceUpgradedPendingSegment(
@@ -849,7 +810,7 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
       loadSpecs.add(loadSpec);
     }
 
-    for (Map.Entry<SegmentId, SegmentId> entry : 
appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
+    for (Map.Entry<SegmentId, String> entry : 
appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) {
       final String version = entry.getKey().getVersion();
       final Interval interval = entry.getKey().getInterval();
       final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 7c16e2efc24..3af74235e82 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1948,8 +1948,8 @@ public class TaskLockboxTest
     // Only the replaceTask should attempt a delete on the upgradeSegments 
table
     
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
     // Any task may attempt pending segment clean up
-    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once();
-    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once();
+    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
+    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
     EasyMock.replay(coordinator);
 
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 5ffbd4b9460..4a9fccd4663 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -31,7 +31,11 @@ import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMeta
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -544,6 +548,53 @@ public class SupervisorManagerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testRegisterUpgradedPendingSegmentOnSupervisor()
+  {
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
+
+    NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", 
ImmutableList.of("noopDS"));
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
+    SeekableStreamSupervisorSpec streamingSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    SeekableStreamSupervisor streamSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes();
+    EasyMock.expect(streamingSpec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(streamingSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes();
+    
EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes();
+    
EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes();
+    EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes();
+    EasyMock.replay(streamingSpec);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+    EasyMock.expectLastCall().once();
+
+    replayAll();
+
+    final PendingSegmentRecord pendingSegment = new PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "DS",
+            Intervals.ETERNITY,
+            "version",
+            new NumberedShardSpec(0, 0)
+        ),
+        "sequenceName",
+        "prevSegmentId",
+        "upgradedFromSegmentId",
+        "taskAllocatorId"
+    );
+    manager.start();
+
+    manager.createOrUpdateAndStartSupervisor(noopSpec);
+    
Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", 
pendingSegment));
+
+    manager.createOrUpdateAndStartSupervisor(streamingSpec);
+    
Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", 
pendingSegment));
+
+    verifyAll();
+  }
+
   private static class TestSupervisorSpec implements SupervisorSpec
   {
     private final String id;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 2602f8e5441..489315cc249 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -74,11 +74,13 @@ import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -86,6 +88,10 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.hamcrest.MatcherAssert;
@@ -1548,10 +1554,19 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   }
 
   @Test
-  public void testGetActiveRealtimeSequencePrefixes()
+  public void testRegisterNewVersionOfPendingSegment()
   {
     EasyMock.expect(spec.isSuspended()).andReturn(false);
 
+    Capture<PendingSegmentRecord> captured0 = 
Capture.newInstance(CaptureType.FIRST);
+    Capture<PendingSegmentRecord> captured1 = 
Capture.newInstance(CaptureType.FIRST);
+    EasyMock.expect(
+        
indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), 
EasyMock.capture(captured0))
+    ).andReturn(Futures.immediateFuture(true));
+    EasyMock.expect(
+        
indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), 
EasyMock.capture(captured1))
+    ).andReturn(Futures.immediateFuture(true));
+
     replayAll();
 
     final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
@@ -1559,34 +1574,63 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     // Spin off two active tasks with each task serving one partition.
     supervisor.getIoConfig().setTaskCount(3);
     supervisor.start();
-    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+
+    final SeekableStreamSupervisor.TaskGroup taskGroup0 = 
supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("0"),
         ImmutableMap.of("0", "5"),
         Optional.absent(),
         Optional.absent(),
-        ImmutableSet.of("task1"),
+        ImmutableSet.of("task0"),
         ImmutableSet.of()
     );
-
-    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+    final SeekableStreamSupervisor.TaskGroup taskGroup1 = 
supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
         ImmutableMap.of("1", "6"),
         Optional.absent(),
         Optional.absent(),
-        ImmutableSet.of("task2"),
+        ImmutableSet.of("task1"),
         ImmutableSet.of()
     );
-
-    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+    final SeekableStreamSupervisor.TaskGroup taskGroup2 = 
supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("2"),
         ImmutableMap.of("2", "100"),
         Optional.absent(),
         Optional.absent(),
-        ImmutableSet.of("task3"),
+        ImmutableSet.of("task2"),
         ImmutableSet.of()
     );
 
-    Assert.assertEquals(3, 
supervisor.getActiveRealtimeSequencePrefixes().size());
+    final PendingSegmentRecord pendingSegmentRecord0 = new 
PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "DS",
+            Intervals.of("2024/2025"),
+            "2024",
+            new NumberedShardSpec(1, 0)
+        ),
+        taskGroup0.getBaseSequenceName(),
+        "prevId0",
+        "someAppendedSegment0",
+        taskGroup0.getBaseSequenceName()
+    );
+    final PendingSegmentRecord pendingSegmentRecord1 = new 
PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "DS",
+            Intervals.of("2024/2025"),
+            "2024",
+            new NumberedShardSpec(2, 0)
+        ),
+        taskGroup2.getBaseSequenceName(),
+        "prevId1",
+        "someAppendedSegment1",
+        taskGroup2.getBaseSequenceName()
+    );
+
+    supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0);
+    supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1);
+
+    Assert.assertEquals(pendingSegmentRecord0, captured0.getValue());
+    Assert.assertEquals(pendingSegmentRecord1, captured1.getValue());
+    verifyAll();
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 1de41bb43a0..6c4f556133e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -252,11 +252,11 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+  public List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
       Set<DataSegment> replaceSegments
   )
   {
-    return Collections.emptyMap();
+    return Collections.emptyList();
   }
 
   @Override
@@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public int deletePendingSegmentsForTaskGroup(final String taskGroup)
+  public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
   {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 23513c82ad7..aea2674f6b8 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -389,9 +389,9 @@ public interface IndexerMetadataStorageCoordinator
    * </ul>
    *
    * @param replaceSegments Segments being committed by a REPLACE task
-   * @return Map from originally allocated pending segment to its new upgraded 
ID.
+   * @return List of inserted pending segment records
    */
-  Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+  List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
       Set<DataSegment> replaceSegments
   );
 
@@ -495,7 +495,7 @@ public interface IndexerMetadataStorageCoordinator
    * @param taskAllocatorId task id / task group / replica group for an 
appending task
    * @return number of pending segments deleted from the metadata store
    */
-  int deletePendingSegmentsForTaskGroup(String taskAllocatorId);
+  int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
 
   /**
    * Fetches all the pending segments of the datasource that overlap with a 
given interval.
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
index 620ff8831b0..e4bc1645f71 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
@@ -23,10 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
@@ -46,12 +49,19 @@ public class SegmentPublishResult
   private final boolean success;
   @Nullable
   private final String errorMsg;
+  @Nullable
+  private final List<PendingSegmentRecord> upgradedPendingSegments;
 
   public static SegmentPublishResult ok(Set<DataSegment> segments)
   {
     return new SegmentPublishResult(segments, true, null);
   }
 
+  public static SegmentPublishResult ok(Set<DataSegment> segments, 
List<PendingSegmentRecord> upgradedPendingSegments)
+  {
+    return new SegmentPublishResult(segments, true, null, 
upgradedPendingSegments);
+  }
+
   public static SegmentPublishResult fail(String errorMsg)
   {
     return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
@@ -63,13 +73,28 @@ public class SegmentPublishResult
       @JsonProperty("success") boolean success,
       @JsonProperty("errorMsg") @Nullable String errorMsg
   )
+  {
+    this(segments, success, errorMsg, null);
+  }
+
+  private SegmentPublishResult(
+      Set<DataSegment> segments,
+      boolean success,
+       @Nullable String errorMsg,
+      List<PendingSegmentRecord> upgradedPendingSegments
+  )
   {
     this.segments = Preconditions.checkNotNull(segments, "segments");
     this.success = success;
     this.errorMsg = errorMsg;
+    this.upgradedPendingSegments = upgradedPendingSegments;
 
     if (!success) {
       Preconditions.checkArgument(segments.isEmpty(), "segments must be empty 
for unsuccessful publishes");
+      Preconditions.checkArgument(
+          CollectionUtils.isNullOrEmpty(upgradedPendingSegments),
+          "upgraded pending segments must be null or empty for unsuccessful 
publishes"
+      );
     }
   }
 
@@ -92,6 +117,12 @@ public class SegmentPublishResult
     return errorMsg;
   }
 
+  @Nullable
+  public List<PendingSegmentRecord> getUpgradedPendingSegments()
+  {
+    return upgradedPendingSegments;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 20c10253386..e733ef6c233 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -31,7 +31,6 @@ import org.apache.druid.server.security.ResourceAction;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -186,12 +185,6 @@ public class NoopSupervisorSpec implements SupervisorSpec
       {
         return -1;
       }
-
-      @Override
-      public Set<String> getActiveRealtimeSequencePrefixes()
-      {
-        return Collections.emptySet();
-      }
     };
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 9b9511cbf3d..b1fb439184d 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -29,7 +29,6 @@ import 
org.apache.druid.segment.incremental.ParseExceptionReport;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public interface Supervisor
 {
@@ -103,9 +102,4 @@ public interface Supervisor
   }
 
   int getActiveTaskGroupsCount();
-
-  /**
-   * @return active sequence prefixes for reading and pending completion task 
groups of a seekable stream supervisor
-   */
-  Set<String> getActiveRealtimeSequencePrefixes();
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e36412e5dc1..c5a36656c9a 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -79,6 +79,7 @@ import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.Update;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
@@ -530,9 +531,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                     segmentSchemaMapping,
                     upgradeSegmentMetadata,
                     Collections.emptyMap()
-                )
+                ),
+                upgradePendingSegmentsOverlappingWith(segmentsToInsert)
             );
-            upgradePendingSegmentsOverlappingWith(segmentsToInsert);
             return result;
           },
           3,
@@ -735,12 +736,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
-  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+  public List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
       Set<DataSegment> replaceSegments
   )
   {
     if (replaceSegments.isEmpty()) {
-      return Collections.emptyMap();
+      return Collections.emptyList();
     }
 
     // Any replace interval has exactly one version of segments
@@ -769,16 +770,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
    * those versions.</li>
    * </ul>
    *
-   * @return Map from original pending segment to the new upgraded ID.
+   * @return Inserted pending segment records
    */
-  private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegments(
+  private List<PendingSegmentRecord> upgradePendingSegments(
       Handle handle,
       String datasource,
       Map<Interval, DataSegment> replaceIntervalToMaxId
   ) throws JsonProcessingException
   {
     final List<PendingSegmentRecord> upgradedPendingSegments = new 
ArrayList<>();
-    final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
pendingSegmentToNewId = new HashMap<>();
 
     for (Map.Entry<Interval, DataSegment> entry : 
replaceIntervalToMaxId.entrySet()) {
       final Interval replaceInterval = entry.getKey();
@@ -813,7 +813,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                   overlappingPendingSegment.getTaskAllocatorId()
               )
           );
-          pendingSegmentToNewId.put(pendingSegmentId, newId);
         }
       }
     }
@@ -831,7 +830,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         numInsertedPendingSegments, upgradedPendingSegments.size()
     );
 
-    return pendingSegmentToNewId;
+    return upgradedPendingSegments;
   }
 
   private boolean shouldUpgradePendingSegment(
@@ -1114,8 +1113,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
 
     // always insert empty previous sequence id
-    insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, 
interval, "", sequenceName, sequenceNamePrevIdSha1,
-                                      taskAllocatorId
+    insertPendingSegmentIntoMetastore(
+        handle,
+        newIdentifier,
+        dataSource,
+        interval,
+        "",
+        sequenceName,
+        sequenceNamePrevIdSha1,
+        taskAllocatorId
     );
 
     log.info(
@@ -1320,6 +1326,39 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
   }
 
+  private static void bindColumnValuesToQueryWithInCondition(
+      final String columnName,
+      final List<String> values,
+      final Update query
+  )
+  {
+    if (values == null) {
+      return;
+    }
+
+    for (int i = 0; i < values.size(); i++) {
+      query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
+    }
+  }
+
+  private int deletePendingSegmentsById(Handle handle, String datasource, 
List<String> pendingSegmentIds)
+  {
+    if (pendingSegmentIds.isEmpty()) {
+      return 0;
+    }
+
+    Update query = handle.createStatement(
+        StringUtils.format(
+            "DELETE FROM %s WHERE dataSource = :dataSource %s",
+            dbTables.getPendingSegmentsTable(),
+            
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", 
pendingSegmentIds)
+        )
+    ).bind("dataSource", datasource);
+    bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query);
+
+    return query.execute();
+  }
+
   private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@@ -1383,7 +1422,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
               if (metadataUpdateResult.isFailed()) {
                 transactionStatus.setRollbackOnly();
                 metadataNotUpdated.set(true);
-
                 if (metadataUpdateResult.canRetry()) {
                   throw new 
RetryTransactionException(metadataUpdateResult.getErrorMsg());
                 } else {
@@ -1393,6 +1431,20 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             }
 
             insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock);
+
+            // Delete the pending segments to be committed in this transaction 
in batches of at most 100
+            final List<List<String>> pendingSegmentIdBatches = Lists.partition(
+                allSegmentsToInsert.stream()
+                                   .map(pendingSegment -> 
pendingSegment.getId().toString())
+                                   .collect(Collectors.toList()),
+                100
+            );
+            int numDeletedPendingSegments = 0;
+            for (List<String> pendingSegmentIdBatch : pendingSegmentIdBatches) 
{
+              numDeletedPendingSegments += deletePendingSegmentsById(handle, 
dataSource, pendingSegmentIdBatch);
+            }
+            log.info("Deleted [%d] entries from pending segments table upon 
commit.", numDeletedPendingSegments);
+
             return SegmentPublishResult.ok(
                 insertSegments(
                     handle,
@@ -2761,7 +2813,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
-  public int deletePendingSegmentsForTaskGroup(final String 
pendingSegmentsGroup)
+  public int deletePendingSegmentsForTaskAllocatorId(final String 
pendingSegmentsGroup)
   {
     return connector.getDBI().inTransaction(
         (handle, status) -> handle
diff --git 
a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java 
b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
index 44c62bf47ad..bfbaad18ef1 100644
--- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.metadata;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -50,12 +52,13 @@ public class PendingSegmentRecord
   private final String upgradedFromSegmentId;
   private final String taskAllocatorId;
 
+  @JsonCreator
   public PendingSegmentRecord(
-      SegmentIdWithShardSpec id,
-      String sequenceName,
-      String sequencePrevId,
-      @Nullable String upgradedFromSegmentId,
-      @Nullable String taskAllocatorId
+      @JsonProperty("id") SegmentIdWithShardSpec id,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("sequencePrevId") String sequencePrevId,
+      @JsonProperty("upgradedFromSegmentId") @Nullable String 
upgradedFromSegmentId,
+      @JsonProperty("taskAllocatorId") @Nullable String taskAllocatorId
   )
   {
     this.id = id;
@@ -65,16 +68,19 @@ public class PendingSegmentRecord
     this.taskAllocatorId = taskAllocatorId;
   }
 
+  @JsonProperty
   public SegmentIdWithShardSpec getId()
   {
     return id;
   }
 
+  @JsonProperty
   public String getSequenceName()
   {
     return sequenceName;
   }
 
+  @JsonProperty
   public String getSequencePrevId()
   {
     return sequencePrevId;
@@ -85,6 +91,7 @@ public class PendingSegmentRecord
    * Can be null for pending segments allocated before this column was added 
or for segments that have not been upgraded.
    */
   @Nullable
+  @JsonProperty
   public String getUpgradedFromSegmentId()
   {
     return upgradedFromSegmentId;
@@ -95,6 +102,7 @@ public class PendingSegmentRecord
    * Can be null for pending segments allocated before this column was added.
    */
   @Nullable
+  @JsonProperty
   public String getTaskAllocatorId()
   {
     return taskAllocatorId;
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index fc990e107dd..f14cc995050 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -955,7 +955,7 @@ public class SqlSegmentsMetadataQuery
    *
    * @implNote JDBI 3.x has better support for binding {@code IN} clauses 
directly.
    */
-  private static String getParameterizedInConditionForColumn(final String 
columnName, final List<String> values)
+  static String getParameterizedInConditionForColumn(final String columnName, 
final List<String> values)
   {
     if (values == null) {
       return "";
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index d022580f7c1..65df4f56761 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -563,7 +563,8 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
           return new SegmentsAndCommitMetadata(
               segmentsAndCommitMetadata.getSegments(),
               metadata == null ? null : ((AppenderatorDriverMetadata) 
metadata).getCallerMetadata(),
-              segmentsAndCommitMetadata.getSegmentSchemaMapping()
+              segmentsAndCommitMetadata.getSegmentSchemaMapping(),
+              segmentsAndCommitMetadata.getUpgradedSegments()
           );
         },
         MoreExecutors.directExecutor()
@@ -618,9 +619,10 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
     return executor.submit(
       () -> {
         try {
-          RetryUtils.retry(
+          return RetryUtils.retry(
               () -> {
               try {
+                final Set<DataSegment> upgradedSegments = new HashSet<>();
                 final ImmutableSet<DataSegment> ourSegments = 
ImmutableSet.copyOf(pushedAndTombstones);
                 final SegmentPublishResult publishResult = 
publisher.publishSegments(
                     segmentsToBeOverwritten,
@@ -629,7 +631,6 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                     callerMetadata,
                     segmentsAndCommitMetadata.getSegmentSchemaMapping()
                 );
-
                 if (publishResult.isSuccess()) {
                   log.info(
                       "Published [%s] segments with commit metadata [%s]",
@@ -637,6 +638,13 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                       callerMetadata
                   );
                   log.infoSegments(segmentsAndCommitMetadata.getSegments(), 
"Published segments");
+                  // This set must contain only those segments that were 
upgraded as a result of a concurrent replace.
+                  upgradedSegments.addAll(publishResult.getSegments());
+                  
segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove);
+                  if (!upgradedSegments.isEmpty()) {
+                    log.info("Published [%d] upgraded segments.", 
upgradedSegments.size());
+                    log.infoSegments(upgradedSegments, "Upgraded segments");
+                  }
                   log.info("Published segment schemas: [%s]", 
segmentsAndCommitMetadata.getSegmentSchemaMapping());
                 } else {
                   // Publishing didn't affirmatively succeed. However, 
segments with our identifiers may still be active
@@ -691,6 +699,7 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                     throw new ISE("Failed to publish segments");
                   }
                 }
+                return 
segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments);
               }
               catch (Exception e) {
                 // Must not remove segments here, we aren't sure if our 
transaction succeeded or not.
@@ -703,9 +712,10 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
                 Throwables.propagateIfPossible(e);
                 throw new RuntimeException(e);
               }
-              return segmentsAndCommitMetadata;
             },
-              e -> (e.getMessage() != null && e.getMessage().contains("Failed 
to update the metadata Store. The new start metadata is ahead of last commited 
end state.")),
+              e -> (e != null && e.getMessage() != null
+                    && e.getMessage().contains("Failed to update the metadata 
Store."
+                                               + " The new start metadata is 
ahead of last commited end state.")),
               RetryUtils.DEFAULT_MAX_TRIES
           );
         }
@@ -717,7 +727,6 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
           Throwables.propagateIfPossible(e);
           throw new RuntimeException(e);
         }
-        return segmentsAndCommitMetadata;
       }
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index 4f0a53398e4..72187688057 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
@@ -28,26 +29,59 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class SegmentsAndCommitMetadata
 {
-  private static final SegmentsAndCommitMetadata NIL = new 
SegmentsAndCommitMetadata(Collections.emptyList(), null, null);
+  private static final SegmentsAndCommitMetadata NIL
+      = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, 
null);
 
   private final Object commitMetadata;
   private final ImmutableList<DataSegment> segments;
   private final SegmentSchemaMapping segmentSchemaMapping;
 
+  private final ImmutableSet<DataSegment> upgradedSegments;
+
+  public SegmentsAndCommitMetadata(
+      List<DataSegment> segments,
+      Object commitMetadata
+  )
+  {
+    this(segments, commitMetadata, null, null);
+  }
+
+  public SegmentsAndCommitMetadata(
+      List<DataSegment> segments,
+      Object commitMetadata,
+      SegmentSchemaMapping segmentSchemaMapping
+  )
+  {
+    this(segments, commitMetadata, segmentSchemaMapping, null);
+  }
+
   public SegmentsAndCommitMetadata(
       List<DataSegment> segments,
       @Nullable Object commitMetadata,
-      @Nullable SegmentSchemaMapping segmentSchemaMapping
+      @Nullable SegmentSchemaMapping segmentSchemaMapping,
+      @Nullable Set<DataSegment> upgradedSegments
   )
   {
     this.segments = ImmutableList.copyOf(segments);
     this.commitMetadata = commitMetadata;
+    this.upgradedSegments = upgradedSegments == null ? null : 
ImmutableSet.copyOf(upgradedSegments);
     this.segmentSchemaMapping = segmentSchemaMapping;
   }
 
+  public SegmentsAndCommitMetadata withUpgradedSegments(Set<DataSegment> 
upgradedSegments)
+  {
+    return new SegmentsAndCommitMetadata(
+        this.segments,
+        this.commitMetadata,
+        this.segmentSchemaMapping,
+        upgradedSegments
+    );
+  }
+
   @Nullable
   public Object getCommitMetadata()
   {
@@ -59,6 +93,15 @@ public class SegmentsAndCommitMetadata
     return segments;
   }
 
+  /**
+   * @return the set of extra upgraded segments committed due to a concurrent 
replace.
+   */
+  @Nullable
+  public Set<DataSegment> getUpgradedSegments()
+  {
+    return upgradedSegments;
+  }
+
   public SegmentSchemaMapping getSegmentSchemaMapping()
   {
     return segmentSchemaMapping;
@@ -75,13 +118,15 @@ public class SegmentsAndCommitMetadata
     }
     SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o;
     return Objects.equals(commitMetadata, that.commitMetadata) &&
+           Objects.equals(upgradedSegments, that.upgradedSegments) &&
+           Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) &&
            Objects.equals(segments, that.segments);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(commitMetadata, segments);
+    return Objects.hash(commitMetadata, segments, upgradedSegments, 
segmentSchemaMapping);
   }
 
   @Override
@@ -90,6 +135,8 @@ public class SegmentsAndCommitMetadata
     return getClass().getSimpleName() + "{" +
            "commitMetadata=" + commitMetadata +
            ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+           ", upgradedSegments=" + 
SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) +
+           ", segmentSchemaMapping=" + segmentSchemaMapping +
            '}';
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index d02e200cfcb..aba071de1df 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -356,13 +356,13 @@ public class SinkQuerySegmentWalker implements 
QuerySegmentWalker
     }
   }
 
-  public void registerNewVersionOfPendingSegment(
+  public void registerUpgradedPendingSegment(
       SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newSegmentVersion
+      SegmentIdWithShardSpec upgradedPendingSegment
   )
   {
     newIdToBasePendingSegment.put(
-        newSegmentVersion.asSegmentId().toDescriptor(),
+        upgradedPendingSegment.asSegmentId().toDescriptor(),
         basePendingSegment.asSegmentId().toDescriptor()
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 35ff42d3dab..1c5dd42dd77 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -147,6 +148,7 @@ public class StreamAppenderator implements Appenderator
    * of any thread from {@link #drop}.
    */
   private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<String, SegmentIdWithShardSpec> 
idToPendingSegment = new ConcurrentHashMap<>();
   private final Set<SegmentIdWithShardSpec> droppingSinks = 
Sets.newConcurrentHashSet();
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
   private final long maxBytesTuningConfig;
@@ -166,8 +168,25 @@ public class StreamAppenderator implements Appenderator
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final ConcurrentHashMap<SegmentId, Set<SegmentIdWithShardSpec>>
-      baseSegmentToUpgradedVersions = new ConcurrentHashMap<>();
+  /**
+   * Map from base segment identifier of a sink to the set of all the segment 
ids associated with it.
+   * The set contains the base segment itself and its upgraded versions 
announced as a result of a concurrent replace.
+   * The map contains all the available sinks' identifiers in its keyset.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, 
Set<SegmentIdWithShardSpec>> baseSegmentToUpgradedSegments
+      = new ConcurrentHashMap<>();
+  /**
+   * Map from the id of an upgraded pending segment to the segment 
corresponding to its upgradedFromSegmentId.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradedSegmentToBaseSegment
+      = new ConcurrentHashMap<>();
+  /**
+   * Set of all segment identifiers that have been marked to be abandoned.
+   * This is used to determine if all the segments corresponding to a sink 
have been abandoned and it can be dropped.
+   */
+  private final ConcurrentHashMap.KeySetView<SegmentIdWithShardSpec, Boolean> 
abandonedSegments
+      = ConcurrentHashMap.newKeySet();
+
 
   private final SinkSchemaAnnouncer sinkSchemaAnnouncer;
   private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
@@ -527,9 +546,7 @@ public class StreamAppenderator implements Appenderator
            .emit();
       }
 
-      sinks.put(identifier, retVal);
-      metrics.setSinkCount(sinks.size());
-      sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), 
identifier.getShardSpec().createChunk(retVal));
+      addSink(identifier, retVal);
     }
 
     return retVal;
@@ -1058,14 +1075,7 @@ public class StreamAppenderator implements Appenderator
 
     log.debug("Shutting down immediately...");
     for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
-      try {
-        unannounceAllVersionsOfSegment(entry.getValue().getSegment());
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to unannounce segment[%s]", 
schema.getDataSource())
-           .addData("identifier", entry.getKey().toString())
-           .emit();
-      }
+      unannounceAllVersionsOfSegment(entry.getValue().getSegment(), 
entry.getValue());
     }
     try {
       shutdownExecutors();
@@ -1098,61 +1108,78 @@ public class StreamAppenderator implements Appenderator
   /**
    * Unannounces the given base segment and all its upgraded versions.
    */
-  private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws 
IOException
+  private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink 
sink)
   {
-    segmentAnnouncer.unannounceSegment(baseSegment);
+    synchronized (sink) {
+      final SegmentIdWithShardSpec baseId = 
SegmentIdWithShardSpec.fromDataSegment(baseSegment);
+      if (!baseSegmentToUpgradedSegments.containsKey(baseId)) {
+        return;
+      }
 
-    final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment
-        = baseSegmentToUpgradedVersions.remove(baseSegment.getId());
-    if (upgradedVersionsOfSegment == null || 
upgradedVersionsOfSegment.isEmpty()) {
-      return;
+      final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment = 
baseSegmentToUpgradedSegments.remove(baseId);
+      for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
+        final DataSegment newSegment = new DataSegment(
+            newId.getDataSource(),
+            newId.getInterval(),
+            newId.getVersion(),
+            baseSegment.getLoadSpec(),
+            baseSegment.getDimensions(),
+            baseSegment.getMetrics(),
+            newId.getShardSpec(),
+            baseSegment.getBinaryVersion(),
+            baseSegment.getSize()
+        );
+        unannounceSegment(newSegment);
+        upgradedSegmentToBaseSegment.remove(newId);
+      }
     }
+  }
 
-    for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
-      final DataSegment newSegment = new DataSegment(
-          newId.getDataSource(),
-          newId.getInterval(),
-          newId.getVersion(),
-          baseSegment.getLoadSpec(),
-          baseSegment.getDimensions(),
-          baseSegment.getMetrics(),
-          newId.getShardSpec(),
-          baseSegment.getBinaryVersion(),
-          baseSegment.getSize()
-      );
-      segmentAnnouncer.unannounceSegment(newSegment);
+  private void unannounceSegment(DataSegment segment)
+  {
+    try {
+      segmentAnnouncer.unannounceSegment(segment);
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to unannounce segment[%s]", 
schema.getDataSource())
+         .addData("identifier", segment.getId().toString())
+         .emit();
     }
   }
 
-  public void registerNewVersionOfPendingSegment(
-      SegmentIdWithShardSpec basePendingSegment,
-      SegmentIdWithShardSpec newSegmentVersion
-  ) throws IOException
+  public void registerUpgradedPendingSegment(PendingSegmentRecord 
pendingSegmentRecord) throws IOException
   {
+    SegmentIdWithShardSpec basePendingSegment = 
idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId());
+    SegmentIdWithShardSpec upgradedPendingSegment = 
pendingSegmentRecord.getId();
     if (!sinks.containsKey(basePendingSegment) || 
droppingSinks.contains(basePendingSegment)) {
       return;
     }
 
     // Update query mapping with SinkQuerySegmentWalker
-    ((SinkQuerySegmentWalker) 
texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, 
newSegmentVersion);
+    ((SinkQuerySegmentWalker) 
texasRanger).registerUpgradedPendingSegment(basePendingSegment, 
upgradedPendingSegment);
 
     // Announce segments
     final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment();
+    final DataSegment newSegment = getUpgradedSegment(baseSegment, 
upgradedPendingSegment);
 
-    final DataSegment newSegment = new DataSegment(
-        newSegmentVersion.getDataSource(),
-        newSegmentVersion.getInterval(),
-        newSegmentVersion.getVersion(),
+    segmentAnnouncer.announceSegment(newSegment);
+    
baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment);
+    upgradedSegmentToBaseSegment.put(upgradedPendingSegment, 
basePendingSegment);
+  }
+
+  private DataSegment getUpgradedSegment(DataSegment baseSegment, 
SegmentIdWithShardSpec upgradedVersion)
+  {
+    return new DataSegment(
+        upgradedVersion.getDataSource(),
+        upgradedVersion.getInterval(),
+        upgradedVersion.getVersion(),
         baseSegment.getLoadSpec(),
         baseSegment.getDimensions(),
         baseSegment.getMetrics(),
-        newSegmentVersion.getShardSpec(),
+        upgradedVersion.getShardSpec(),
         baseSegment.getBinaryVersion(),
         baseSegment.getSize()
     );
-    segmentAnnouncer.announceSegment(newSegment);
-    
baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), 
id -> new HashSet<>())
-                                 .add(newSegmentVersion);
   }
 
   private void lockBasePersistDirectory()
@@ -1367,13 +1394,8 @@ public class StreamAppenderator implements Appenderator
             hydrants
         );
         rowsSoFar += currSink.getNumRows();
-        sinks.put(identifier, currSink);
-        sinkTimeline.add(
-            currSink.getInterval(),
-            currSink.getVersion(),
-            identifier.getShardSpec().createChunk(currSink)
-        );
 
+        addSink(identifier, currSink);
         segmentAnnouncer.announceSegment(currSink.getSegment());
       }
       catch (IOException e) {
@@ -1396,12 +1418,49 @@ public class StreamAppenderator implements Appenderator
     return committed.getMetadata();
   }
 
+  /**
+   * Update the state of the appenderator when adding a sink.
+   *
+   * @param identifier sink identifier
+   * @param sink sink to be added
+   */
+  private void addSink(SegmentIdWithShardSpec identifier, Sink sink)
+  {
+    sinks.put(identifier, sink);
+    // Associate the base segment of a sink with its string identifier
+    // Needed to get the base segment using upgradedFromSegmentId of a pending 
segment
+    idToPendingSegment.put(identifier.asSegmentId().toString(), identifier);
+
+    // The base segment is associated with itself in the maps to maintain all 
the upgraded ids of a sink.
+    baseSegmentToUpgradedSegments.put(identifier, new HashSet<>());
+    baseSegmentToUpgradedSegments.get(identifier).add(identifier);
+
+    sinkTimeline.add(
+        sink.getInterval(),
+        sink.getVersion(),
+        identifier.getShardSpec().createChunk(sink)
+    );
+  }
+
   private ListenableFuture<?> abandonSegment(
       final SegmentIdWithShardSpec identifier,
       final Sink sink,
       final boolean removeOnDiskData
   )
   {
+    abandonedSegments.add(identifier);
+    final SegmentIdWithShardSpec baseIdentifier = 
upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier);
+    synchronized (sink) {
+      if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) {
+        Set<SegmentIdWithShardSpec> relevantSegments = new 
HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier));
+        relevantSegments.removeAll(abandonedSegments);
+        // If there are unabandoned segments associated with the sink, return 
early
+        // This may be the case if segments have been upgraded as the result 
of a concurrent replace
+        if (!relevantSegments.isEmpty()) {
+          return Futures.immediateFuture(null);
+        }
+      }
+    }
     // Ensure no future writes will be made to this sink.
     if (sink.finishWriting()) {
       // Decrement this sink's rows from the counters. we only count active 
sinks so that we don't double decrement,
@@ -1419,7 +1478,7 @@ public class StreamAppenderator implements Appenderator
     }
 
     // Mark this identifier as dropping, so no future push tasks will pick it 
up.
-    droppingSinks.add(identifier);
+    droppingSinks.add(baseIdentifier);
 
     // Wait for any outstanding pushes to finish, then abandon the segment 
inside the persist thread.
     return Futures.transform(
@@ -1430,8 +1489,8 @@ public class StreamAppenderator implements Appenderator
           @Override
           public Void apply(@Nullable Object input)
           {
-            if (!sinks.remove(identifier, sink)) {
-              log.error("Sink for segment[%s] no longer valid, not 
abandoning.", identifier);
+            if (!sinks.remove(baseIdentifier, sink)) {
+              log.error("Sink for segment[%s] no longer valid, not 
abandoning.", baseIdentifier);
               return null;
             }
 
@@ -1439,17 +1498,17 @@ public class StreamAppenderator implements Appenderator
 
             if (removeOnDiskData) {
               // Remove this segment from the committed list. This must be 
done from the persist thread.
-              log.debug("Removing commit metadata for segment[%s].", 
identifier);
+              log.debug("Removing commit metadata for segment[%s].", 
baseIdentifier);
               try {
                 commitLock.lock();
                 final Committed oldCommit = readCommit();
                 if (oldCommit != null) {
-                  writeCommit(oldCommit.without(identifier.toString()));
+                  writeCommit(oldCommit.without(baseIdentifier.toString()));
                 }
               }
               catch (Exception e) {
                 log.makeAlert(e, "Failed to update committed segments[%s]", 
schema.getDataSource())
-                   .addData("identifier", identifier.toString())
+                   .addData("identifier", baseIdentifier.toString())
                    .emit();
                 throw new RuntimeException(e);
               }
@@ -1458,22 +1517,14 @@ public class StreamAppenderator implements Appenderator
               }
             }
 
-            // Unannounce the segment.
-            try {
-              unannounceAllVersionsOfSegment(sink.getSegment());
-            }
-            catch (Exception e) {
-              log.makeAlert(e, "Failed to unannounce segment[%s]", 
schema.getDataSource())
-                 .addData("identifier", identifier.toString())
-                 .emit();
-            }
+            unannounceAllVersionsOfSegment(sink.getSegment(), sink);
 
             Runnable removeRunnable = () -> {
-              droppingSinks.remove(identifier);
+              droppingSinks.remove(baseIdentifier);
               sinkTimeline.remove(
                   sink.getInterval(),
                   sink.getVersion(),
-                  identifier.getShardSpec().createChunk(sink)
+                  baseIdentifier.getShardSpec().createChunk(sink)
               );
               for (FireHydrant hydrant : sink) {
                 if (cache != null) {
@@ -1483,7 +1534,7 @@ public class StreamAppenderator implements Appenderator
               }
 
               if (removeOnDiskData) {
-                removeDirectory(computePersistDir(identifier));
+                removeDirectory(computePersistDir(baseIdentifier));
               }
 
               log.info("Dropped segment[%s].", identifier);
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 164b81b0c49..2b5c153d602 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -51,10 +51,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -322,10 +324,14 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
       return Futures.immediateFuture(null);
 
     } else {
-      final List<SegmentIdWithShardSpec> waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream()
-                                                                               
          .map(
-                                                                               
        SegmentIdWithShardSpec::fromDataSegment)
-                                                                               
          .collect(Collectors.toList());
+      final Set<DataSegment> segmentsToBeHandedOff = new HashSet<>(segmentsAndCommitMetadata.getSegments());
+      if (segmentsAndCommitMetadata.getUpgradedSegments() != null) {
+        segmentsToBeHandedOff.addAll(segmentsAndCommitMetadata.getUpgradedSegments());
+      }
+      final List<SegmentIdWithShardSpec> waitingSegmentIdList =
+          segmentsToBeHandedOff.stream()
+                               .map(SegmentIdWithShardSpec::fromDataSegment)
+                               .collect(Collectors.toList());
       final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata");
 
       if (waitingSegmentIdList.isEmpty()) {
@@ -333,7 +339,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
             new SegmentsAndCommitMetadata(
                 segmentsAndCommitMetadata.getSegments(),
                 ((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
-                segmentsAndCommitMetadata.getSegmentSchemaMapping()
+                segmentsAndCommitMetadata.getSegmentSchemaMapping(),
+                segmentsAndCommitMetadata.getUpgradedSegments()
             )
         );
       }
@@ -365,8 +372,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
                     public void onSuccess(Object result)
                     {
                       if (numRemainingHandoffSegments.decrementAndGet() == 0) {
-                        List<DataSegment> segments = segmentsAndCommitMetadata.getSegments();
-                        log.info("Successfully handed off [%d] segments.", segments.size());
+                        log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size());
                         final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime;
                         metrics.reportMaxSegmentHandoffTime(handoffTotalTime);
                         if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) {
@@ -375,9 +381,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
                         }
                         resultFuture.set(
                             new SegmentsAndCommitMetadata(
-                                segments,
+                                segmentsAndCommitMetadata.getSegments(),
                                 ((AppenderatorDriverMetadata) metadata).getCallerMetadata(),
-                                segmentsAndCommitMetadata.getSegmentSchemaMapping()
+                                segmentsAndCommitMetadata.getSegmentSchemaMapping(),
+                                segmentsAndCommitMetadata.getUpgradedSegments()
                             )
                         );
                       }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index eb8f9358cef..5a21a4331fe 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -201,7 +201,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
     expectedException.expectMessage(
-        "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]"
+        "Fail test while dropping segment"
     );
 
     driver = new StreamAppenderatorDriver(
@@ -221,10 +221,8 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
 
     Assert.assertNull(driver.startJob(null));
 
-    for (int i = 0; i < ROWS.size(); i++) {
-      committerSupplier.setMetadata(i + 1);
-      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
-    }
+    committerSupplier.setMetadata(1);
+    Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk());
 
     final SegmentsAndCommitMetadata published = driver.publish(
         StreamAppenderatorDriverTest.makeOkPublisher(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 335d1b219fe..63775e2dc3b 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -58,6 +59,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +74,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 {
   private static final String DATA_SOURCE = "foo";
   private static final String VERSION = "abc123";
+  private static final String UPGRADED_VERSION = "xyz456";
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
   private static final int MAX_ROWS_IN_MEMORY = 100;
   private static final int MAX_ROWS_PER_SEGMENT = 3;
@@ -246,6 +249,44 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
     driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
   }
 
+  @Test
+  public void testHandoffUpgradedSegments()
+      throws IOException, InterruptedException, TimeoutException, ExecutionException
+  {
+    final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+
+    Assert.assertNull(driver.startJob(null));
+
+    for (int i = 0; i < ROWS.size(); i++) {
+      committerSupplier.setMetadata(i + 1);
+      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
+    }
+
+    driver.persist(committerSupplier.get());
+
+    // There is no remaining rows in the driver, and thus the result must be empty
+    final SegmentsAndCommitMetadata segmentsAndCommitMetadata = driver.publishAndRegisterHandoff(
+        makeUpgradingPublisher(),
+        committerSupplier.get(),
+        ImmutableList.of("dummy")
+    ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+    Assert.assertNotNull(segmentsAndCommitMetadata.getUpgradedSegments());
+    Assert.assertEquals(
+        segmentsAndCommitMetadata.getSegments().size(),
+        segmentsAndCommitMetadata.getUpgradedSegments().size()
+    );
+
+    Set<SegmentDescriptor> expectedHandedOffSegments = new HashSet<>();
+    for (DataSegment segment : segmentsAndCommitMetadata.getSegments()) {
+      expectedHandedOffSegments.add(segment.toDescriptor());
+    }
+    for (DataSegment segment : segmentsAndCommitMetadata.getUpgradedSegments()) {
+      expectedHandedOffSegments.add(segment.toDescriptor());
+    }
+    Assert.assertEquals(expectedHandedOffSegments, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors());
+  }
+
   @Test
   public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException
   {
@@ -379,6 +420,29 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
         SegmentPublishResult.ok(Collections.emptySet());
   }
 
+  private TransactionalSegmentPublisher makeUpgradingPublisher()
+  {
+    return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> {
+      Set<DataSegment> allSegments = new HashSet<>(segmentsToPublish);
+      int id = 0;
+      for (DataSegment segment : segmentsToPublish) {
+        DataSegment upgradedSegment = new DataSegment(
+            SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, id),
+            segment.getLoadSpec(),
+            segment.getDimensions(),
+            segment.getMetrics(),
+            new NumberedShardSpec(id, 0),
+            null,
+            segment.getBinaryVersion(),
+            segment.getSize()
+        );
+        id++;
+        allSegments.add(upgradedSegment);
+      }
+      return SegmentPublishResult.ok(allSegments);
+    };
+  }
+
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
   {
     return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> {
@@ -459,6 +523,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
   {
     private boolean handoffEnabled = true;
     private long handoffDelay;
+    private final Set<SegmentDescriptor> handedOffSegmentDescriptors = new HashSet<>();
 
     public void disableHandoff()
     {
@@ -470,6 +535,13 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       handoffDelay = delay;
     }
 
+    public Set<SegmentDescriptor> getHandedOffSegmentDescriptors()
+    {
+      synchronized (handedOffSegmentDescriptors) {
+        return ImmutableSet.copyOf(handedOffSegmentDescriptors);
+      }
+    }
+
     @Override
     public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
     {
@@ -494,6 +566,9 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
             }
 
             exec.execute(handOffRunnable);
+            synchronized (handedOffSegmentDescriptors) {
+              handedOffSegmentDescriptors.add(descriptor);
+            }
           }
           return true;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to