This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9dea5537c27 Add support for providing custom retention time for
untracked segments (#16719)
9dea5537c27 is described below
commit 9dea5537c2778564c17bad6f8543c3e04d27fac8
Author: 9aman <[email protected]>
AuthorDate: Thu Sep 11 21:36:24 2025 +0530
Add support for providing custom retention time for untracked segments
(#16719)
* Add support for providing custom retention time for untracked segments
* Remove unnecessary configs that had been added
* Ensure a valid untracked retention time unit and value are provided in the table config
* Make untracked segments retention configurable via cluster config
---
.../apache/pinot/controller/ControllerConf.java | 8 ++++
.../helix/core/SegmentDeletionManager.java | 2 +-
.../helix/core/retention/RetentionManager.java | 51 ++++++++++++++++----
.../core/util/SegmentDeletionManagerTest.java | 6 +++
.../segment/local/utils/TableConfigUtils.java | 56 +++++++++++++++++++---
.../SegmentsValidationAndRetentionConfig.java | 18 +++++++
6 files changed, 123 insertions(+), 18 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1a849cab307..c96355d37e2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -265,6 +265,9 @@ public class ControllerConf extends PinotConfiguration {
// Untracked segments are those that exist in deep store but have no
corresponding entry in the ZK property store.
public static final String ENABLE_UNTRACKED_SEGMENT_DELETION =
"controller.retentionManager.untrackedSegmentDeletionEnabled";
+ public static final String UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS =
+ "controller.retentionManager.untrackedSegmentsRetentionTimeInDays";
+ public static final int DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS
= 3; // default for UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND =
60 * 60; // 1 Hour.
@@ -1199,6 +1202,11 @@ public class ControllerConf extends PinotConfiguration {
return
getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
false);
}
+ public int getUntrackedSegmentsRetentionTimeInDays() {
+ return
getProperty(ControllerPeriodicTasksConf.UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS,
+
ControllerPeriodicTasksConf.DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS);
+ }
+
public void setUntrackedSegmentDeletionEnabled(boolean
untrackedSegmentDeletionEnabled) {
setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
untrackedSegmentDeletionEnabled);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index a87d8011529..5638ebc16ce 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -197,7 +197,7 @@ public class SegmentDeletionManager {
// TODO: If removing segments from deep store fails (e.g. controller
crashes, deep store unavailable), these
// segments will become orphans and not easy to track because
their ZK metadata are already deleted.
// Consider removing segments from deep store before cleaning up
the ZK metadata.
- removeSegmentsFromStore(tableName, segmentsToDelete,
deletedSegmentsRetentionMs);
+ removeSegmentsFromStoreInBatch(tableName, segmentsToDelete,
deletedSegmentsRetentionMs); // NOTE(review): was removeSegmentsFromStore(...); confirm the batch variant honors deletedSegmentsRetentionMs the same way
}
LOGGER.info("Deleted {} segments from table {}:{}",
segmentsToDelete.size(), tableName,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 3f6c162b81a..a92dffff3f9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -80,6 +80,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100;
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
private final boolean _untrackedSegmentDeletionEnabled;
+ private final int _untrackedSegmentsRetentionTimeInDays; // cluster-level fallback when the table config sets no usable untracked retention
private static final Logger LOGGER =
LoggerFactory.getLogger(RetentionManager.class);
private final boolean _isHybridTableRetentionStrategyEnabled;
@@ -92,6 +93,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
config.getRetentionManagerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
_untrackedSegmentDeletionEnabled =
config.getUntrackedSegmentDeletionEnabled();
+ _untrackedSegmentsRetentionTimeInDays =
config.getUntrackedSegmentsRetentionTimeInDays();
_isHybridTableRetentionStrategyEnabled =
config.isHybridTableRetentionStrategyEnabled();
_brokerServiceHelper = brokerServiceHelper;
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}",
getIntervalInSeconds());
@@ -148,9 +150,13 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
return;
}
+ RetentionStrategy untrackedSegmentsRetentionStrategy =
+ createUntrackedSegmentsRetentionStrategy(validationConfig,
tableNameWithType); // table-level untracked retention if usable, else the cluster default
+
// Scan all segment ZK metadata and purge segments if necessary
if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- manageRetentionForOfflineTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize);
+ manageRetentionForOfflineTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize,
+ untrackedSegmentsRetentionStrategy);
} else {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
TableConfig offlineTableConfig =
_pinotHelixResourceManager.getOfflineTableConfig(rawTableName);
@@ -159,20 +165,21 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// TODO: handle the orphan segment deletion for hybrid table
manageRetentionForHybridTable(tableConfig, offlineTableConfig);
} else {
- manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize);
+ manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy,
untrackedSegmentsDeletionBatchSize,
+ untrackedSegmentsRetentionStrategy);
}
}
}
private void manageRetentionForOfflineTable(String offlineTableName,
RetentionStrategy retentionStrategy,
- int untrackedSegmentsDeletionBatchSize) {
+ int untrackedSegmentsDeletionBatchSize, RetentionStrategy
untrackedSegmentsRetentionStrategy) {
List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
// fetch those segments that are beyond the retention period and don't
have an entry in ZK i.e.
// SegmentZkMetadata is missing for those segments
List<String> segmentsToDelete =
getSegmentsToDeleteFromDeepstore(offlineTableName, retentionStrategy,
segmentZKMetadataList,
- untrackedSegmentsDeletionBatchSize);
+ untrackedSegmentsDeletionBatchSize,
untrackedSegmentsRetentionStrategy);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) {
@@ -186,14 +193,14 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
private void manageRetentionForRealtimeTable(String realtimeTableName,
RetentionStrategy retentionStrategy,
- int untrackedSegmentsDeletionBatchSize) {
+ int untrackedSegmentsDeletionBatchSize, RetentionStrategy
untrackedSegmentsRetentionStrategy) {
List<SegmentZKMetadata> segmentZKMetadataList =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
// fetch those segments that are beyond the retention period and don't
have an entry in ZK i.e.
// SegmentZkMetadata is missing for those segments
List<String> segmentsToDelete =
getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy,
segmentZKMetadataList,
- untrackedSegmentsDeletionBatchSize);
+ untrackedSegmentsDeletionBatchSize,
untrackedSegmentsRetentionStrategy);
IdealState idealState = _pinotHelixResourceManager.getHelixAdmin()
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(),
realtimeTableName);
@@ -300,7 +307,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
private List<String> getSegmentsToDeleteFromDeepstore(String
tableNameWithType, RetentionStrategy retentionStrategy,
- List<SegmentZKMetadata> segmentZKMetadataList, int
untrackedSegmentsDeletionBatchSize) {
+ List<SegmentZKMetadata> segmentZKMetadataList, int
untrackedSegmentsDeletionBatchSize,
+ RetentionStrategy untrackedSegmentsRetentionStrategy) {
List<String> segmentsToDelete = new ArrayList<>();
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
boolean isHybridTable =
_pinotHelixResourceManager.hasOfflineTable(rawTableName)
@@ -348,7 +356,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
LOGGER.info("Fetch segments present in deep store that are beyond
retention period for table: {}",
tableNameWithType);
segmentsToDelete =
- findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType,
retentionStrategy, segmentsPresentInZK);
+ findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType,
retentionStrategy, segmentsPresentInZK,
+ untrackedSegmentsRetentionStrategy);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.UNTRACKED_SEGMENTS_COUNT,
segmentsToDelete.size());
@@ -380,7 +389,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
* @throws IOException If there's an error accessing the filesystem
*/
private List<String> findUntrackedSegmentsToDeleteFromDeepstore(String
tableNameWithType,
- RetentionStrategy retentionStrategy, List<String> segmentsToExclude)
+ RetentionStrategy retentionStrategy, List<String> segmentsToExclude,
+ RetentionStrategy untrackedSegmentsRetentionStrategy)
throws IOException {
List<String> segmentsToDelete = new ArrayList<>();
@@ -408,7 +418,11 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
// determine whether the segment should be purged or not based on the
last modified time of the file
long lastModifiedTime = fileMetadata.getLastModifiedTime();
- if (retentionStrategy.isPurgeable(tableNameWithType, segmentName,
lastModifiedTime)) {
+ // Delete the file if its last-modified time is beyond either the table
retention or the retention configured for untracked segments
+ boolean shouldDelete = retentionStrategy.isPurgeable(tableNameWithType,
segmentName, lastModifiedTime)
+ || untrackedSegmentsRetentionStrategy.isPurgeable(tableNameWithType,
segmentName, lastModifiedTime);
+
+ if (shouldDelete) {
segmentsToDelete.add(segmentName);
}
}
@@ -483,4 +497,21 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
}
LOGGER.info("Segment lineage metadata clean-up is successfully processed
for table: {}", tableNameWithType);
}
+
+ // Returns the table-level untracked segments retention when both unit and value are set
+ // (empty counts as unset, as in table config validation); otherwise the cluster default in days.
+ private RetentionStrategy createUntrackedSegmentsRetentionStrategy(
+ SegmentsValidationAndRetentionConfig validationConfig, String
tableNameWithType) {
+ String timeUnit = validationConfig.getUntrackedSegmentsRetentionTimeUnit();
+ String timeValue = validationConfig.getUntrackedSegmentsRetentionTimeValue();
+ if (timeUnit != null && !timeUnit.isEmpty() && timeValue != null &&
!timeValue.isEmpty()) {
+ try {
+ return new TimeRetentionStrategy(TimeUnit.valueOf(timeUnit.toUpperCase()),
Long.parseLong(timeValue));
+ } catch (Exception e) {
+ LOGGER.warn("Invalid untracked segments retention time: {} {} for
table: {}, using cluster default of {} days",
+ timeUnit, timeValue, tableNameWithType,
_untrackedSegmentsRetentionTimeInDays, e);
+ }
+ }
+ return new TimeRetentionStrategy(TimeUnit.DAYS,
_untrackedSegmentsRetentionTimeInDays);
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index cf1a091409f..3dcac7b01ea 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -580,6 +580,12 @@ public class SegmentDeletionManagerTest {
@Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
_segmentsToRetry.addAll(segmentIds);
}
+
+ @Override
+ public void removeSegmentsFromStoreInBatch(String tableNameWithType,
List<String> segments,
+ @Nullable Long deletedSegmentsRetentionMs) {
+ _segmentsRemovedFromStore.addAll(segments); // records the batch without calling super; NOTE(review): presumably asserted by the tests
+ }
}
public static class FakePinotFs extends LocalPinotFS {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 19ce8c33813..3871b8d6a9c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -269,14 +269,56 @@ public final class TableConfigUtils {
}
// Retention may not be specified. Ignore validation in that case.
- String timeUnitString = segmentsConfig.getRetentionTimeUnit();
- if (timeUnitString == null || timeUnitString.isEmpty()) {
- return;
+ String retentionTimeUnitString = segmentsConfig.getRetentionTimeUnit();
+ if (retentionTimeUnitString != null && !retentionTimeUnitString.isEmpty())
{
+ try {
+ TimeUnit.valueOf(retentionTimeUnitString.toUpperCase());
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Table: %s, invalid retention time unit: %s",
tableName, retentionTimeUnitString), e);
+ }
}
- try {
- TimeUnit.valueOf(timeUnitString.toUpperCase());
- } catch (Exception e) {
- throw new IllegalStateException(String.format("Table: %s, invalid time
unit: %s", tableName, timeUnitString));
+
+ // Untracked segments retention may not be specified. Ignore validation in
that case.
+ String untrackedSegmentsRetentionTimeUnitString =
segmentsConfig.getUntrackedSegmentsRetentionTimeUnit();
+ String untrackedSegmentsRetentionTimeValueString =
segmentsConfig.getUntrackedSegmentsRetentionTimeValue();
+
+ boolean hasUntrackedTimeUnit =
+ untrackedSegmentsRetentionTimeUnitString != null &&
!untrackedSegmentsRetentionTimeUnitString.isEmpty();
+ boolean hasUntrackedTimeValue =
+ untrackedSegmentsRetentionTimeValueString != null &&
!untrackedSegmentsRetentionTimeValueString.isEmpty();
+
+ if (hasUntrackedTimeUnit && !hasUntrackedTimeValue) {
+ throw new IllegalStateException(String.format(
+ "Table: %s, untracked retention time value must be specified when
untracked retention time unit is provided",
+ tableName));
+ }
+ if (hasUntrackedTimeValue && !hasUntrackedTimeUnit) {
+ throw new IllegalStateException(String.format(
+ "Table: %s, untracked retention time unit must be specified when
untracked retention time value is provided",
+ tableName));
+ }
+
+ if (hasUntrackedTimeUnit) {
+ try {
+
TimeUnit.valueOf(untrackedSegmentsRetentionTimeUnitString.toUpperCase());
+ } catch (Exception e) {
+ throw new IllegalStateException(String.format("Table: %s, invalid
untracked retention time unit: %s", tableName,
+ untrackedSegmentsRetentionTimeUnitString), e);
+ }
+
+ long timeValue;
+ try {
+ timeValue = Long.parseLong(untrackedSegmentsRetentionTimeValueString);
+ } catch (NumberFormatException e) {
+ throw new IllegalStateException(String.format("Table: %s, invalid
untracked retention time value: %s",
+ tableName, untrackedSegmentsRetentionTimeValueString), e);
+ }
+ if (timeValue <= 0) { // outside the try so the catch cannot mask it
+ throw new IllegalStateException(
+ String.format("Table: %s, untracked retention time value must be
positive: %s", tableName,
+ untrackedSegmentsRetentionTimeValueString));
+ }
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 536128b7c41..c5ba2f1e922 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -54,6 +54,8 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
private String _peerSegmentDownloadScheme;
private String _untrackedSegmentsDeletionBatchSize;
+ private String _untrackedSegmentsRetentionTimeUnit; // TimeUnit constant name, case-insensitive (e.g. "DAYS"); must be set together with the value
+ private String _untrackedSegmentsRetentionTimeValue; // positive integer parsed as a long; must be set together with the unit
/**
* @deprecated Use {@link InstanceAssignmentConfig} instead
@@ -245,4 +247,20 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
public void setUntrackedSegmentsDeletionBatchSize(String
untrackedSegmentsDeletionBatchSize) {
_untrackedSegmentsDeletionBatchSize = untrackedSegmentsDeletionBatchSize;
}
+
+ public String getUntrackedSegmentsRetentionTimeUnit() {
+ return _untrackedSegmentsRetentionTimeUnit;
+ }
+
+ public void setUntrackedSegmentsRetentionTimeUnit(String
untrackedSegmentsRetentionTimeUnit) {
+ _untrackedSegmentsRetentionTimeUnit = untrackedSegmentsRetentionTimeUnit;
+ }
+
+ public String getUntrackedSegmentsRetentionTimeValue() {
+ return _untrackedSegmentsRetentionTimeValue;
+ }
+
+ public void setUntrackedSegmentsRetentionTimeValue(String
untrackedSegmentsRetentionTimeValue) {
+ _untrackedSegmentsRetentionTimeValue = untrackedSegmentsRetentionTimeValue;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]