AmatyaAvadhanula commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1671727273


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -231,16 +234,57 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         );
       }
 
-      // Kill segments
-      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-      // segments are removed from storage, this helps maintain that we will always have a storage segment if
-      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-      // abandoned.
+      // Kill segments. Order is important here:
+      // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+      // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+      // This helps maintain that we will always have a storage segment if the metadata segment is present.
+      // Determine the subset of segments to be killed from deep storage based on loadspecs.
+      // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+      // Determine upgraded_from_segment_id before nuking
+      final TaskActionClient taskActionClient = toolbox.getTaskActionClient();
+      final Set<String> upgradedSegmentIds = unusedSegments.stream()
+                                                           .map(DataSegment::getId)
+                                                           .map(SegmentId::toString).collect(Collectors.toSet());
+      try {
+        taskActionClient.submit(new RetrieveUpgradedFromSegmentsIdsAction(getDataSource(), upgradedSegmentIds))
+                        .stream()
+                        .filter(upgradeInfo -> upgradeInfo.getUpgradedFromSegmentId() != null)
+                        .forEach(upgradeInfo -> upgradedSegmentIds.add(upgradeInfo.getUpgradedFromSegmentId()));
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve segment upgrade infos using task action[retrieveUpgradedFromSegmentIds]."
+            + " Overlord maybe on an older version."
+        );
+      }
+
+      // Nuke Segments
+      taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+
+      // Determine unreferenced segments
+      final Set<String> referencedSegmentIds = new HashSet<>();
+      try {
+        taskActionClient.submit(new RetrieveUpgradedToSegmentsIdsAction(getDataSource(), upgradedSegmentIds))
+                        .forEach(upgradeInfo -> referencedSegmentIds.add(upgradeInfo.getId()));
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve segment upgrade infos using task action[retrieveUpgradedFromSegmentIds]."

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

