xiangfu0 commented on code in PR #18678:
URL: https://github.com/apache/pinot/pull/18678#discussion_r3373073797


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java:
##########
@@ -187,86 +187,110 @@ public void run() {
 
   protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String 
tableName, Collection<String> segmentIds,
       Long deletedSegmentsRetentionMs, long deletionDelay) {
-    // Check if segment got removed from ExternalView or IdealState
+    List<String> segmentsToDelete = filterSegmentsToDelete(tableName, 
segmentIds);
+    if (segmentsToDelete == null) {
+      // ExternalView or IdealState was unavailable; skip the whole batch
+      return;
+    }
+
+    Set<String> deletedSegments = new HashSet<>();
+    if (!segmentsToDelete.isEmpty()) {
+      // Capture segment time ranges before ZK metadata is removed (for MV 
dirty marking)
+      notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
+
+      // Notify all active listeners here
+      PinotSegmentLifecycleEventListenerManager.getInstance()
+          .notifyListeners(new SegmentDeletionEventDetails(tableName, 
segmentsToDelete));
+
+      deletedSegments = deleteSegmentsFromPropertyStore(tableName, 
segmentsToDelete);
+
+      // Best effort remove segments from deep store.
+      // If this fails (e.g. controller crashes, deep store unavailable), 
future runs of RetentionManager
+      // will attempt to delete orphan deep store entries. Check 
getSegmentsToDeleteFromDeepstore()
+      removeSegmentsFromStoreInBatch(tableName, deletedSegments, 
deletedSegmentsRetentionMs);
+    }
+
+    Set<String> segmentsToRetryLater = new HashSet<>(segmentIds);
+    segmentsToRetryLater.removeAll(deletedSegments);
+
+    LOGGER.info("Deleted {} segments from table {}:{}", 
deletedSegments.size(), tableName,
+        deletedSegments.size() <= 5 ? deletedSegments : "");
+
+    if (!segmentsToRetryLater.isEmpty()) {
+      rescheduleRetry(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, deletionDelay);
+    }
+  }
+
+  /// Check if segment got removed from ExternalView or IdealState
+  /// Returns `null` when the ExternalView or IdealState is unavailable
+  protected List<String> filterSegmentsToDelete(String tableName, 
Collection<String> segmentIds) {
     ExternalView externalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
     if (externalView == null || idealState == null) {
       LOGGER.warn("Resource: {} is not set up in idealState or ExternalView, 
won't do anything", tableName);
-      return;
+      return null;
     }
 
-    List<String> segmentsToDelete = new ArrayList<>(segmentIds.size()); // Has 
the segments that will be deleted
-    Set<String> segmentsToRetryLater = new HashSet<>(segmentIds.size());  // 
List of segments that we need to retry
-
+    List<String> segmentsToDelete = new ArrayList<>(segmentIds.size());
     try {
       for (String segmentId : segmentIds) {
         Map<String, String> segmentToInstancesMapFromExternalView = 
externalView.getStateMap(segmentId);
         Map<String, String> segmentToInstancesMapFromIdealStates = 
idealState.getInstanceStateMap(segmentId);
         if ((segmentToInstancesMapFromExternalView == null || 
segmentToInstancesMapFromExternalView.isEmpty()) && (
             segmentToInstancesMapFromIdealStates == null || 
segmentToInstancesMapFromIdealStates.isEmpty())) {
           segmentsToDelete.add(segmentId);
-        } else {
-          segmentsToRetryLater.add(segmentId);
         }
       }
     } catch (Exception e) {
       LOGGER.warn("Caught exception while checking helix states for table: 
{}", tableName, e);
       segmentsToDelete.clear();
       segmentsToDelete.addAll(segmentIds);
-      segmentsToRetryLater.clear();
     }
+    return segmentsToDelete;
+  }
 
-    if (!segmentsToDelete.isEmpty()) {
-      List<String> propStorePathList = new 
ArrayList<>(segmentsToDelete.size());
-      for (String segmentId : segmentsToDelete) {
-        String segmentPropertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
-        propStorePathList.add(segmentPropertyStorePath);
-      }
-
-      // Capture segment time ranges before ZK metadata is removed (for MV 
dirty marking)
-      notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
-
-      // Notify all active listeners here
-      PinotSegmentLifecycleEventListenerManager.getInstance()
-          .notifyListeners(new SegmentDeletionEventDetails(tableName, 
segmentsToDelete));
+  /// Removes the property-store znodes for the given segments
+  /// Returns the set of segments that were successfully deleted.
+  protected Set<String> deleteSegmentsFromPropertyStore(String tableName, 
List<String> segmentsToDelete) {
+    Set<String> deletedSegments = new HashSet<>(segmentsToDelete.size());
+    List<String> propStorePathList = new ArrayList<>(segmentsToDelete.size());
+    for (String segmentId : segmentsToDelete) {
+      String segmentPropertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
+      propStorePathList.add(segmentPropertyStorePath);
+    }
 
-      boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, 
AccessOption.PERSISTENT);
-      List<String> propStoreFailedSegs = new 
ArrayList<>(segmentsToDelete.size());
-      for (int i = 0; i < deleteSuccessful.length; i++) {
-        final String segmentId = segmentsToDelete.get(i);
-        if (!deleteSuccessful[i]) {
-          // The batch remove API takes a non-recursive ZK path: it cannot 
delete a znode that has
-          // accumulated children. Fall back to the single-path remove API, 
which falls back to a
-          // recursive delete on the same NotEmpty failure. Skip when the 
znode is already gone
-          // (the batch call may have failed simply because the entry did not 
exist).
-          String segmentPath = propStorePathList.get(i);
-          if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
-              && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) 
{
-            LOGGER.info("Could not delete {} from propertystore", segmentPath);
-            segmentsToRetryLater.add(segmentId);
-            propStoreFailedSegs.add(segmentId);
-          }
+    boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, 
AccessOption.PERSISTENT);
+    for (int i = 0; i < deleteSuccessful.length; i++) {
+      final String segmentId = segmentsToDelete.get(i);
+      if (deleteSuccessful[i]) {
+        deletedSegments.add(segmentId);
+      } else {
+        // The batch remove API takes a non-recursive ZK path: it cannot 
delete a znode that has
+        // accumulated children. Fall back to the single-path remove API, 
which falls back to a
+        // recursive delete on the same NotEmpty failure. Skip when the znode 
is already gone
+        // (the batch call may have failed simply because the entry did not 
exist).
+        String segmentPath = propStorePathList.get(i);
+        if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+            && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) {
+          LOGGER.info("Could not delete {} from propertystore", segmentPath);
+        } else {
+          deletedSegments.add(segmentId);
         }
       }
-      segmentsToDelete.removeAll(propStoreFailedSegs);
-
-      // TODO: If removing segments from deep store fails (e.g. controller 
crashes, deep store unavailable), these
-      //       segments will become orphans and not easy to track because 
their ZK metadata are already deleted.
-      //       Consider removing segments from deep store before cleaning up 
the ZK metadata.
-      removeSegmentsFromStoreInBatch(tableName, segmentsToDelete, 
deletedSegmentsRetentionMs);
     }
+    return deletedSegments;
+  }
 
-    LOGGER.info("Deleted {} segments from table {}:{}", 
segmentsToDelete.size(), tableName,
-        segmentsToDelete.size() <= 5 ? segmentsToDelete : "");
-
-    if (!segmentsToRetryLater.isEmpty()) {
-      long effectiveDeletionDelay = Math.min(deletionDelay * 2, 
MAX_DELETION_DELAY_SECONDS);
-      LOGGER.info("Postponing deletion of {} segments from table {}", 
segmentsToRetryLater.size(), tableName);
-      deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, effectiveDeletionDelay);
-    }
+  /// Reschedules the segments that could not be deleted this pass, applying 
the exponential back-off
+  /// (capped at [#MAX_DELETION_DELAY_SECONDS]). No-op when there is nothing 
to retry.
+  protected void rescheduleRetry(String tableName, Collection<String> 
segmentsToRetryLater,
+      Long deletedSegmentsRetentionMs, long deletionDelay) {
+    long effectiveDeletionDelay = Math.min(deletionDelay * 2, 
MAX_DELETION_DELAY_SECONDS);
+    LOGGER.info("Postponing deletion of {} segments from table {}", 
segmentsToRetryLater.size(), tableName);
+    deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, effectiveDeletionDelay);
   }
 
-  public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
List<String> segments,
+  public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
Collection<String> segments,

Review Comment:
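   Changing the parameter type of this public, overridable method from 
`List<String>` to `Collection<String>` is not a pure refactor. Existing 
`SegmentDeletionManager` subclasses that override the old method will silently 
stop overriding it after the upgrade. Subclasses compiled against the old 
`List<String>` descriptor keep that descriptor. Subclasses rebuilt from source 
without `@Override` simply become an overload. Meanwhile, 
`deleteSegmentFromPropertyStoreAndLocal()` now dispatches to the new 
`Collection<String>` descriptor, so the subclass's version is never called. 
That can bypass custom deep-store deletion logic in downstream controllers. 
Please preserve the old `List<String>` signature as a delegating overload, or 
otherwise keep the existing override contract.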



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to