This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe03999  Fix retention for cleaning up segment lineage (#7424)
fe03999 is described below

commit fe03999a9c59aa1a97eeb5cf070858c6e10f23b6
Author: Jialiang Li <[email protected]>
AuthorDate: Mon Sep 13 15:32:10 2021 -0700

    Fix retention for cleaning up segment lineage (#7424)
    
    Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
 .../helix/core/retention/RetentionManager.java        | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b9b0d09..f605e4c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -79,8 +79,13 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
   @Override
   protected void processTable(String tableNameWithType) {
-    LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+    // Manage normal table retention except segment lineage cleanup.
+    // The reason of separating the logic is that REFRESH only table will be 
skipped in the first part,
+    // whereas the segment lineage cleanup needs to be handled.
     manageRetentionForTable(tableNameWithType);
+
+    // Delete segments based on segment lineage and clean up segment lineage 
metadata.
+    manageSegmentLineageCleanupForTable(tableNameWithType);
   }
 
   @Override
@@ -90,6 +95,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   }
 
   private void manageRetentionForTable(String tableNameWithType) {
+    LOGGER.info("Start managing retention for table: {}", tableNameWithType);
 
     // Build retention strategy from table config
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
@@ -102,7 +108,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
     String segmentPushType = 
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
     if (tableConfig.getTableType() == TableType.OFFLINE && 
!"APPEND".equalsIgnoreCase(segmentPushType)) {
-      LOGGER.info("Segment push type is not APPEND for table: {}, skip", 
tableNameWithType);
+      LOGGER.info("Segment push type is not APPEND for table: {}, skip 
managing retention", tableNameWithType);
       return;
     }
     String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
@@ -123,9 +129,6 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     } else {
       manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
     }
-
-    // Delete segments based on segment lineage and clean up segment lineage 
metadata
-    manageSegmentLineageCleanupForTable(tableNameWithType);
   }
 
   private void manageRetentionForOfflineTable(String offlineTableName, 
RetentionStrategy retentionStrategy) {
@@ -202,6 +205,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
         if (segmentLineageZNRecord == null) {
           return true;
         }
+        LOGGER.info("Start cleaning up segment lineage for table: {}", 
tableNameWithType);
+        long cleanupStartTime = System.currentTimeMillis();
         SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
         int expectedVersion = segmentLineageZNRecord.getVersion();
 
@@ -246,8 +251,12 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
             
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), 
segmentLineage, expectedVersion)) {
           // Delete segments based on the segment lineage
           _pinotHelixResourceManager.deleteSegments(tableNameWithType, 
segmentsToDelete);
+          LOGGER.info("Finished cleaning up segment lineage for table: {} in 
{}ms", tableNameWithType,
+              (System.currentTimeMillis() - cleanupStartTime));
           return true;
         } else {
+          LOGGER.warn("Failed to write segment lineage back when cleaning up 
segment lineage for table: {}",
+              tableNameWithType);
           return false;
         }
       });

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to