This is an automated email from the ASF dual-hosted git repository.

swaminathanmanish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ce2b21b730 Refactor deep store delete code (#18678)
0ce2b21b730 is described below

commit 0ce2b21b7307c726ffde9eada73ef4bb386dc378
Author: Krishan Goyal <[email protected]>
AuthorDate: Wed Jun 10 16:55:19 2026 +0530

    Refactor deep store delete code (#18678)
    
    * Refactor deep store delete code
    
    * Add RetentionManager factory hook and FakePropertyStore bulk-read support
    
    * Factory for Retention Manager and Segment Deletion Manager in their tests
    
    * Use LinkedHashSet to keep deep-store deletion order deterministic
    
    deleteSegmentsFromPropertyStore returned a HashSet that was iterated by
    removeSegmentsFromStoreInBatch, making the deep-store deletion order depend
    on hash bucket layout. Switch to LinkedHashSet to preserve the input order.
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../common/utils/helix/FakePropertyStore.java      |  41 +++++++
 .../pinot/controller/BaseControllerStarter.java    |  14 ++-
 .../helix/core/PinotHelixResourceManager.java      |   8 +-
 .../helix/core/SegmentDeletionManager.java         | 133 +++++++++++++--------
 .../helix/core/retention/RetentionManager.java     |  23 ++--
 .../helix/core/retention/RetentionManagerTest.java |  23 ++--
 .../core/util/SegmentDeletionManagerTest.java      |  13 +-
 7 files changed, 178 insertions(+), 77 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
index 2c5806aab92..6e146504f85 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils.helix;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +58,46 @@ public class FakePropertyStore extends ZkHelixPropertyStore<ZNRecord> {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Bulk-read variant that returns the stored {@link ZNRecord} for each 
immediate child of
+   * {@code parentPath}. Populates {@code stats} with the per-record stat in 
the same order as the
+   * returned list. The real Helix implementation goes through {@code 
_baseAccessor}, which is
+   * {@code null} in this fake; this override backs the same read shape from 
the in-memory map so
+   * tests that rely on bulk reads work end-to-end.
+   */
+  @Override
+  public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int 
options) {
+    List<String> childNames = getChildNames(parentPath, options);
+    List<ZNRecord> out = new ArrayList<>(childNames.size());
+    if (stats != null) {
+      stats.clear();
+    }
+    for (String childName : childNames) {
+      String childPath = parentPath + "/" + childName;
+      ZNRecord record = _contents.get(childPath);
+      if (record == null) {
+        continue;
+      }
+      out.add(record);
+      if (stats != null) {
+        Stat stat = _statMap.get(childPath);
+        stats.add(stat != null ? stat : new Stat());
+      }
+    }
+    return out;
+  }
+
+  /**
+   * Five-arg getChildren overload used by Pinot via {@code 
CommonConstants.Helix.ZkClient.RETRY_*}.
+   * The retry parameters don't apply to the in-memory fake; delegates to the 
simpler signature
+   * above.
+   */
+  @Override
+  public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int 
options, int retryCount,
+      int retryInterval) {
+    return getChildren(parentPath, stats, options);
+  }
+
   @Override
   public boolean exists(String path, int options) {
     return _contents.containsKey(path);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index c0a64c5fc5f..a7ef8f42662 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -1077,8 +1077,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     initRealtimeOffsetAutoResetManager(periodicTasks);
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(_helixResourceManager, _config, 
_executorService, _connectionManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-        brokerServiceHelper);
+    _retentionManager = createRetentionManager(brokerServiceHelper);
     periodicTasks.add(_retentionManager);
     _offlineSegmentValidationManager =
         new OfflineSegmentValidationManager(_config, _helixResourceManager, 
_leadControllerManager,
@@ -1136,6 +1135,17 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     periodicTasks.add(_realtimeOffsetAutoResetManager);
   }
 
+  /**
+   * Factory hook for the controller's {@link RetentionManager}. Subclasses 
override to install a
+   * deployment-specific retention manager (e.g. to extend the 
untracked-segment sweep with names
+   * tracked outside the standard per-segment ZK znodes). The default 
constructs the stock
+   * {@link RetentionManager}.
+   */
+  protected RetentionManager createRetentionManager(BrokerServiceHelper 
brokerServiceHelper) {
+    return new RetentionManager(_helixResourceManager, _leadControllerManager, 
_config, _controllerMetrics,
+        brokerServiceHelper);
+  }
+
   /**
    * Creates a TaskManager instance  as specified in the configuration.
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 24d779e2507..4013379f722 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -318,7 +318,7 @@ public class PinotHelixResourceManager {
     _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _controllerMetrics = controllerMetrics;
-    _segmentDeletionManager = new SegmentDeletionManager(_dataDir, 
_helixAdmin, _helixClusterName, _propertyStore,
+    _segmentDeletionManager = createSegmentDeletionManager(_dataDir, 
_helixAdmin, _helixClusterName, _propertyStore,
         _deletedSegmentsRetentionInDays);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, 
_isSingleTenantCluster);
 
@@ -404,6 +404,12 @@ public class PinotHelixResourceManager {
     return _segmentDeletionManager;
   }
 
+  protected SegmentDeletionManager createSegmentDeletionManager(String 
dataDir, HelixAdmin helixAdmin,
+      String helixClusterName, ZkHelixPropertyStore<ZNRecord> propertyStore, 
int deletedSegmentsRetentionInDays) {
+    return new SegmentDeletionManager(dataDir, helixAdmin, helixClusterName, 
propertyStore,
+        deletedSegmentsRetentionInDays);
+  }
+
   /**
    * Get the Helix manager.
    *
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 3d5680fae37..7d4944897c7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -124,7 +125,7 @@ public class SegmentDeletionManager {
     _materializedViewConsistencyManager = materializedViewConsistencyManager;
   }
 
-  private void notifyMaterializedViewConsistencyManager(String tableName, 
List<String> segmentsToDelete) {
+  protected void notifyMaterializedViewConsistencyManager(String tableName, 
List<String> segmentsToDelete) {
     MaterializedViewConsistencyManager mgr = 
_materializedViewConsistencyManager;
     if (mgr == null || segmentsToDelete.isEmpty()) {
       return;
@@ -187,17 +188,51 @@ public class SegmentDeletionManager {
 
   protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String 
tableName, Collection<String> segmentIds,
       Long deletedSegmentsRetentionMs, long deletionDelay) {
-    // Check if segment got removed from ExternalView or IdealState
+    List<String> segmentsToDelete = filterSegmentsToDelete(tableName, 
segmentIds);
+    if (segmentsToDelete == null) {
+      // ExternalView or IdealState was unavailable; skip the whole batch
+      return;
+    }
+
+    Set<String> deletedSegments = new HashSet<>();
+    if (!segmentsToDelete.isEmpty()) {
+      // Capture segment time ranges before ZK metadata is removed (for MV 
dirty marking)
+      notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
+
+      // Notify all active listeners here
+      PinotSegmentLifecycleEventListenerManager.getInstance()
+          .notifyListeners(new SegmentDeletionEventDetails(tableName, 
segmentsToDelete));
+
+      deletedSegments = deleteSegmentsFromPropertyStore(tableName, 
segmentsToDelete);
+
+      // Best effort remove segments from deep store.
+      // If this fails (e.g. controller crashes, deep store unavailable), 
future runs of RetentionManager
+      // will attempt to delete orphan deep store entries. Check 
getSegmentsToDeleteFromDeepstore()
+      removeSegmentsFromStoreInBatch(tableName, deletedSegments, 
deletedSegmentsRetentionMs);
+    }
+
+    Set<String> segmentsToRetryLater = new HashSet<>(segmentIds);
+    segmentsToRetryLater.removeAll(deletedSegments);
+
+    LOGGER.info("Deleted {} segments from table {}:{}", 
deletedSegments.size(), tableName,
+        deletedSegments.size() <= 5 ? deletedSegments : "");
+
+    if (!segmentsToRetryLater.isEmpty()) {
+      rescheduleRetry(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, deletionDelay);
+    }
+  }
+
+  /// Check if segment got removed from ExternalView or IdealState
+  /// Returns `null` when the ExternalView or IdealState is unavailable
+  protected List<String> filterSegmentsToDelete(String tableName, 
Collection<String> segmentIds) {
     ExternalView externalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
     if (externalView == null || idealState == null) {
       LOGGER.warn("Resource: {} is not set up in idealState or ExternalView, 
won't do anything", tableName);
-      return;
+      return null;
     }
 
-    List<String> segmentsToDelete = new ArrayList<>(segmentIds.size()); // Has 
the segments that will be deleted
-    Set<String> segmentsToRetryLater = new HashSet<>(segmentIds.size());  // 
List of segments that we need to retry
-
+    List<String> segmentsToDelete = new ArrayList<>(segmentIds.size());
     try {
       for (String segmentId : segmentIds) {
         Map<String, String> segmentToInstancesMapFromExternalView = 
externalView.getStateMap(segmentId);
@@ -205,68 +240,60 @@ public class SegmentDeletionManager {
         if ((segmentToInstancesMapFromExternalView == null || 
segmentToInstancesMapFromExternalView.isEmpty()) && (
             segmentToInstancesMapFromIdealStates == null || 
segmentToInstancesMapFromIdealStates.isEmpty())) {
           segmentsToDelete.add(segmentId);
-        } else {
-          segmentsToRetryLater.add(segmentId);
         }
       }
     } catch (Exception e) {
       LOGGER.warn("Caught exception while checking helix states for table: 
{}", tableName, e);
       segmentsToDelete.clear();
       segmentsToDelete.addAll(segmentIds);
-      segmentsToRetryLater.clear();
     }
+    return segmentsToDelete;
+  }
 
-    if (!segmentsToDelete.isEmpty()) {
-      List<String> propStorePathList = new 
ArrayList<>(segmentsToDelete.size());
-      for (String segmentId : segmentsToDelete) {
-        String segmentPropertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
-        propStorePathList.add(segmentPropertyStorePath);
-      }
-
-      // Capture segment time ranges before ZK metadata is removed (for MV 
dirty marking)
-      notifyMaterializedViewConsistencyManager(tableName, segmentsToDelete);
-
-      // Notify all active listeners here
-      PinotSegmentLifecycleEventListenerManager.getInstance()
-          .notifyListeners(new SegmentDeletionEventDetails(tableName, 
segmentsToDelete));
+  /// Removes the property-store znodes for the given segments
+  /// Returns the set of segments that were successfully deleted.
+  protected Set<String> deleteSegmentsFromPropertyStore(String tableName, 
List<String> segmentsToDelete) {
+    // Use a LinkedHashSet so deep-store deletion preserves the input order 
(deterministic) rather
+    // than HashSet bucket order, while keeping the Set semantics callers rely 
on.
+    Set<String> deletedSegments = new LinkedHashSet<>(segmentsToDelete.size());
+    List<String> propStorePathList = new ArrayList<>(segmentsToDelete.size());
+    for (String segmentId : segmentsToDelete) {
+      String segmentPropertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentId);
+      propStorePathList.add(segmentPropertyStorePath);
+    }
 
-      boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, 
AccessOption.PERSISTENT);
-      List<String> propStoreFailedSegs = new 
ArrayList<>(segmentsToDelete.size());
-      for (int i = 0; i < deleteSuccessful.length; i++) {
-        final String segmentId = segmentsToDelete.get(i);
-        if (!deleteSuccessful[i]) {
-          // The batch remove API takes a non-recursive ZK path: it cannot 
delete a znode that has
-          // accumulated children. Fall back to the single-path remove API, 
which falls back to a
-          // recursive delete on the same NotEmpty failure. Skip when the 
znode is already gone
-          // (the batch call may have failed simply because the entry did not 
exist).
-          String segmentPath = propStorePathList.get(i);
-          if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
-              && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) 
{
-            LOGGER.info("Could not delete {} from propertystore", segmentPath);
-            segmentsToRetryLater.add(segmentId);
-            propStoreFailedSegs.add(segmentId);
-          }
+    boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, 
AccessOption.PERSISTENT);
+    for (int i = 0; i < deleteSuccessful.length; i++) {
+      final String segmentId = segmentsToDelete.get(i);
+      if (deleteSuccessful[i]) {
+        deletedSegments.add(segmentId);
+      } else {
+        // The batch remove API takes a non-recursive ZK path: it cannot 
delete a znode that has
+        // accumulated children. Fall back to the single-path remove API, 
which falls back to a
+        // recursive delete on the same NotEmpty failure. Skip when the znode 
is already gone
+        // (the batch call may have failed simply because the entry did not 
exist).
+        String segmentPath = propStorePathList.get(i);
+        if (_propertyStore.exists(segmentPath, AccessOption.PERSISTENT)
+            && !_propertyStore.remove(segmentPath, AccessOption.PERSISTENT)) {
+          LOGGER.info("Could not delete {} from propertystore", segmentPath);
+        } else {
+          deletedSegments.add(segmentId);
         }
       }
-      segmentsToDelete.removeAll(propStoreFailedSegs);
-
-      // TODO: If removing segments from deep store fails (e.g. controller 
crashes, deep store unavailable), these
-      //       segments will become orphans and not easy to track because 
their ZK metadata are already deleted.
-      //       Consider removing segments from deep store before cleaning up 
the ZK metadata.
-      removeSegmentsFromStoreInBatch(tableName, segmentsToDelete, 
deletedSegmentsRetentionMs);
     }
+    return deletedSegments;
+  }
 
-    LOGGER.info("Deleted {} segments from table {}:{}", 
segmentsToDelete.size(), tableName,
-        segmentsToDelete.size() <= 5 ? segmentsToDelete : "");
-
-    if (!segmentsToRetryLater.isEmpty()) {
-      long effectiveDeletionDelay = Math.min(deletionDelay * 2, 
MAX_DELETION_DELAY_SECONDS);
-      LOGGER.info("Postponing deletion of {} segments from table {}", 
segmentsToRetryLater.size(), tableName);
-      deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, effectiveDeletionDelay);
-    }
+  /// Reschedules the segments that could not be deleted this pass, applying 
the exponential back-off
+  /// (capped at [#MAX_DELETION_DELAY_SECONDS]). No-op when there is nothing 
to retry.
+  protected void rescheduleRetry(String tableName, Collection<String> 
segmentsToRetryLater,
+      Long deletedSegmentsRetentionMs, long deletionDelay) {
+    long effectiveDeletionDelay = Math.min(deletionDelay * 2, 
MAX_DELETION_DELAY_SECONDS);
+    LOGGER.info("Postponing deletion of {} segments from table {}", 
segmentsToRetryLater.size(), tableName);
+    deleteSegmentsWithDelay(tableName, segmentsToRetryLater, 
deletedSegmentsRetentionMs, effectiveDeletionDelay);
   }
 
-  public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
List<String> segments,
+  public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
Collection<String> segments,
       @Nullable Long deletedSegmentsRetentionMs) {
     if (_dataDir == null) {
       LOGGER.info("dataDir is not configured, won't delete segment from disk 
for table: {}", tableNameWithType);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 55de58eae34..f65166f5a31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -312,6 +312,18 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
     }
   }
 
+  @VisibleForTesting
+  protected Set<String> getSegmentNames(String tableNameWithType,
+      List<SegmentZKMetadata> segmentZKMetadataList) {
+    return segmentZKMetadataList.stream()
+        .map(SegmentZKMetadata::getSegmentName)
+        .collect(Collectors.toCollection(HashSet::new));
+  }
+
+  protected List<String> getSegmentNames(String tableNameWithType) {
+    return _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false);
+  }
+
   private List<String> getSegmentsToDeleteFromDeepstore(String 
tableNameWithType, RetentionStrategy retentionStrategy,
       List<SegmentZKMetadata> segmentZKMetadataList, int 
untrackedSegmentsDeletionBatchSize,
       RetentionStrategy untrackedSegmentsRetentionStrategy) {
@@ -344,18 +356,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
       return segmentsToDelete;
     }
 
-    Set<String> segmentsPresentInZK;
+    Set<String> segmentsPresentInZK = getSegmentNames(tableNameWithType, 
segmentZKMetadataList);
     if (isHybridTable) {
-      segmentsPresentInZK = new HashSet<>();
       // This must be the OFFLINE table
-      segmentsPresentInZK.addAll(
-          
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toSet()));
       // Add segments from the REALTIME table as well
-      segmentsPresentInZK.addAll(
-          
_pinotHelixResourceManager.getSegmentsFor(TableNameBuilder.REALTIME.tableNameWithType(rawTableName),
 false));
-    } else {
-      segmentsPresentInZK =
-          
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toSet());
+      
segmentsPresentInZK.addAll(getSegmentNames(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)));
     }
 
     try {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index c32459615db..9999612bd24 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -91,6 +91,13 @@ public class RetentionManagerTest {
   private Path _tempDir;
   private File _tableDir;
 
+  protected RetentionManager createRetentionManager(PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
+      BrokerServiceHelper brokerServiceHelper) {
+    return new RetentionManager(pinotHelixResourceManager, 
leadControllerManager, config, controllerMetrics,
+        brokerServiceHelper);
+  }
+
   @BeforeMethod
   public void setUp() throws Exception {
     // Setup for real file test
@@ -202,7 +209,7 @@ public class RetentionManagerTest {
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
     RetentionManager retentionManager =
-        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics,
+        createRetentionManager(pinotHelixResourceManager, 
leadControllerManager, conf, controllerMetrics,
             brokerServiceHelper);
     retentionManager.start();
     retentionManager.run();
@@ -403,7 +410,7 @@ public class RetentionManagerTest {
     PinotHelixResourceManager mockResourceManager = 
mock(PinotHelixResourceManager.class);
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
+    RetentionManager retentionManager = 
createRetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
         controllerMetrics, brokerServiceHelper);
     retentionManager.start();
     retentionManager.run();
@@ -495,7 +502,7 @@ public class RetentionManagerTest {
 
     // test
     RetentionManager retentionManager =
-        new RetentionManager(mockPinotHelixResourceManager, null, 
controllerConf, mock(ControllerMetrics.class),
+        createRetentionManager(mockPinotHelixResourceManager, null, 
controllerConf, mock(ControllerMetrics.class),
             brokerServiceHelper);
     retentionManager.manageRetentionForHybridTable(realtimeTableConfig, 
offlineTableConfig);
 
@@ -613,7 +620,7 @@ public class RetentionManagerTest {
     PinotHelixResourceManager mockResourceManager = 
mock(PinotHelixResourceManager.class);
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
+    RetentionManager retentionManager = 
createRetentionManager(pinotHelixResourceManager, leadControllerManager, conf,
         controllerMetrics, brokerServiceHelper);
     retentionManager.start();
     retentionManager.run();
@@ -651,7 +658,7 @@ public class RetentionManagerTest {
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
     RetentionManager retentionManager =
-        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics,
+        createRetentionManager(pinotHelixResourceManager, 
leadControllerManager, conf, controllerMetrics,
             brokerServiceHelper);
     retentionManager.start();
     retentionManager.run();
@@ -698,7 +705,7 @@ public class RetentionManagerTest {
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
     RetentionManager retentionManager =
-        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics,
+        createRetentionManager(pinotHelixResourceManager, 
leadControllerManager, conf, controllerMetrics,
             brokerServiceHelper);
 
     
retentionManager.findUntrackedSegmentsToDeleteFromDeepstore("table1_REALTIME", 
null, segmentsToExclude, null);
@@ -871,7 +878,7 @@ public class RetentionManagerTest {
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
     RetentionManager retentionManager =
-        new RetentionManager(mockResourceManager, 
mock(LeadControllerManager.class), conf, controllerMetrics,
+        createRetentionManager(mockResourceManager, 
mock(LeadControllerManager.class), conf, controllerMetrics,
             brokerServiceHelper);
 
     // Default should be false
@@ -954,7 +961,7 @@ public class RetentionManagerTest {
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(mockResourceManager, conf, null, null);
     RetentionManager retentionManager =
-        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics,
+        createRetentionManager(pinotHelixResourceManager, 
leadControllerManager, conf, controllerMetrics,
             brokerServiceHelper);
     retentionManager.start();
     retentionManager.run();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index c65162123cb..bf36f7a8660 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -82,6 +82,11 @@ public class SegmentDeletionManagerTest {
     RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
 
+  protected SegmentDeletionManager createDeletionManager(String dataDir, 
HelixAdmin helixAdmin, String clusterName,
+      ZkHelixPropertyStore<ZNRecord> propertyStore, int 
deletedSegmentsRetentionInDays) {
+    return new SegmentDeletionManager(dataDir, helixAdmin, clusterName, 
propertyStore, deletedSegmentsRetentionInDays);
+  }
+
   HelixAdmin makeHelixAdmin() {
     HelixAdmin admin = mock(HelixAdmin.class);
     ExternalView ev = mock(ExternalView.class);
@@ -387,7 +392,7 @@ public class SegmentDeletionManagerTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
     File tempDir = Files.createTempDirectory("pinot-test-").toFile();
     tempDir.deleteOnExit();
-    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+    SegmentDeletionManager deletionManager = createDeletionManager(
         tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
 
     // create table segment files.
@@ -443,7 +448,7 @@ public class SegmentDeletionManagerTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
     File tempDir = Files.createTempDirectory("pinot-test-").toFile();
     tempDir.deleteOnExit();
-    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+    SegmentDeletionManager deletionManager = createDeletionManager(
         tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
 
     // create table segment files.
@@ -512,7 +517,7 @@ public class SegmentDeletionManagerTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
     File tempDir = Files.createTempDirectory("pinot-test-").toFile();
     tempDir.deleteOnExit();
-    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+    SegmentDeletionManager deletionManager = createDeletionManager(
         tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
 
     // create table segment files.
@@ -602,7 +607,7 @@ public class SegmentDeletionManagerTest {
     }
 
     @Override
-    public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
List<String> segments,
+    public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
Collection<String> segments,
         @Nullable Long deletedSegmentsRetentionMs) {
       _segmentsRemovedFromStore.addAll(segments);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to