kfaraz commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1673823924


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Response for the RetrieveUpgradedToSegmentIds task action
+ */

Review Comment:
   Nit: not needed; the name already explains it.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();

Review Comment:
   ```suggestion
       final Set<String> segmentIdsToCheckForUsage = new HashSet<>();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -66,6 +72,20 @@
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <br/>
+ * <br/>
+ * The Kill task fetches the set of used segments for the interval and computes the set of their load specs. <br/>
+ * Until `limit` segments have been processed in total or all segments for the interval have been nuked: <br/>

Review Comment:
   A line break is not needed here, as lists start on a new line anyway.
   ```suggestion
    * Until `limit` segments have been processed in total or all segments for the interval have been nuked:
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();

Review Comment:
   ```suggestion
       final List<DataSegment> segmentsToKill = new ArrayList<>();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -231,29 +252,64 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         );
       }
 
-      // Kill segments
-      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-      // segments are removed from storage, this helps maintain that we will always have a storage segment if
-      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-      // abandoned.
+      // Kill segments. Order is important here:
+      // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+      // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+      // This helps maintain that we will always have a storage segment if the metadata segment is present.
+      // Determine the subset of segments to be killed from deep storage based on loadspecs.
+      // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+      // Determine upgraded segment ids before nuking
+      final Set<String> segmentIds = unusedSegments.stream()
+                                                   .map(DataSegment::getId)
+                                                   .map(SegmentId::toString)
+                                                   .collect(Collectors.toSet());
+      final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+      try {
+        upgradedFromSegmentIds.putAll(
+            taskActionClient.submit(
+                new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+            ).getUpgradedFromSegmentIds()
+        );
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve UpgradedFromSegmentsResponse using task action[retrieveUpgradedFromSegmentIds]."
+            + " Overlord may be on an older version."
+        );
+      }
+
+      // Nuke Segments
+      taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
 
-      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      // Determine unreferenced segments
+      final List<DataSegment> unreferencedSegments
+          = findUnreferencedSegments(unusedSegments, upgradedFromSegmentIds, taskActionClient);
 
-      // Kill segments from the deep storage only if their load specs are not being used by any used segments
-      final List<DataSegment> segmentsToBeKilled = unusedSegments
+      // Kill segments from the deep storage only if their load specs are not being used by any other segments
+      // We still need to check with used load specs as segment upgrades were introduced before this change
+      final List<DataSegment> segmentsToBeKilled = unreferencedSegments
           .stream()
           .filter(unusedSegment -> unusedSegment.getLoadSpec() == null
                                    || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
           .collect(Collectors.toList());

Review Comment:
   This logic should also move to the new method.
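   Here is a minimal sketch of the load-spec check as it might look after it moves into the new method. `LoadSpecFilterSketch` and its nested `Segment` are hypothetical stand-ins for `DataSegment`, and the sketch assumes the load spec is a `Map<String, Object>`. The predicate itself is copied from the filter in `runTask()`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   class LoadSpecFilterSketch
   {
     // Hypothetical minimal stand-in for DataSegment: only the ID and load spec matter here.
     static final class Segment
     {
       final String id;
       final Map<String, Object> loadSpec;

       Segment(String id, Map<String, Object> loadSpec)
       {
         this.id = id;
         this.loadSpec = loadSpec;
       }
     }

     // Same predicate as the filter in runTask(): keep a segment for deep-storage deletion
     // only if it has no load spec or no used segment shares its load spec.
     static List<Segment> excludeSharedLoadSpecs(
         List<Segment> candidates,
         Set<Map<String, Object>> usedSegmentLoadSpecs
     )
     {
       final List<Segment> killable = new ArrayList<>();
       for (Segment segment : candidates) {
         if (segment.loadSpec == null || !usedSegmentLoadSpecs.contains(segment.loadSpec)) {
           killable.add(segment);
         }
       }
       return killable;
     }
   }
   ```

   Suppose the used load specs are `{path=a}`, and the candidates are `a`, `b` (load spec `{path=b}`) and `c` (no load spec). The filter then keeps `b` and `c`, in that order.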



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -66,6 +72,20 @@
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <br/>

Review Comment:
   Thanks for the javadoc.
   Tip: you can use `<p>` to separate paragraphs. It looks nicer.
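   As an illustration only, the same javadoc laid out with `<p>` and an `<ol>` list might look like the following. The class name is hypothetical. The list items paraphrase the inline comments in `runTask()` and are not the PR's actual javadoc. `{@code limit}` replaces the Markdown backticks, because a standard `/** */` doc comment does not render them as code.

   ```java
   /**
    * Illustrative layout only; this class is hypothetical.
    * <p>
    * The Kill task fetches the set of used segments for the interval and computes
    * the set of their load specs.
    * <p>
    * Until {@code limit} segments have been processed in total or all segments for
    * the interval have been nuked:
    * <ol>
    * <li>retrieve the upgrade info for the batch before nuking it,</li>
    * <li>nuke the metadata records,</li>
    * <li>kill the segments whose load specs are no longer referenced from deep storage.</li>
    * </ol>
    */
   class KillTaskJavadocExample
   {
     // Body intentionally empty; only the javadoc layout matters here.
   }
   ```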



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+
+      // If the segment still exists for some reason, do not kill it
+      if (existingSegmentIds.contains(id)) {
+        break;
+      }
+
+      // If the segment is the parent of existing segments, do not kill it
+      if (upgradedToSegmentIds.containsKey(id)) {

Review Comment:
   `upgradedToSegmentIds` can be `null` here.
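   One way to guard against this: normalize a null response, or a null map inside it, to an empty map before any `containsKey()` call. The sketch below uses a hypothetical `Response` stand-in, not the PR's `UpgradedToSegmentsResponse`.

   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Set;

   class NullSafeUpgradedTo
   {
     // Hypothetical stand-in for the response type under review; only the getter matters here.
     static final class Response
     {
       private final Map<String, Set<String>> upgradedToSegmentIds;

       Response(Map<String, Set<String>> upgradedToSegmentIds)
       {
         this.upgradedToSegmentIds = upgradedToSegmentIds;
       }

       Map<String, Set<String>> getUpgradedToSegmentIds()
       {
         return upgradedToSegmentIds;
       }
     }

     // Returns an empty map for a null response or a null map, so that later
     // containsKey() calls cannot throw a NullPointerException.
     static Map<String, Set<String>> upgradedToOrEmpty(Response response)
     {
       if (response == null || response.getUpgradedToSegmentIds() == null) {
         return Collections.emptyMap();
       }
       return response.getUpgradedToSegmentIds();
     }
   }
   ```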



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }

Review Comment:
   ```suggestion
         if (upgradedFromSegmentIds.containsKey(id)) {
           upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
         } else {
           // Check for the segment ID itself if it was not upgraded from some other segment
           upgradedSegmentIds.add(id);
         }
   ```
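   Here is the suggested logic in isolation, as a self-contained sketch that uses plain string IDs (`ParentOrSelfIds` is a hypothetical name). Each unused segment contributes exactly one ID to check: its parent, or itself if it has no parent.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   class ParentOrSelfIds
   {
     // For each unused segment ID, pick the ID of the segment it was upgraded from,
     // or the ID itself if it was not upgraded from any other segment.
     static Set<String> segmentIdsToCheck(List<String> unusedSegmentIds, Map<String, String> upgradedFromSegmentIds)
     {
       final Set<String> segmentIdsToCheck = new HashSet<>();
       for (String id : unusedSegmentIds) {
         if (upgradedFromSegmentIds.containsKey(id)) {
           segmentIdsToCheck.add(upgradedFromSegmentIds.get(id));
         } else {
           segmentIdsToCheck.add(id);
         }
       }
       return segmentIdsToCheck;
     }
   }
   ```

   Take unused segments `a`, `b` and `c`, where `b` and `c` were both upgraded from `p`. The IDs to check are then `a` and `p`.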



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -231,29 +252,64 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         );
       }
 
-      // Kill segments
-      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-      // segments are removed from storage, this helps maintain that we will always have a storage segment if
-      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-      // abandoned.
+      // Kill segments. Order is important here:
+      // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+      // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+      // This helps maintain that we will always have a storage segment if the metadata segment is present.
+      // Determine the subset of segments to be killed from deep storage based on loadspecs.
+      // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+      // Determine upgraded segment ids before nuking
+      final Set<String> segmentIds = unusedSegments.stream()
+                                                   .map(DataSegment::getId)
+                                                   .map(SegmentId::toString)
+                                                   .collect(Collectors.toSet());
+      final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+      try {
+        upgradedFromSegmentIds.putAll(
+            taskActionClient.submit(
+                new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+            ).getUpgradedFromSegmentIds()
+        );
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve UpgradedFromSegmentsResponse using task action[retrieveUpgradedFromSegmentIds]."

Review Comment:
   ```suggestion
               "Could not retrieve parent segment IDs using task action[retrieveUpgradedFromSegmentIds]."
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/**
+ * Response for the RetrieveUpgradedFromSegmentIds task action
+ */

Review Comment:
   Nit: not really needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))

Review Comment:
   Why is this action needed?
   Doesn't the new `UpgradedToSegmentsResponse` already cover this case?
   
   See the comment here:
   https://github.com/apache/druid/pull/16667#issuecomment-2205060939



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+
+      // If the segment still exists for some reason, do not kill it
+      if (existingSegmentIds.contains(id)) {
+        break;

Review Comment:
   Did you want to use `break` here, or `continue`?
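   The self-contained sketch below shows the difference, using hypothetical names and string IDs. `break` abandons the rest of the batch at the first segment that still exists. `continue` skips only that segment, which appears to match the intent of the inline comment "do not kill it".

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   class BreakVsContinue
   {
     // Current loop shape: the first segment that still exists ends the whole scan,
     // so every later segment in the batch is silently left out.
     static List<String> collectWithBreak(List<String> segmentIds, Set<String> existingSegmentIds)
     {
       final List<String> result = new ArrayList<>();
       for (String id : segmentIds) {
         if (existingSegmentIds.contains(id)) {
           break;
         }
         result.add(id);
       }
       return result;
     }

     // Presumably intended shape: skip only the segment that still exists.
     static List<String> collectWithContinue(List<String> segmentIds, Set<String> existingSegmentIds)
     {
       final List<String> result = new ArrayList<>();
       for (String id : segmentIds) {
         if (existingSegmentIds.contains(id)) {
           continue;
         }
         result.add(id);
       }
       return result;
     }
   }
   ```

   With IDs `s1`, `s2`, `s3` and only `s2` still existing, the first variant returns `[s1]` and the second returns `[s1, s3]`.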



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(

Review Comment:
   ```suggestion
     private List<DataSegment> getKillableSegments(
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {

Review Comment:
   This loop, and the method as a whole, has become too complicated.
   
   The method needs to do the following instead:
   
   ```java
   private Set<DataSegment> findKillableSegments(...)
   {
     // Determine parentId for each unused segment
     final Map<String, DataSegment> parentIdToUnusedSegment = new HashMap<>();
     for (DataSegment segment : unusedSegments) {
       final String segmentId = segment.getId().toString();
       if (upgradedFromSegmentIds.containsKey(segmentId)) {
         parentIdToUnusedSegment.put(upgradedFromSegmentIds.get(segmentId), segment);
       } else {
         parentIdToUnusedSegment.put(segmentId, segment);
       }
     }

     // Check if the parent or any of its children exist in metadata store
     try {
       final UpgradedToSegmentsResponse response = taskActionClient.submit(
           new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegment.keySet())
       );
       if (response != null && response.getUpgradedToSegmentIds() != null) {
         response.getUpgradedToSegmentIds().forEach((parent, children) -> {
           if (!CollectionUtils.isNullOrEmpty(children)) {
             // Do not kill segment if its parent or any of its siblings still exist in metadata store
             parentIdToUnusedSegment.remove(parent);
           }
         });
       }
     }
     catch (Exception e) {
       LOG.warn(
           e,
           "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
           + " Overlord may be on an older version."
       );
     }

     // TODO: filter out based on used segment load specs

     return new HashSet<>(parentIdToUnusedSegment.values());
   }
   ```
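   Below is a runnable, simplified sketch of the steps above. It assumes plain string IDs and that both lookups have already been fetched, so no task action client is involved. `KillableSegmentsSketch` is a hypothetical name. The sketch also assumes the meaning of `upgradedToSegmentIds`: for each parent, the IDs still in the metadata store that share its files. One possible refinement over the single-valued map is to group unused segments by parent, so that several unused segments upgraded from the same parent are not collapsed into one entry. The load-spec filter from the TODO is left out.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   class KillableSegmentsSketch
   {
     static List<String> findKillableSegmentIds(
         List<String> unusedSegmentIds,
         Map<String, String> upgradedFromSegmentIds,
         Map<String, Set<String>> upgradedToSegmentIds
     )
     {
       // Group unused segments by parent ID (a segment with no parent is its own parent).
       // A list per parent keeps every unused segment that shares that parent.
       final Map<String, List<String>> parentIdToUnusedIds = new HashMap<>();
       for (String id : unusedSegmentIds) {
         final String parentId = upgradedFromSegmentIds.getOrDefault(id, id);
         parentIdToUnusedIds.computeIfAbsent(parentId, k -> new ArrayList<>()).add(id);
       }

       // A null map means no upgrade info is available; as in the snippet above,
       // nothing is excluded in that case.
       if (upgradedToSegmentIds != null) {
         upgradedToSegmentIds.forEach((parentId, children) -> {
           if (children != null && !children.isEmpty()) {
             // The parent or one of its children still exists: keep the shared files.
             parentIdToUnusedIds.remove(parentId);
           }
         });
       }

       final List<String> killable = new ArrayList<>();
       parentIdToUnusedIds.values().forEach(killable::addAll);
       return killable;
     }
   }
   ```

   For example, suppose `s2` and `s3` were both upgraded from `p`. If `p` still has a live child, only `s1` is killable. If `p` has no live children, all three are killable.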


