xiangfu0 commented on code in PR #18678:
URL: https://github.com/apache/pinot/pull/18678#discussion_r3373073797
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java:
##########
@@ -187,86 +187,110 @@ public void run() {
protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String
tableName, Collection<String> segmentIds,
Long deletedSegmentsRetentionMs, long deletionDelay) {
- // Check if segment got removed from ExternalView or IdealState
+ List<String> segmentsToDelete = filterSegmentsToDelete(tableName,
segmentIds);
+ if (segmentsToDelete == null) {
+ // ExternalView or IdealState was unavailable; skip the whole batch
+ return;
+ }
+
+ Set<String> deletedSegments = new HashSet<>();
+ if (!segmentsToDelete.isEmpty()) {
+ // Capture segment time ranges before ZK metadata is removed (for MV
dirty marking)
+ notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
+
+ // Notify all active listeners here
+ PinotSegmentLifecycleEventListenerManager.getInstance()
+ .notifyListeners(new SegmentDeletionEventDetails(tableName,
segmentsToDelete));
+
+ deletedSegments = deleteSegmentsFromPropertyStore(tableName,
segmentsToDelete);
+
+ // Best effort remove segments from deep store.
+ // If this fails (e.g. controller crashes, deep store unavailable),
future runs of RetentionManager
+ // will attempt to delete orphan deep store entries. Check
getSegmentsToDeleteFromDeepstore()
+ removeSegmentsFromStoreInBatch(tableName, deletedSegments,
deletedSegmentsRetentionMs);
+ }
+
+ Set<String> segmentsToRetryLater = new HashSet<>(segmentIds);
+ segmentsToRetryLater.removeAll(deletedSegments);
+
+ LOGGER.info("Deleted {} segments from table {}:{}",
deletedSegments.size(), tableName,
+ deletedSegments.size() <= 5 ? deletedSegments : "");
+
+ if (!segmentsToRetryLater.isEmpty()) {
+ rescheduleRetry(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, deletionDelay);
+ }
+ }
+
+ /// Check if segment got removed from ExternalView or IdealState
+ /// Returns `null` when the ExternalView or IdealState is unavailable
+ protected List<String> filterSegmentsToDelete(String tableName,
Collection<String> segmentIds) {
ExternalView externalView =
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
if (externalView == null || idealState == null) {
LOGGER.warn("Resource: {} is not set up in idealState or ExternalView,
won't do anything", tableName);
- return;
+ return null;
}
- List<String> segmentsToDelete = new ArrayList<>(segmentIds.size()); // Has
the segments that will be deleted
- Set<String> segmentsToRetryLater = new HashSet<>(segmentIds.size()); //
List of segments that we need to retry
-
+ List<String> segmentsToDelete = new ArrayList<>(segmentIds.size());
try {
for (String segmentId : segmentIds) {
Map<String, String> segmentToInstancesMapFromExternalView =
externalView.getStateMap(segmentId);
Map<String, String> segmentToInstancesMapFromIdealStates =
idealState.getInstanceStateMap(segmentId);
if ((segmentToInstancesMapFromExternalView == null ||
segmentToInstancesMapFromExternalView.isEmpty()) && (
segmentToInstancesMapFromIdealStates == null ||
segmentToInstancesMapFromIdealStates.isEmpty())) {
segmentsToDelete.add(segmentId);
- } else {
- segmentsToRetryLater.add(segmentId);
}
}
} catch (Exception e) {
LOGGER.warn("Caught exception while checking helix states for table:
{}", tableName, e);
segmentsToDelete.clear();
segmentsToDelete.addAll(segmentIds);
- segmentsToRetryLater.clear();
}
+ return segmentsToDelete;
+ }
- if (!segmentsToDelete.isEmpty()) {
- List<String> propStorePathList = new
ArrayList<>(segmentsToDelete.size());
- for (String segmentId : segmentsToDelete) {
- String segmentPropertyStorePath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
- propStorePathList.add(segmentPropertyStorePath);
- }
-
- // Capture segment time ranges before ZK metadata is removed (for MV
dirty marking)
- notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
-
- // Notify all active listeners here
- PinotSegmentLifecycleEventListenerManager.getInstance()
- .notifyListeners(new SegmentDeletionEventDetails(tableName,
segmentsToDelete));
+ /// Removes the property-store znodes for the given segments
+ /// Returns the set of segments that were successfully deleted.
+ protected Set<String> deleteSegmentsFromPropertyStore(String tableName,
List<String> segmentsToDelete) {
+ Set<String> deletedSegments = new HashSet<>(segmentsToDelete.size());
+ List<String> propStorePathList = new ArrayList<>(segmentsToDelete.size());
+ for (String segmentId : segmentsToDelete) {
+ String segmentPropertyStorePath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
+ propStorePathList.add(segmentPropertyStorePath);
+ }
- boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList,
AccessOption.PERSISTENT);
- List<String> propStoreFailedSegs = new
ArrayList<>(segmentsToDelete.size());
- for (int i = 0; i < deleteSuccessful.length; i++) {
- final String segmentId = segmentsToDelete.get(i);
- if (!deleteSuccessful[i]) {
- // The batch remove API takes a non-recursive ZK path: it cannot
delete a znode that has
- // accumulated children. Fall back to the single-path remove API,
which falls back to a
- // recursive delete on the same NotEmpty failure. Skip when the
znode is already gone
- // (the batch call may have failed simply because the entry did not
exist).
- String segmentPath = propStorePathList.get(i);
- if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
- && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT))
{
- LOGGER.info("Could not delete {} from propertystore", segmentPath);
- segmentsToRetryLater.add(segmentId);
- propStoreFailedSegs.add(segmentId);
- }
+ boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList,
AccessOption.PERSISTENT);
+ for (int i = 0; i < deleteSuccessful.length; i++) {
+ final String segmentId = segmentsToDelete.get(i);
+ if (deleteSuccessful[i]) {
+ deletedSegments.add(segmentId);
+ } else {
+ // The batch remove API takes a non-recursive ZK path: it cannot
delete a znode that has
+ // accumulated children. Fall back to the single-path remove API,
which falls back to a
+ // recursive delete on the same NotEmpty failure. Skip when the znode
is already gone
+ // (the batch call may have failed simply because the entry did not
exist).
+ String segmentPath = propStorePathList.get(i);
+ if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+ && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) {
+ LOGGER.info("Could not delete {} from propertystore", segmentPath);
+ } else {
+ deletedSegments.add(segmentId);
}
}
- segmentsToDelete.removeAll(propStoreFailedSegs);
-
- // TODO: If removing segments from deep store fails (e.g. controller
crashes, deep store unavailable), these
- // segments will become orphans and not easy to track because
their ZK metadata are already deleted.
- // Consider removing segments from deep store before cleaning up
the ZK metadata.
- removeSegmentsFromStoreInBatch(tableName, segmentsToDelete,
deletedSegmentsRetentionMs);
}
+ return deletedSegments;
+ }
- LOGGER.info("Deleted {} segments from table {}:{}",
segmentsToDelete.size(), tableName,
- segmentsToDelete.size() <= 5 ? segmentsToDelete : "");
-
- if (!segmentsToRetryLater.isEmpty()) {
- long effectiveDeletionDelay = Math.min(deletionDelay * 2,
MAX_DELETION_DELAY_SECONDS);
- LOGGER.info("Postponing deletion of {} segments from table {}",
segmentsToRetryLater.size(), tableName);
- deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, effectiveDeletionDelay);
- }
+ /// Reschedules the segments that could not be deleted this pass, applying
the exponential back-off
+ /// (capped at [#MAX_DELETION_DELAY_SECONDS]). No-op when there is nothing
to retry.
+ protected void rescheduleRetry(String tableName, Collection<String>
segmentsToRetryLater,
+ Long deletedSegmentsRetentionMs, long deletionDelay) {
+ long effectiveDeletionDelay = Math.min(deletionDelay * 2,
MAX_DELETION_DELAY_SECONDS);
+ LOGGER.info("Postponing deletion of {} segments from table {}",
segmentsToRetryLater.size(), tableName);
+ deleteSegmentsWithDelay(tableName, segmentsToRetryLater,
deletedSegmentsRetentionMs, effectiveDeletionDelay);
}
- public void removeSegmentsFromStoreInBatch(String tableNameWithType,
List<String> segments,
+ public void removeSegmentsFromStoreInBatch(String tableNameWithType,
Collection<String> segments,
Review Comment:
Changing the signature of this public, overridable method from `List<String>`
to `Collection<String>` is not a pure refactor: existing
`SegmentDeletionManager` subclasses compiled against the old method will
silently stop overriding it after upgrade, because
`deleteSegmentFromPropertyStoreAndLocal()` now dispatches to the new
descriptor. That can bypass custom deep-store deletion logic in downstream
controllers. Please preserve the old signature as a delegating overload that
the internal call path still goes through (the call above passes a `Set`, so
it binds to the `Collection` overload and would skip an override of the
`List` one), or otherwise keep the existing override contract.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]