This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe03999 Fix retention for cleaning up segment lineage (#7424)
fe03999 is described below
commit fe03999a9c59aa1a97eeb5cf070858c6e10f23b6
Author: Jialiang Li <[email protected]>
AuthorDate: Mon Sep 13 15:32:10 2021 -0700
Fix retention for cleaning up segment lineage (#7424)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../helix/core/retention/RetentionManager.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b9b0d09..f605e4c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -79,8 +79,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
@Override
protected void processTable(String tableNameWithType) {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ // Manage normal table retention except segment lineage cleanup.
+ // The reason of separating the logic is that REFRESH only table will be skipped in the first part,
+ // whereas the segment lineage cleanup needs to be handled.
manageRetentionForTable(tableNameWithType);
+
+ // Delete segments based on segment lineage and clean up segment lineage metadata.
+ manageSegmentLineageCleanupForTable(tableNameWithType);
}
@Override
@@ -90,6 +95,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
}
private void manageRetentionForTable(String tableNameWithType) {
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
// Build retention strategy from table config
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -102,7 +108,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
String segmentPushType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
if (tableConfig.getTableType() == TableType.OFFLINE && !"APPEND".equalsIgnoreCase(segmentPushType)) {
- LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+ LOGGER.info("Segment push type is not APPEND for table: {}, skip managing retention", tableNameWithType);
return;
}
String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
@@ -123,9 +129,6 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
} else {
manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
}
-
- // Delete segments based on segment lineage and clean up segment lineage metadata
- manageSegmentLineageCleanupForTable(tableNameWithType);
}
private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) {
@@ -202,6 +205,8 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
if (segmentLineageZNRecord == null) {
return true;
}
+ LOGGER.info("Start cleaning up segment lineage for table: {}", tableNameWithType);
+ long cleanupStartTime = System.currentTimeMillis();
SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
int expectedVersion = segmentLineageZNRecord.getVersion();
@@ -246,8 +251,12 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), segmentLineage, expectedVersion)) {
// Delete segments based on the segment lineage
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);
+ LOGGER.info("Finished cleaning up segment lineage for table: {} in {}ms", tableNameWithType,
+ (System.currentTimeMillis() - cleanupStartTime));
return true;
} else {
+ LOGGER.warn("Failed to write segment lineage back when cleaning up segment lineage for table: {}",
+ tableNameWithType);
return false;
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]