abhishekrb19 commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1659493204


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2956,130 @@ public int deleteUpgradeSegmentsForTask(final String taskId)
     );
   }
 
+  @Override
+  public Set<DataSegment> findSegmentsWithUnreferencedLoadSpecs(final Set<DataSegment> segments)
+  {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    final String dataSource = segments.stream()
+                                      .findFirst()
+                                      .get()
+                                      .getDataSource();
+    final List<String> segmentIds = segments.stream()
+                                            .map(DataSegment::getId)
+                                            .map(SegmentId::toString)
+                                            .collect(Collectors.toList());
+    Map<String, String> idToRootMap = new HashMap<>(getRootSegmentIds(dataSource, segmentIds));
+    final Set<String> rootSegmentIds = new HashSet<>(segmentIds);
+    rootSegmentIds.addAll(idToRootMap.values());
+    idToRootMap.putAll(getSegmentsWithRootSegmentIds(dataSource, new ArrayList<>(rootSegmentIds)));
+    final Map<String, Set<String>> rootToIdsMap = new HashMap<>();
+    for (Map.Entry<String, String> idAndRoot : idToRootMap.entrySet()) {
+      rootToIdsMap.computeIfAbsent(idAndRoot.getValue(), root -> new HashSet<>())
+                  .add(idAndRoot.getKey());
+    }
+    final Set<DataSegment> segmentsWithUnreferencedLoadSpecs = new HashSet<>();
+    final Set<String> existingSegmentIds = retrieveSegmentsById(dataSource, rootSegmentIds).stream()
+                                                                                           .map(DataSegment::getId)
+                                                                                           .map(SegmentId::toString)
+                                                                                           .collect(Collectors.toSet());
+    for (DataSegment segment : segments) {
+      final String id = segment.getId().toString();
+      final Set<String> conflicts = new HashSet<>();
+      if (rootToIdsMap.containsKey(id)) {
+        // Add children
+        conflicts.addAll(rootToIdsMap.get(id));
+      }
+      final String parent = idToRootMap.get(id);
+      if (parent != null) {
+        // add parent if it exists
+        if (existingSegmentIds.contains(parent)) {
+          conflicts.add(parent);
+        }
+        // add siblings
+        if (rootToIdsMap.containsKey(parent)) {
+          conflicts.addAll(rootToIdsMap.get(parent));
+        }
+      }
+      // Remove segments being deleted
+      segmentIds.forEach(conflicts::remove);
+      if (conflicts.isEmpty()) {
+        segmentsWithUnreferencedLoadSpecs.add(segment);
+      }
+    }
+    return segmentsWithUnreferencedLoadSpecs;
+  }
+
+  @VisibleForTesting
+  Map<String, String> getRootSegmentIds(final String dataSource, final List<String> segmentIds)

Review Comment:
   A javadoc here would be nice.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2956,130 @@ public int deleteUpgradeSegmentsForTask(final String taskId)
     );
   }
 
+  @Override
+  public Set<DataSegment> findSegmentsWithUnreferencedLoadSpecs(final Set<DataSegment> segments)
+  {
+    if (segments.isEmpty()) {
+      return segments;
+    }
+    final String dataSource = segments.stream()

Review Comment:
   Since we don't validate that all the segments belong to the same data source
   (as some other methods do), we should at least call that assumption out in
   the javadoc of the interface method.
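   
   If we did want to validate instead, a minimal sketch of such a check could 
   look like the following. This is only an illustration: `Segment` is a 
   hypothetical stand-in for `DataSegment`, and `requireSingleDataSource` is a 
   made-up helper name, not an existing Druid method.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

public class SingleDataSourceCheck
{
  // Hypothetical stand-in for DataSegment; only the fields needed for this sketch.
  static final class Segment
  {
    final String dataSource;
    final String id;

    Segment(String dataSource, String id)
    {
      this.dataSource = dataSource;
      this.id = id;
    }
  }

  /**
   * Returns the single data source shared by all the given segments.
   * Throws IllegalArgumentException if the collection is empty or mixes data sources.
   */
  static String requireSingleDataSource(Collection<Segment> segments)
  {
    final Set<String> dataSources = segments.stream()
                                            .map(segment -> segment.dataSource)
                                            .collect(Collectors.toSet());
    if (dataSources.size() != 1) {
      throw new IllegalArgumentException(
          "Expected segments from exactly one data source, found: " + dataSources
      );
    }
    return dataSources.iterator().next();
  }

  public static void main(String[] args)
  {
    // Both segments share the same data source, so the check passes.
    System.out.println(
        requireSingleDataSource(Arrays.asList(new Segment("wiki", "a"), new Segment("wiki", "b")))
    );
  }
}
```

   The existing code would keep its early return for an empty set before 
   calling a check like this, since the sketch treats an empty input as an error.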
   
   



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -587,6 +587,11 @@ protected void alterSegmentTable()
     Map<String, String> columnNameTypes = new HashMap<>();
     columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
 
+    // root_segment_id is the segment id is the first / root segment to which the same load spec originally belonged

Review Comment:
   Typo here: "is the segment id is the first" should probably read "is the 
   segment id of the first".



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -54,26 +56,33 @@ public Set<DataSegment> getSegments()
   }
 
   @Override
-  public TypeReference<Void> getReturnTypeReference()
+  public TypeReference<Set<DataSegment>> getReturnTypeReference()
   {
-    return new TypeReference<Void>()
+    return new TypeReference<Set<DataSegment>>()
     {
     };
   }
 
   @Override
-  public Void perform(Task task, TaskActionToolbox toolbox)
+  public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
   {
     TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
-
+    final Set<DataSegment> segmentsToKill = new HashSet<>();
     try {
       toolbox.getTaskLockbox().doInCriticalSection(
           task,
           segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
           CriticalAction.builder()
                         .onValidLocks(
                             () -> {
-                              toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
+                              final IndexerMetadataStorageCoordinator coordinator
+                                  = toolbox.getIndexerMetadataStorageCoordinator();
+
+                              // Find subset of segments with unreferenced load specs assuming successful nuking
+                              segmentsToKill.addAll(coordinator.findSegmentsWithUnreferencedLoadSpecs(segments));
+

Review Comment:
   Design-wise, it seems strange that the nuke task action is doing this. 
   Shouldn't the kill task be responsible for determining this set, or perhaps a 
   separate task action?
   
   If there's a solid rationale for doing this, can you please update the 
   javadocs so that someone reading the code isn't caught by surprise?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -237,14 +237,26 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
       // abandoned.
 
-      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-
-      // Kill segments from the deep storage only if their load specs are not being used by any used segments
-      final List<DataSegment> segmentsToBeKilled = unusedSegments
-          .stream()
-          .filter(unusedSegment -> unusedSegment.getLoadSpec() == null
-                                   || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
-          .collect(Collectors.toList());
+      final List<DataSegment> segmentsToBeKilled;
+
+      Set<DataSegment> segmentsWithUnreferencedLoadSpecs
+          = toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));

Review Comment:
   Please see my comment above. I would suggest adding a new task action for 
   this instead of piggybacking on `SegmentNukeAction`, which should only nuke 
   segments IMO.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1348,6 +1353,21 @@ public int hashCode()
     }
   }
 
+  private static void bindColumnValuesToQueryWithInCondition(

Review Comment:
   This looks identical to the private static method: 
`SqlSegmentsMetadataQuery#bindColumnValuesToQueryWithInCondition`. Can we reuse 
that method by making it package-private and removing this duplicate method?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -54,26 +56,33 @@ public Set<DataSegment> getSegments()
   }
 
   @Override
-  public TypeReference<Void> getReturnTypeReference()
+  public TypeReference<Set<DataSegment>> getReturnTypeReference()
   {
-    return new TypeReference<Void>()
+    return new TypeReference<Set<DataSegment>>()
     {
     };
   }
 
   @Override
-  public Void perform(Task task, TaskActionToolbox toolbox)
+  public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)

Review Comment:
   Also, this changes the return type, and so the semantics, of an existing 
   task action. Will this be a breaking change?





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

