This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d6c760f7ce7 Do not kill segments with referenced load specs from deep storage (#16667)
d6c760f7ce7 is described below

commit d6c760f7ce74b5864cd7d0750736f1609d171fdf
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jul 15 14:07:53 2024 +0530

    Do not kill segments with referenced load specs from deep storage (#16667)
    
    Do not kill segments with referenced load specs from deep storage
---
 .../ActionBasedPublishedSegmentRetriever.java      |   2 +-
 .../RetrieveUpgradedFromSegmentIdsAction.java      |  90 ++++++++
 .../RetrieveUpgradedToSegmentIdsAction.java        |  95 +++++++++
 .../druid/indexing/common/actions/TaskAction.java  |   2 +
 .../actions/UpgradedFromSegmentsResponse.java}     |  26 ++-
 .../actions/UpgradedToSegmentsResponse.java}       |  28 ++-
 .../common/task/KillUnusedSegmentsTask.java        | 150 ++++++++++++--
 .../indexing/common/task/IngestionTestBase.java    |  13 +-
 .../common/task/KillUnusedSegmentsTaskTest.java    | 227 ++++++++++++++++++++-
 .../druid/indexing/test/TestDataSegmentKiller.java |  13 +-
 .../TestIndexerMetadataStorageCoordinator.java     |  18 ++
 .../IndexerMetadataStorageCoordinator.java         |  17 ++
 .../IndexerSQLMetadataStorageCoordinator.java      | 118 ++++++++++-
 .../druid/metadata/PendingSegmentRecord.java       |   5 +-
 .../druid/metadata/SQLMetadataConnector.java       |  10 +
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |  13 +-
 .../apache/druid/server/http/DataSegmentPlus.java  |  23 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 197 +++++++++++++++++-
 ...dexerSqlMetadataStorageCoordinatorTestBase.java |  49 +++++
 .../druid/server/http/DataSegmentPlusTest.java     |   3 +-
 .../druid/server/http/MetadataResourceTest.java    |   2 +-
 21 files changed, 1024 insertions(+), 77 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
index ba5cf923b12..bb349cc9790 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
@@ -79,7 +79,7 @@ public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRet
     catch (Exception e) {
       log.warn(
           e,
-          "Could not retrieve published segment IDs[%s] using task action[segmentListById]."
+          "Could not retrieve published segment IDs[%s] using task action[retrieveSegmentsById]."
           + " Overlord maybe on an older version, retrying with action[segmentListUsed]."
           + " This task may fail to publish segments if there is a concurrent replace happening.",
           serializedSegmentIds
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java
new file mode 100644
index 00000000000..67f7ae6e131
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Task action to retrieve the segment IDs from which a given set of segments were upgraded.
+ */
+public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction<UpgradedFromSegmentsResponse>
+{
+  private final String dataSource;
+  private final Set<String> segmentIds;
+
+  @JsonCreator
+  public RetrieveUpgradedFromSegmentIdsAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("segmentIds") Set<String> segmentIds
+  )
+  {
+    this.dataSource = dataSource;
+    this.segmentIds = segmentIds;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public Set<String> getSegmentIds()
+  {
+    return segmentIds;
+  }
+
+  @Override
+  public TypeReference<UpgradedFromSegmentsResponse> getReturnTypeReference()
+  {
+    return new TypeReference<UpgradedFromSegmentsResponse>()
+    {
+    };
+  }
+
+  @Override
+  public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
+  {
+    return new UpgradedFromSegmentsResponse(
+        toolbox.getIndexerMetadataStorageCoordinator()
+               .retrieveUpgradedFromSegmentIds(dataSource, segmentIds)
+    );
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "dataSource='" + dataSource + '\'' +
+           ", segmentIds=" + segmentIds +
+           '}';
+  }
+}
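A minimal usage sketch of the new action, mirroring how KillUnusedSegmentsTask submits it later in this patch. This is an illustration under stated assumptions, not part of the commit: it assumes the class lives in the same package as the action, and that a TaskToolbox is available (as it is inside a running task).

import org.apache.druid.indexing.common.TaskToolbox;

import java.util.Map;
import java.util.Set;

class UpgradedFromLookupSketch
{
  Map<String, String> lookupParents(TaskToolbox toolbox, String dataSource, Set<String> segmentIds) throws Exception
  {
    // Returns a child -> parent segment ID map; original (non-upgraded)
    // segments have no entry, per the action's javadoc above.
    return toolbox.getTaskActionClient()
                  .submit(new RetrieveUpgradedFromSegmentIdsAction(dataSource, segmentIds))
                  .getUpgradedFromSegmentIds();
  }
}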
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java
new file mode 100644
index 00000000000..412c9604d11
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Task action to determine the set of all segments containing the same load spec given the parent id. <br/>
+ * Returns a map from a segment ID to a set containing:
+ * <ol>
+ * <li> all segment IDs that were upgraded from it AND are still present in the metadata store </li>
+ * <li> the segment ID itself if and only if it is still present in the metadata store </li>
+ * </ol>
+ */
+public class RetrieveUpgradedToSegmentIdsAction implements TaskAction<UpgradedToSegmentsResponse>
+{
+  private final String dataSource;
+  private final Set<String> segmentIds;
+
+  @JsonCreator
+  public RetrieveUpgradedToSegmentIdsAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("segmentIds") Set<String> segmentIds
+  )
+  {
+    this.dataSource = dataSource;
+    this.segmentIds = segmentIds;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public Set<String> getSegmentIds()
+  {
+    return segmentIds;
+  }
+
+  @Override
+  public TypeReference<UpgradedToSegmentsResponse> getReturnTypeReference()
+  {
+    return new TypeReference<UpgradedToSegmentsResponse>()
+    {
+    };
+  }
+
+  @Override
+  public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
+  {
+    return new UpgradedToSegmentsResponse(
+        toolbox.getIndexerMetadataStorageCoordinator()
+               .retrieveUpgradedToSegmentIds(dataSource, segmentIds)
+    );
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "dataSource='" + dataSource + '\'' +
+           ", segmentIds=" + segmentIds +
+           '}';
+  }
+}
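To make the map semantics in the javadoc above concrete, here is a small hedged example. The segment IDs are hypothetical, and ImmutableMap/ImmutableSet come from Guava, which this codebase already uses.

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import java.util.Map;
import java.util.Set;

class UpgradedToShapeExample
{
  public static void main(String[] args)
  {
    // Suppose "seg_v0" was upgraded to "seg_v1" and "seg_v2", and "seg_v0"
    // itself has already been removed from the metadata store.
    final Map<String, Set<String>> upgradedTo =
        ImmutableMap.of("seg_v0", ImmutableSet.of("seg_v1", "seg_v2"));
    // "seg_v0" is absent from its own set, so only its children still
    // reference the shared load spec; an empty set would mean nothing does.
    System.out.println(upgradedTo);
  }
}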
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 4606bd597a8..973a83ecee4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Future;
     @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = 
SegmentTransactionalAppendAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = 
SegmentTransactionalReplaceAction.class),
     @JsonSubTypes.Type(name = "retrieveSegmentsById", value = 
RetrieveSegmentsByIdAction.class),
+    @JsonSubTypes.Type(name = "retrieveUpgradedFromSegmentIds", value = 
RetrieveUpgradedFromSegmentIdsAction.class),
+    @JsonSubTypes.Type(name = "retrieveUpgradedToSegmentIds", value = 
RetrieveUpgradedToSegmentIdsAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = 
RetrieveUsedSegmentsAction.class),
     @JsonSubTypes.Type(name = "segmentListUnused", value = 
RetrieveUnusedSegmentsAction.class),
     @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = 
MarkSegmentsAsUnusedAction.class),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
similarity index 58%
copy from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
index 33421eb1a5c..5f0f1775f16 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
@@ -17,22 +17,28 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.test;
+package org.apache.druid.indexing.common.actions;
 
-import org.apache.druid.segment.loading.DataSegmentKiller;
-import org.apache.druid.timeline.DataSegment;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class TestDataSegmentKiller implements DataSegmentKiller
+import java.util.Map;
+
+public class UpgradedFromSegmentsResponse
 {
-  @Override
-  public void kill(DataSegment segment)
+  private final Map<String, String> upgradedFromSegmentIds;
+
+  @JsonCreator
+  public UpgradedFromSegmentsResponse(
+      @JsonProperty("upgradedFromSegmentIds") Map<String, String> 
upgradedFromSegmentIds
+  )
   {
-    // do nothing
+    this.upgradedFromSegmentIds = upgradedFromSegmentIds;
   }
 
-  @Override
-  public void killAll()
+  @JsonProperty
+  public Map<String, String> getUpgradedFromSegmentIds()
   {
-    throw new UnsupportedOperationException("not implemented");
+    return upgradedFromSegmentIds;
   }
 }
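Because this response travels over the task-action API, its JSON shape follows directly from the @JsonProperty annotation. A hedged sketch of the wire format (assumes the sketch class sits in the same package as the response; the IDs are hypothetical):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;

class ResponseWireFormatSketch
{
  public static void main(String[] args) throws Exception
  {
    final UpgradedFromSegmentsResponse response = new UpgradedFromSegmentsResponse(
        Collections.singletonMap("childSegmentId", "parentSegmentId")
    );
    // Prints: {"upgradedFromSegmentIds":{"childSegmentId":"parentSegmentId"}}
    System.out.println(new ObjectMapper().writeValueAsString(response));
  }
}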
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
similarity index 57%
copy from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
index 33421eb1a5c..e9bf33a97ce 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
@@ -17,22 +17,30 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.test;
+package org.apache.druid.indexing.common.actions;
 
-import org.apache.druid.segment.loading.DataSegmentKiller;
-import org.apache.druid.timeline.DataSegment;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class TestDataSegmentKiller implements DataSegmentKiller
+import java.util.Map;
+import java.util.Set;
+
+public class UpgradedToSegmentsResponse
 {
-  @Override
-  public void kill(DataSegment segment)
+
+  private final Map<String, Set<String>> upgradedToSegmentIds;
+
+  @JsonCreator
+  public UpgradedToSegmentsResponse(
+      @JsonProperty("upgradedToSegmentIds") Map<String, Set<String>> 
upgradedToSegmentIds
+  )
   {
-    // do nothing
+    this.upgradedToSegmentIds = upgradedToSegmentIds;
   }
 
-  @Override
-  public void killAll()
+  @JsonProperty
+  public Map<String, Set<String>> getUpgradedToSegmentIds()
   {
-    throw new UnsupportedOperationException("not implemented");
+    return upgradedToSegmentIds;
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index fe49569a3bb..e1f6d2915ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -35,11 +35,14 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveUpgradedFromSegmentIdsAction;
+import org.apache.druid.indexing.common.actions.RetrieveUpgradedToSegmentIdsAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentNukeAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.actions.UpgradedToSegmentsResponse;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +50,8 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -54,6 +59,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -63,9 +69,23 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
+ * <p/>
  * The client representation of this task is {@link 
ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link 
#context} fields.
+ * <p/>
+ * The Kill task fetches the set of used segments for the interval and 
computes the set of their load specs. <br/>
+ * Until `limit` segments have been processed in total or all segments for the 
interval have been nuked:
+ * <ol>
+ * <li> Fetch at most `batchSize` unused segments from the metadata store. 
</li>
+ * <li> Determine the mapping from these segments to their parents *before* 
nuking the segments. </li>
+ * <li> Nuke the batch of unused segments from the metadata store. </li>
+ * <li> Determine the mapping of the set of parents to all their children. 
</li>
+ * <li> Check if unused or parent segments exist. </li>
+ * <li> Find the unreferenced segments. </li>
+ * <li> Filter the set of unreferenced segments using load specs from the set 
of used segments. </li>
+ * <li> Kill the filtered set of segments from deep storage. </li>
+ * </ol>
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
@@ -76,7 +96,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
    * Default nuke batch size. This is a small enough size that we still get value from batching, while
    * yielding as quickly as possible. In one real cluster environment backed with mysql, ~2000rows/sec,
    * with batch size of 100, means a batch should only less than a second for the task lock, and depending
-   * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time
+   * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time,
    * we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be yielding
    * the task lockbox every 1-2 seconds.
    */
@@ -97,13 +117,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
   /**
    * Maximum number of segments that can be killed.
    */
-  @Nullable private final Integer limit;
+  @Nullable
+  private final Integer limit;
 
   /**
    * The maximum used status last updated time. Any segments with
    * {@code used_status_last_updated} no later than this time will be included in the kill task.
    */
-  @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
+  @Nullable
+  private final DateTime maxUsedStatusLastUpdatedTime;
 
   @JsonCreator
   public KillUnusedSegmentsTask(
@@ -196,18 +218,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
         numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
     );
 
+    final TaskActionClient taskActionClient = toolbox.getTaskActionClient();
     RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
             getDataSource(),
             ImmutableList.of(getInterval()),
             Segments.INCLUDING_OVERSHADOWED
     );
     // Fetch the load specs of all segments overlapping with the unused segment intervals
-    final Set<Map<String, Object>> usedSegmentLoadSpecs =
-            new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
-                    .stream()
-                    .map(DataSegment::getLoadSpec)
-                    .collect(Collectors.toSet())
-            );
+    final Set<Map<String, Object>> usedSegmentLoadSpecs = taskActionClient.submit(retrieveUsedSegmentsAction)
+                                                                          .stream()
+                                                                          .map(DataSegment::getLoadSpec)
+                                                                          .collect(Collectors.toSet());
 
     do {
       if (nextBatchSize <= 0) {
@@ -231,20 +252,47 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
         );
       }
 
-      // Kill segments
-      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-      // segments are removed from storage, this helps maintain that we will always have a storage segment if
-      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-      // abandoned.
+      // Kill segments. Order is important here:
+      // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+      // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+      // This helps maintain that we will always have a storage segment if the metadata segment is present.
+      // Determine the subset of segments to be killed from deep storage based on loadspecs.
+      // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+      // Determine upgraded segment ids before nuking
+      final Set<String> segmentIds = unusedSegments.stream()
+                                                   .map(DataSegment::getId)
+                                                   .map(SegmentId::toString)
+                                                   .collect(Collectors.toSet());
+      final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+      try {
+        upgradedFromSegmentIds.putAll(
+            taskActionClient.submit(
+                new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+            ).getUpgradedFromSegmentIds()
+        );
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve parent segment ids using task action[retrieveUpgradedFromSegmentIds]."
+            + " Overlord may be on an older version."
+        );
+      }
 
-      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      // Nuke Segments
+      taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
 
-      // Kill segments from the deep storage only if their load specs are not being used by any used segments
-      final List<DataSegment> segmentsToBeKilled = unusedSegments
-          .stream()
-          .filter(unusedSegment -> unusedSegment.getLoadSpec() == null
-                                   || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
-          .collect(Collectors.toList());
+      // Determine segments to be killed
+      final List<DataSegment> segmentsToBeKilled
+          = getKillableSegments(unusedSegments, upgradedFromSegmentIds, usedSegmentLoadSpecs, taskActionClient);
+
+      final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
+      segmentsToBeKilled.forEach(segmentsNotKilled::remove);
+      LOG.infoSegments(
+          segmentsNotKilled,
+          "Skipping segment kill from deep storage as their load specs are referenced by other segments."
+      );
 
       toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
       numBatchesProcessed++;
@@ -253,7 +301,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
       LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
 
       nextBatchSize = computeNextBatchSize(numSegmentsKilled);
-    } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
+    } while (!unusedSegments.isEmpty() && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
 
     final String taskId = getId();
     LOG.info(
@@ -300,6 +348,64 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     return taskLockMap;
   }
 
+  /**
+   * Determines the subset of segments without referenced load specs that can be safely killed by
+   * looking at the segment upgrades and used segment load specs.
+   * @param unusedSegments input segments
+   * @param upgradedFromSegmentIds segment to parent mapping
+   * @param usedSegmentLoadSpecs load specs of used segments
+   * @param taskActionClient task action client
+   * @return list of segments to kill from deep storage
+   */
+  private List<DataSegment> getKillableSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      Set<Map<String, Object>> usedSegmentLoadSpecs,
+      TaskActionClient taskActionClient
+  )
+  {
+
+    // Determine parentId for each unused segment
+    final Map<String, Set<DataSegment>> parentIdToUnusedSegments = new 
HashMap<>();
+    for (DataSegment segment : unusedSegments) {
+      final String segmentId = segment.getId().toString();
+      parentIdToUnusedSegments.computeIfAbsent(
+          upgradedFromSegmentIds.getOrDefault(segmentId, segmentId),
+          k -> new HashSet<>()
+      ).add(segment);
+    }
+
+    // Check if the parent or any of its children exist in metadata store
+    try {
+      UpgradedToSegmentsResponse response = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet())
+      );
+      if (response != null && response.getUpgradedToSegmentIds() != null) {
+        response.getUpgradedToSegmentIds().forEach((parent, children) -> {
+          if (!CollectionUtils.isNullOrEmpty(children)) {
+            // Do not kill segment if its parent or any of its siblings still exist in metadata store
+            parentIdToUnusedSegments.remove(parent);
+          }
+        });
+      }
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve referenced ids using task 
action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+    }
+
+    // Filter using the used segment load specs as segment upgrades predate the above task action
+    return parentIdToUnusedSegments.values()
+                                   .stream()
+                                   .flatMap(Set::stream)
+                                   .filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+                                   .collect(Collectors.toList());
+  }
+
+
   @Override
   public LookupLoadingSpec getLookupLoadingSpec()
   {
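To make the filtering rule implemented by getKillableSegments() above easier to follow, here is a compact, self-contained sketch of the same decision. This is an editorial illustration only: the task-action calls are replaced by plain maps, and the load-spec comparison is simplified to a precomputed set of IDs whose load spec is still used.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class KillFilterSketch
{
  // A segment may be killed from deep storage only if (1) no segment sharing
  // its load spec (root parent, sibling, or child) is still present in the
  // metadata store, and (2) its load spec is not referenced by a used segment.
  static List<String> killable(
      List<String> unusedSegmentIds,          // batch of nuked segment IDs
      Map<String, String> upgradedFrom,       // child -> root parent (may be empty)
      Map<String, Set<String>> upgradedTo,    // root parent -> IDs still in metadata store
      Set<String> idsWithUsedLoadSpec         // IDs whose load spec is still used
  )
  {
    final List<String> result = new ArrayList<>();
    for (String id : unusedSegmentIds) {
      // Resolve the root of the upgrade hierarchy; non-upgraded segments are their own root.
      final String parent = upgradedFrom.getOrDefault(id, id);
      final Set<String> stillReferenced = upgradedTo.get(parent);
      final boolean referencedInMetadata = stillReferenced != null && !stillReferenced.isEmpty();
      if (!referencedInMetadata && !idsWithUsedLoadSpec.contains(id)) {
        result.add(id);
      }
    }
    return result;
  }
}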
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 133ced3907d..d6687efbf34 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.test.TestDataSegmentKiller;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
@@ -81,7 +82,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.SegmentSchemaCache;
@@ -130,6 +130,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
   private SegmentSchemaManager segmentSchemaManager;
   private SegmentSchemaCache segmentSchemaCache;
   private SupervisorManager supervisorManager;
+  private TestDataSegmentKiller dataSegmentKiller;
   protected File reportsFile;
 
   @Before
@@ -169,6 +170,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     lockbox = new TaskLockbox(taskStorage, storageCoordinator);
     segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper());
     reportsFile = temporaryFolder.newFile();
+    dataSegmentKiller = new TestDataSegmentKiller();
   }
 
   @After
@@ -243,6 +245,11 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     return testUtils.getRowIngestionMetersFactory();
   }
 
+  public TestDataSegmentKiller getDataSegmentKiller()
+  {
+    return dataSegmentKiller;
+  }
+
   public TaskActionToolbox createTaskActionToolbox()
   {
     storageCoordinator.start();
@@ -265,7 +272,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
         .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
         .taskActionClient(createActionClient(task))
         .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
-        .dataSegmentKiller(new NoopDataSegmentKiller())
+        .dataSegmentKiller(dataSegmentKiller)
         .joinableFactory(NoopJoinableFactory.INSTANCE)
         .jsonMapper(objectMapper)
         .taskWorkDir(baseDir)
@@ -450,7 +457,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
             .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
             .taskActionClient(taskActionClient)
             .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
-            .dataSegmentKiller(new NoopDataSegmentKiller())
+            .dataSegmentKiller(dataSegmentKiller)
             .joinableFactory(NoopJoinableFactory.INSTANCE)
             .jsonMapper(objectMapper)
             .taskWorkDir(baseDir)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f888ace5a54..fe2b5a51c86 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.assertj.core.api.Assertions;
@@ -72,10 +73,10 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
     taskRunner = new TestTaskRunner();
 
     final String version = DateTimes.nowUtc().toString();
-    segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
-    segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
-    segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
-    segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+    segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), 
version).withLoadSpec(ImmutableMap.of("k", 1));
+    segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), 
version).withLoadSpec(ImmutableMap.of("k", 2));
+    segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), 
version).withLoadSpec(ImmutableMap.of("k", 3));
+    segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), 
version).withLoadSpec(ImmutableMap.of("k", 4));
   }
 
   @Test
@@ -125,6 +126,212 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
         new KillTaskReport.Stats(1, 2),
         getReportedStats()
     );
+    Assert.assertEquals(ImmutableSet.of(segment3), getDataSegmentKiller().getKilledSegments());
+  }
+
+  @Test
+  public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception
+  {
+    final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+        segment1.getId().toString(),
+        "nonExistentParent",
+        segment2.getId().toString(),
+        "nonExistentParent"
+    );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
+    getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
+
+
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(Intervals.ETERNITY)
+        .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> observedUnusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            null,
+            null,
+            null
+        );
+
+    Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
+
+    Assert.assertEquals(
+        new KillTaskReport.Stats(2, 2),
+        getReportedStats()
+    );
+    Assert.assertEquals(ImmutableSet.of(segment1, segment2), getDataSegmentKiller().getKilledSegments());
+  }
+
+  @Test
+  public void testKillSegmentsDoNotDeleteReferencedSibling() throws Exception
+  {
+    final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+        segment1.getId().toString(),
+        "nonExistentParent",
+        segment2.getId().toString(),
+        "nonExistentParent"
+    );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
+    getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
+
+
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(segment1.getInterval())
+        .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> observedUnusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            null,
+            null,
+            null
+        );
+
+    Assert.assertEquals(Collections.singletonList(segment2), observedUnusedSegments);
+
+    Assert.assertEquals(
+        new KillTaskReport.Stats(0, 2),
+        getReportedStats()
+    );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+  }
+
+  @Test
+  public void testKillSegmentsDoNotDeleteParentWithReferencedChildren() throws Exception
+  {
+    final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+        segment1.getId().toString(),
+        segment3.getId().toString(),
+        segment2.getId().toString(),
+        segment3.getId().toString()
+    );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+    getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+    getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
+
+
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(Intervals.ETERNITY)
+        .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> observedUnusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            null,
+            null,
+            null
+        );
+    Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+    Assertions.assertThat(
+        getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            Segments.ONLY_VISIBLE
+        )
+    ).containsExactlyInAnyOrder(segment1);
+
+    Assert.assertEquals(
+        new KillTaskReport.Stats(0, 2),
+        getReportedStats()
+    );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+  }
+
+  @Test
+  public void testKillSegmentsDoNotDeleteChildrenWithReferencedParent() throws Exception
+  {
+    final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+        segment1.getId().toString(),
+        segment3.getId().toString(),
+        segment2.getId().toString(),
+        segment3.getId().toString()
+    );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+    getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
+    getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+
+
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(Intervals.ETERNITY)
+        .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> observedUnusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            null,
+            null,
+            null
+        );
+    Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+    Assertions.assertThat(
+        getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            Segments.ONLY_VISIBLE
+        )
+    ).containsExactlyInAnyOrder(segment3);
+
+    Assert.assertEquals(
+        new KillTaskReport.Stats(0, 2),
+        getReportedStats()
+    );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+  }
+
+  @Test
+  public void testKillSegmentsDeleteChildrenAndParent() throws Exception
+  {
+    final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+        segment1.getId().toString(),
+        segment3.getId().toString(),
+        segment2.getId().toString(),
+        segment3.getId().toString()
+    );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+    getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
+    getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+    getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
+
+
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(Intervals.ETERNITY)
+        .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> observedUnusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.ETERNITY,
+            null,
+            null,
+            null
+        );
+    Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+
+    Assert.assertEquals(
+        new KillTaskReport.Stats(3, 2),
+        getReportedStats()
+    );
+    Assert.assertEquals(ImmutableSet.of(segment1, segment2, segment3), getDataSegmentKiller().getKilledSegments());
   }
 
   @Test
@@ -1247,4 +1454,16 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
         10L
     );
   }
+
+  private void insertUsedSegments(Set<DataSegment> segments, Map<String, 
String> upgradedFromSegmentIdMap)
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments(
+        segments,
+        upgradedFromSegmentIdMap,
+        derbyConnectorRule.getConnector(),
+        table,
+        getObjectMapper()
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
index 33421eb1a5c..92581f6dd1e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
@@ -22,12 +22,18 @@ package org.apache.druid.indexing.test;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestDataSegmentKiller implements DataSegmentKiller
 {
+
+  private final Set<DataSegment> killedSegments = new HashSet<>();
+
   @Override
   public void kill(DataSegment segment)
   {
-    // do nothing
+    killedSegments.add(segment);
   }
 
   @Override
@@ -35,4 +41,9 @@ public class TestDataSegmentKiller implements DataSegmentKiller
   {
     throw new UnsupportedOperationException("not implemented");
   }
+
+  public Set<DataSegment> getKilledSegments()
+  {
+    return killedSegments;
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 61a57e94842..d2055d6e0c9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -314,6 +314,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public Map<String, String> retrieveUpgradedFromSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    return Collections.emptyMap();
+  }
+
   public Set<DataSegment> getPublished()
   {
     return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index c055a8d9e9f..83b4ac7e474 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -473,4 +473,21 @@ public interface IndexerMetadataStorageCoordinator
    * @return List of pending segment records
    */
   List<PendingSegmentRecord> getPendingSegments(String datasource, Interval 
interval);
+
+  /**
+   * Map from a segment ID to the segment ID from which it was upgraded.
+   * There should be no entry in the map for an original non-upgraded segment.
+   * @param dataSource data source
+   * @param segmentIds ids of segments
+   */
+  Map<String, String> retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds);
+
+  /**
+   * Map from a segment ID to a set containing
+   * 1) all segment IDs that were upgraded from it AND are still present in the metadata store
+   * 2) the segment ID itself if and only if it is still present in the metadata store
+   * @param dataSource data source
+   * @param segmentIds ids of the first segments which had the corresponding load spec
+   */
+  Map<String, Set<String>> retrieveUpgradedToSegmentIds(String dataSource, Set<String> segmentIds);
 }
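A hedged illustration of these two contracts on a tiny upgrade hierarchy (IDs are hypothetical and not part of the patch). Suppose segment "B" was upgraded from "A" and both rows still exist in the metadata store:

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

class CoordinatorContractExample
{
  public static void main(String[] args)
  {
    // retrieveUpgradedFromSegmentIds(ds, {"A", "B"}) -> {"B": "A"}
    //   ("A" is original, so it has no entry.)
    System.out.println(ImmutableMap.of("B", "A"));
    // retrieveUpgradedToSegmentIds(ds, {"A"}) -> {"A": {"A", "B"}}
    //   ("A" appears in its own set because it is still present.)
    System.out.println(ImmutableMap.of("A", ImmutableSet.of("A", "B")));
  }
}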
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index fd637728908..54f75ccb920 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -564,6 +564,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask);
 
             Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata = new HashMap<>();
+            final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
             for (DataSegmentPlus dataSegmentPlus : upgradedSegments) {
               segmentsToInsert.add(dataSegmentPlus.getDataSegment());
               if (dataSegmentPlus.getSchemaFingerprint() != null && dataSegmentPlus.getNumRows() != null) {
@@ -572,6 +573,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                     new SegmentMetadata(dataSegmentPlus.getNumRows(), dataSegmentPlus.getSchemaFingerprint())
                 );
               }
+              if (dataSegmentPlus.getUpgradedFromSegmentId() != null) {
+                upgradedFromSegmentIdMap.put(
+                    dataSegmentPlus.getDataSegment().getId().toString(),
+                    dataSegmentPlus.getUpgradedFromSegmentId()
+                );
+              }
             }
             SegmentPublishResult result = SegmentPublishResult.ok(
                 insertSegments(
@@ -579,7 +586,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                     segmentsToInsert,
                     segmentSchemaMapping,
                     upgradeSegmentMetadata,
-                    Collections.emptyMap()
+                    Collections.emptyMap(),
+                    upgradedFromSegmentIdMap
                 ),
                 upgradePendingSegmentsOverlappingWith(segmentsToInsert)
             );
@@ -1408,6 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
     final Map<SegmentId, SegmentId> newVersionSegmentToParent = new HashMap<>();
     final Map<String, DataSegment> segmentIdMap = new HashMap<>();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
     appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
     segmentIdsForNewVersions.forEach(
         pendingSegment -> {
@@ -1415,6 +1424,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
             final SegmentId newVersionSegmentId = pendingSegment.getId().asSegmentId();
             newVersionSegmentToParent.put(newVersionSegmentId, oldSegment.getId());
+            upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), oldSegment.getId().toString());
             allSegmentsToInsert.add(
                 new DataSegment(
                     pendingSegment.getId().asSegmentId(),
@@ -1473,7 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                     allSegmentsToInsert,
                     segmentSchemaMapping,
                     Collections.emptyMap(),
-                    newVersionSegmentToParent
+                    newVersionSegmentToParent,
+                    upgradedFromSegmentIdMap
                 )
             );
           },
@@ -2092,7 +2103,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               .bind("version", segment.getVersion())
               .bind("used", usedSegments.contains(segment))
               .bind("payload", jsonMapper.writeValueAsBytes(segment))
-              .bind("used_status_last_updated", now);
+              .bind("used_status_last_updated", now)
+              .bind("upgraded_from_segment_id", (String) null);
 
           if (schemaPersistEnabled) {
             Long numRows = null;
@@ -2217,6 +2229,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                                            .shardSpec(shardSpec)
                                            .build();
 
+      // When the segment already has an upgraded_from_segment_id, reuse it for its children
+      final String upgradedFromSegmentId = oldSegmentMetadata.getUpgradedFromSegmentId() == null
+                                           ? oldSegmentMetadata.getDataSegment().getId().toString()
+                                           : oldSegmentMetadata.getUpgradedFromSegmentId();
+
       upgradedSegments.add(
           new DataSegmentPlus(
               dataSegment,
@@ -2224,7 +2241,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               null,
               null,
               oldSegmentMetadata.getSchemaFingerprint(),
-              oldSegmentMetadata.getNumRows())
+              oldSegmentMetadata.getNumRows(),
+              upgradedFromSegmentId
+          )
       );
     }
 
@@ -2266,7 +2285,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       Set<DataSegment> segments,
       @Nullable SegmentSchemaMapping segmentSchemaMapping,
       Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata,
-      Map<SegmentId, SegmentId> newVersionForAppendToParent
+      Map<SegmentId, SegmentId> newVersionForAppendToParent,
+      Map<String, String> upgradedFromSegmentIdMap
   ) throws IOException
   {
     boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
@@ -2302,7 +2322,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                  .bind("version", segment.getVersion())
                  .bind("used", true)
                  .bind("payload", jsonMapper.writeValueAsBytes(segment))
-                 .bind("used_status_last_updated", now);
+                 .bind("used_status_last_updated", now)
+                 .bind("upgraded_from_segment_id", 
upgradedFromSegmentIdMap.get(segment.getId().toString()));
 
         if (schemaPersistEnabled) {
           SegmentMetadata segmentMetadata =
@@ -2449,9 +2470,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   {
     String insertStatement =
         "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,"
-        + " partitioned, version, used, payload, used_status_last_updated 
%3$s) "
+        + " partitioned, version, used, payload, used_status_last_updated, 
upgraded_from_segment_id %3$s) "
         + "VALUES (:id, :dataSource, :created_date, :start, :end,"
-        + " :partitioned, :version, :used, :payload, :used_status_last_updated 
%4$s)";
+        + " :partitioned, :version, :used, :payload, 
:used_status_last_updated, :upgraded_from_segment_id %4$s)";
 
     if (schemaPersistEnabled) {
       return StringUtils.format(
@@ -2923,6 +2944,87 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     );
   }
 
+  @Override
+  public Map<String, String> retrieveUpgradedFromSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = 
:dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", 
segmentIdList)
+    );
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                   .bind("dataSource", 
dataSource);
+          
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", 
segmentIdList, query);
+          return query.map((index, r, ctx) -> {
+            final String id = r.getString(1);
+            final String upgradedFromSegmentId = r.getString(2);
+            if (upgradedFromSegmentId != null) {
+              upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
+            }
+            return null;
+          }).list();
+        }
+    );
+    return upgradedFromSegmentIds;
+  }
+
+  @Override
+  public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> upgradedFromSegmentIdList = 
ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = 
:dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
+            "upgraded_from_segment_id",
+            upgradedFromSegmentIdList
+        )
+    );
+    final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
+    retrieveSegmentsById(dataSource, segmentIds)
+        .stream()
+        .map(DataSegment::getId)
+        .map(SegmentId::toString)
+        .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new 
HashSet<>()).add(id));
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                   .bind("dataSource", 
dataSource);
+          SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
+              "upgraded_from_segment_id",
+              upgradedFromSegmentIdList,
+              query
+          );
+          return query.map((index, r, ctx) -> {
+            final String upgradedToId = r.getString(1);
+            final String id = r.getString(2);
+            upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
+                                .add(upgradedToId);
+            return null;
+          }).list();
+        }
+    );
+    return upgradedToSegmentIds;
+  }
+
   private static class PendingSegmentsRecord
   {
     private final String sequenceName;
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
index bfbaad18ef1..f117fe7f28b 100644
--- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -40,7 +40,10 @@ import java.sql.ResultSet;
  * <li>  id -> id (Unique identifier for pending segment) <li/>
  * <li>  sequence_name -> sequenceName (sequence name used for segment 
allocation) <li/>
  * <li>  sequence_prev_id -> sequencePrevId (previous segment id used for 
segment allocation) <li/>
- * <li>  upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root 
segment from which this was upgraded) <li/>
+ * <li>  upgraded_from_segment_id -> upgradedFromSegmentId
+ * (ID of the segment which was upgraded to create the current segment.
+ * If the former was itself created as a result of an upgrade, then this ID
+ * must refer to the original non-upgraded segment in the hierarchy.) <li/>
  * <li>  task_allocator_id -> taskAllocatorId (Associates a task / task group 
/ replica group with the pending segment) <li/>
  * </ul>
  */
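The invariant documented above (upgraded_from_segment_id always points at the root of the upgrade chain) can be captured in a short hedged sketch. It mirrors the ternary used in IndexerSQLMetadataStorageCoordinator earlier in this patch; the segment IDs are hypothetical:

class UpgradeRootExample
{
  // Reuse the parent's root if it has one, else the parent itself is the root.
  static String rootFor(String parentSegmentId, String parentUpgradedFromSegmentId)
  {
    return parentUpgradedFromSegmentId == null ? parentSegmentId : parentUpgradedFromSegmentId;
  }

  public static void main(String[] args)
  {
    // Hypothetical chain: A (original) -> B (upgrade of A) -> C (upgrade of B).
    String rootOfB = rootFor("A", null);      // "A"
    String rootOfC = rootFor("B", rootOfB);   // still "A", never "B"
    System.out.println(rootOfB + " " + rootOfC);
  }
}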
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 2d315d19fc8..dc87b9fc2fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -587,6 +587,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     Map<String, String> columnNameTypes = new HashMap<>();
     columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
 
+    columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)");
+
     if (centralizedDatasourceSchemaConfig.isEnabled()) {
       columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
       columnNameTypes.put("num_rows", "BIGINT");
@@ -619,6 +621,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
     }
 
     alterTable(tableName, alterCommands);
+
+    final Set<String> createdIndexSet = getIndexOnTable(tableName);
+    createIndex(
+        tableName,
+        StringUtils.format("idx_%1$s_datasource_upgraded_from_segment_id", 
tableName),
+        ImmutableList.of("dataSource", "upgraded_from_segment_id"),
+        createdIndexSet
+    );
   }
 
   @Override
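
Rendered as plain SQL, the migration above does roughly the following (a
sketch only: the real DDL comes from alterTable()/createIndex() and varies by
metadata-store dialect; "druid_segments" is just the default segments table
name, assumed here for illustration):

    ALTER TABLE druid_segments ADD upgraded_from_segment_id VARCHAR(255);
    CREATE INDEX idx_druid_segments_datasource_upgraded_from_segment_id
        ON druid_segments (dataSource, upgraded_from_segment_id);
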
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index f14cc995050..fc1c84a7037 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -286,7 +286,7 @@ public class SqlSegmentsMetadataQuery
     if (includeSchemaInfo) {
       final Query<Map<String, Object>> query = handle.createQuery(
           StringUtils.format(
-              "SELECT payload, used, schema_fingerprint, num_rows FROM %s 
WHERE dataSource = :dataSource %s",
+              "SELECT payload, used, schema_fingerprint, num_rows, 
upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
               dbTables.getSegmentsTable(), 
getParameterizedInConditionForColumn("id", segmentIds)
           )
       );
@@ -306,7 +306,8 @@ public class SqlSegmentsMetadataQuery
                     null,
                     r.getBoolean(2),
                     schemaFingerprint,
-                    numRows
+                    numRows,
+                    r.getString(5)
                 );
               }
           )
@@ -314,7 +315,7 @@ public class SqlSegmentsMetadataQuery
     } else {
       final Query<Map<String, Object>> query = handle.createQuery(
           StringUtils.format(
-              "SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
+              "SELECT payload, used, upgraded_from_segment_id FROM %s WHERE 
dataSource = :dataSource %s",
               dbTables.getSegmentsTable(), 
getParameterizedInConditionForColumn("id", segmentIds)
           )
       );
@@ -331,7 +332,8 @@ public class SqlSegmentsMetadataQuery
                   null,
                   r.getBoolean(2),
                   null,
-                  null
+                  null,
+                  r.getString(3)
               )
           )
           .iterator();
@@ -864,6 +866,7 @@ public class SqlSegmentsMetadataQuery
             DateTimes.of(r.getString(3)),
             null,
             null,
+            null,
             null
         ))
         .iterator();
@@ -980,7 +983,7 @@ public class SqlSegmentsMetadataQuery
    *
    * @see #getParameterizedInConditionForColumn(String, List)
    */
-  private static void bindColumnValuesToQueryWithInCondition(
+  static void bindColumnValuesToQueryWithInCondition(
       final String columnName,
       final List<String> values,
       final SQLStatement<?> query
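
bindColumnValuesToQueryWithInCondition drops its private modifier here so that
IndexerSQLMetadataStorageCoordinator (earlier in this commit) can pair it with
getParameterizedInConditionForColumn when filtering on
upgraded_from_segment_id.
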
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
index 9841e09a1a7..bfda5cbf3ad 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
@@ -36,6 +36,8 @@ import java.util.Objects;
  * <li>{@link DataSegmentPlus#createdDate} - The time when the segment was created.</li>
  * <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate} - The time when the segments
  * used status was last updated.</li>
+ * <li>{@link DataSegmentPlus#upgradedFromSegmentId} - The segment id to which the same load spec originally belonged.
+ * Load specs can be shared as a result of segment version upgrades.</li>
  * </ul>
  * <p>
  * This class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}.
@@ -53,6 +55,9 @@ public class DataSegmentPlus
   private final String schemaFingerprint;
   private final Long numRows;
 
+  @Nullable
+  private final String upgradedFromSegmentId;
+
   @JsonCreator
   public DataSegmentPlus(
       @JsonProperty("dataSegment") final DataSegment dataSegment,
@@ -60,7 +65,8 @@ public class DataSegmentPlus
       @JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime 
usedStatusLastUpdatedDate,
       @JsonProperty("used") @Nullable final Boolean used,
       @JsonProperty("schemaFingerprint") @Nullable final String 
schemaFingerprint,
-      @JsonProperty("numRows") @Nullable final Long numRows
+      @JsonProperty("numRows") @Nullable final Long numRows,
+      @JsonProperty("upgradedFromSegmentId") @Nullable final String 
upgradedFromSegmentId
   )
   {
     this.dataSegment = dataSegment;
@@ -69,6 +75,7 @@ public class DataSegmentPlus
     this.used = used;
     this.schemaFingerprint = schemaFingerprint;
     this.numRows = numRows;
+    this.upgradedFromSegmentId = upgradedFromSegmentId;
   }
 
   @Nullable
@@ -112,6 +119,13 @@ public class DataSegmentPlus
     return numRows;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getUpgradedFromSegmentId()
+  {
+    return upgradedFromSegmentId;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -127,7 +141,8 @@ public class DataSegmentPlus
            && Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate())
            && Objects.equals(used, that.getUsed())
            && Objects.equals(schemaFingerprint, that.getSchemaFingerprint())
-           && Objects.equals(numRows, that.getNumRows());
+           && Objects.equals(numRows, that.getNumRows())
+           && Objects.equals(upgradedFromSegmentId, that.getUpgradedFromSegmentId());
   }
 
   @Override
@@ -139,7 +154,8 @@ public class DataSegmentPlus
         usedStatusLastUpdatedDate,
         used,
         schemaFingerprint,
-        numRows
+        numRows,
+        upgradedFromSegmentId
     );
   }
 
@@ -153,6 +169,7 @@ public class DataSegmentPlus
            ", used=" + getUsed() +
            ", schemaFingerprint=" + getSchemaFingerprint() +
            ", numRows=" + getNumRows() +
+           ", upgradedFromSegmentId=" + getUpgradedFromSegmentId() +
            '}';
   }
 }
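
For reference, DataSegmentPlus construction now takes the upgrade lineage as a
seventh argument (a sketch with invented values; a null value means the
segment is an original rather than an upgraded copy):

    DataSegmentPlus plus = new DataSegmentPlus(
        segment,             // dataSegment
        DateTimes.nowUtc(),  // createdDate
        DateTimes.nowUtc(),  // usedStatusLastUpdatedDate
        true,                // used
        null,                // schemaFingerprint
        null,                // numRows
        "wiki_v1"            // upgradedFromSegmentId (root segment id)
    );
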
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 222c1ece89f..f352d5e2609 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -138,8 +138,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     final String v1 = "2023-01-01";
     final String v2 = "2023-01-02";
     final String v3 = "2023-01-03";
+    final String alreadyUpgradedVersion = "2023-02-01";
     final String lockVersion = "2024-01-01";
 
+    final String taskAllocatorId = "appendTask";
     final String replaceTaskId = "replaceTask1";
     final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
         replaceTaskId,
@@ -148,6 +150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
 
     final Set<DataSegment> appendSegments = new HashSet<>();
+    final List<PendingSegmentRecord> pendingSegmentsForTask = new ArrayList<>();
     final Set<DataSegment> expectedSegmentsToUpgrade = new HashSet<>();
     for (int i = 0; i < 10; i++) {
       final DataSegment segment = createSegment(
@@ -157,6 +160,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       );
       appendSegments.add(segment);
       expectedSegmentsToUpgrade.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v1,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
     for (int i = 0; i < 10; i++) {
@@ -167,6 +195,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       );
       appendSegments.add(segment);
       expectedSegmentsToUpgrade.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v2,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(10 + i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
     for (int i = 0; i < 10; i++) {
@@ -176,23 +229,78 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
           new LinearShardSpec(i)
       );
       appendSegments.add(segment);
+      // Add the same segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              SegmentIdWithShardSpec.fromDataSegment(segment),
+              v3,
+              segment.getId().toString(),
+              null,
+              taskAllocatorId
+          )
+      );
+      // Add upgraded pending segment
+      pendingSegmentsForTask.add(
+          new PendingSegmentRecord(
+              new SegmentIdWithShardSpec(
+                  DS.WIKI,
+                  Intervals.of("2023-01-01/2023-02-01"),
+                  alreadyUpgradedVersion,
+                  new NumberedShardSpec(20 + i, 0)
+              ),
+              alreadyUpgradedVersion,
+              segment.getId().toString(),
+              segment.getId().toString(),
+              taskAllocatorId
+          )
+      );
     }
 
+    derbyConnector.retryWithHandle(
+        handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, DS.WIKI, false)
+    );
+
     final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
         = expectedSegmentsToUpgrade.stream()
                                    .collect(Collectors.toMap(s -> s, s -> replaceLock));
 
     // Commit the segment and verify the results
     SegmentPublishResult commitResult
-        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append", null);
+        = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, taskAllocatorId, null);
     Assert.assertTrue(commitResult.isSuccess());
-    Assert.assertEquals(appendSegments, commitResult.getSegments());
 
-    // Verify the segments present in the metadata store
-    Assert.assertEquals(
-        appendSegments,
-        ImmutableSet.copyOf(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()))
+    Set<DataSegment> allCommittedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+        DS.WIKI,
+        allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
     );
+    // Verify the segments present in the metadata store
+    Assert.assertTrue(allCommittedSegments.containsAll(appendSegments));
+    for (DataSegment segment : appendSegments) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(segment.getId().toString()));
+    }
+    allCommittedSegments.removeAll(appendSegments);
+
+    // Verify the commit of upgraded pending segments
+    Assert.assertEquals(appendSegments.size(), allCommittedSegments.size());
+    Map<String, DataSegment> segmentMap = new HashMap<>();
+    for (DataSegment segment : appendSegments) {
+      segmentMap.put(segment.getId().toString(), segment);
+    }
+    for (DataSegment segment : allCommittedSegments) {
+      for (PendingSegmentRecord pendingSegmentRecord : pendingSegmentsForTask) {
+        if (pendingSegmentRecord.getId().asSegmentId().toString().equals(segment.getId().toString())) {
+          DataSegment upgradedFromSegment = segmentMap.get(pendingSegmentRecord.getUpgradedFromSegmentId());
+          Assert.assertNotNull(upgradedFromSegment);
+          Assert.assertEquals(segment.getLoadSpec(), upgradedFromSegment.getLoadSpec());
+          Assert.assertEquals(
+              pendingSegmentRecord.getUpgradedFromSegmentId(),
+              upgradedFromSegmentIdMap.get(segment.getId().toString())
+          );
+        }
+      }
+    }
 
     // Verify entries in the segment task lock table
     final Set<String> expectedUpgradeSegmentIds
@@ -290,12 +398,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
        retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()).size()
     );
 
-    final Set<DataSegment> usedSegments = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    final Set<DataSegment> usedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+
+    final Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+        "foo",
+        usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+    );
 
     Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
+    for (DataSegment appendSegment : segmentsAppendedWithReplaceLock) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(appendSegment.getId().toString()));
+    }
     usedSegments.removeAll(segmentsAppendedWithReplaceLock);
 
     Assert.assertTrue(usedSegments.containsAll(replacingSegments));
+    for (DataSegment replaceSegment : replacingSegments) {
+      Assert.assertNull(upgradedFromSegmentIdMap.get(replaceSegment.getId().toString()));
+    }
     usedSegments.removeAll(replacingSegments);
 
     Assert.assertEquals(segmentsAppendedWithReplaceLock.size(), usedSegments.size());
@@ -303,6 +423,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       boolean hasBeenCarriedForward = false;
       for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
         if (appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec())) {
+          Assert.assertEquals(
+              appendedSegment.getId().toString(),
+              upgradedFromSegmentIdMap.get(segmentReplicaWithNewVersion.getId().toString())
+          );
           hasBeenCarriedForward = true;
           break;
         }
@@ -3300,4 +3424,63 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
         unusedSegmentIdsForIntervalAndVersion.get(0)
     );
   }
+
+  @Test
+  public void testRetrieveUpgradedFromSegmentIds()
+  {
+    final String datasource = defaultSegment.getDataSource();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+    upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+    coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+    upgradedFromSegmentIdMap.clear();
+    upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    expected.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+
+    Set<String> segmentIds = new HashSet<>();
+    segmentIds.add(defaultSegment.getId().toString());
+    segmentIds.add(defaultSegment2.getId().toString());
+    segmentIds.add(defaultSegment3.getId().toString());
+    segmentIds.add(defaultSegment4.getId().toString());
+    Assert.assertEquals(
+        expected,
+        coordinator.retrieveUpgradedFromSegmentIds(datasource, segmentIds)
+    );
+  }
+
+  @Test
+  public void testRetrieveUpgradedToSegmentIds()
+  {
+    final String datasource = defaultSegment.getDataSource();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+    upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+    coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+    upgradedFromSegmentIdMap.clear();
+    upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+    insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+    Map<String, Set<String>> expected = new HashMap<>();
+    expected.put(defaultSegment.getId().toString(), new HashSet<>());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment.getId().toString());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment2.getId().toString());
+    expected.get(defaultSegment.getId().toString()).add(defaultSegment3.getId().toString());
+
+    Set<String> upgradedIds = new HashSet<>();
+    upgradedIds.add(defaultSegment.getId().toString());
+    Assert.assertEquals(
+        expected,
+        coordinator.retrieveUpgradedToSegmentIds(datasource, upgradedIds)
+    );
+  }
+
+  private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnector, table, mapper);
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index ce0e0686058..2076e5ffa46 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
@@ -322,6 +323,8 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
                       .version(version)
                       .shardSpec(shardSpec)
                       .size(100)
+                      // hash to get a unique load spec as segmentId has not yet been generated
+                      .loadSpec(ImmutableMap.of("hash", Objects.hash(interval, version, shardSpec)))
                       .build();
   }
 
@@ -559,4 +562,50 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
         }
     );
   }
+
+  public static void insertUsedSegments(
+      Set<DataSegment> dataSegments,
+      Map<String, String> upgradedFromSegmentIdMap,
+      SQLMetadataConnector connector,
+      String table,
+      ObjectMapper jsonMapper
+  )
+  {
+    connector.retryWithHandle(
+        handle -> {
+          PreparedBatch preparedBatch = handle.prepareBatch(
+              StringUtils.format(
+                  "INSERT INTO %1$s (id, dataSource, created_date, start, 
%2$send%2$s, partitioned, version,"
+                  + " used, payload, used_status_last_updated, 
upgraded_from_segment_id) "
+                  + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version,"
+                  + " :used, :payload, :used_status_last_updated, 
:upgraded_from_segment_id)",
+                  table,
+                  connector.getQuoteString()
+              )
+          );
+          for (DataSegment segment : dataSegments) {
+            String id = segment.getId().toString();
+            preparedBatch.add()
+                         .bind("id", id)
+                         .bind("dataSource", segment.getDataSource())
+                         .bind("created_date", DateTimes.nowUtc().toString())
+                         .bind("start", 
segment.getInterval().getStart().toString())
+                         .bind("end", 
segment.getInterval().getEnd().toString())
+                         .bind("partitioned", !(segment.getShardSpec() 
instanceof NoneShardSpec))
+                         .bind("version", segment.getVersion())
+                         .bind("used", true)
+                         .bind("payload", 
jsonMapper.writeValueAsBytes(segment))
+                         .bind("used_status_last_updated", 
DateTimes.nowUtc().toString())
+                         .bind("upgraded_from_segment_id", 
upgradedFromSegmentIdMap.get(segment.getId().toString()));
+          }
+
+          final int[] affectedRows = preparedBatch.execute();
+          final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+          if (!succeeded) {
+            throw new ISE("Failed to publish segments to DB");
+          }
+          return true;
+        }
+    );
+  }
 }
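
Note that upgradedFromSegmentIdMap.get(...) returns null for any segment
missing from the map, so the helper above binds a SQL NULL for original
(non-upgraded) segments; the null assertions in the new coordinator tests
depend on exactly that.
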
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
index 0f20fc96bdc..b963f433708 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -100,6 +100,7 @@ public class DataSegmentPlusTest
         usedStatusLastUpdatedDate,
         null,
         null,
+        null,
         null
     );
 
@@ -108,7 +109,7 @@ public class DataSegmentPlusTest
         JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
     );
 
-    Assert.assertEquals(6, objectMap.size());
+    Assert.assertEquals(7, objectMap.size());
     final Map<String, Object> segmentObjectMap = MAPPER.readValue(
         MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
         JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 4d6bbf5929b..9c52d639300 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -77,7 +77,7 @@ public class MetadataResourceTest
           .toArray(new DataSegment[0]);
 
   private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
-          .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null))
+          .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null, null))
           .collect(Collectors.toList());
   private HttpServletRequest request;
   private SegmentsMetadataManager segmentsMetadataManager;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
