This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 22d583c  Improving segment replacement/revert protocol (#7995)
22d583c is described below

commit 22d583c0933e4a59bb6ec19eb6eecf53e9ac5c6d
Author: Seunghyun Lee <[email protected]>
AuthorDate: Thu Jan 13 11:05:30 2022 -0800

    Improving segment replacement/revert protocol (#7995)
    
    1. Add retention for original segments for REFRESH table after
       the replacement protocol is successfully done. For now, we
       use 24 hrs retention. We keep the original data for 24 hours
       so that the user can revert back to the original segments
       using revertReplaceSegments API.
    2. Add the proactive clean-up in startReplaceSegments so that we
       do not keep too many data snapshots for a REFRESH table. For
       now, we keep at most 2 snapshots.
    3. Added unit tests.
---
 .../helix/core/PinotHelixResourceManager.java      |  63 +++++--
 .../helix/core/retention/RetentionManager.java     |  55 +++++--
 .../helix/core/PinotHelixResourceManagerTest.java  | 182 +++++++++++++++++++++
 .../core/retention/SegmentLineageCleanupTest.java  |  90 +++++++---
 4 files changed, 344 insertions(+), 46 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0af8abe..13a1fc3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2783,6 +2783,10 @@ public class PinotHelixResourceManager {
 
     try {
       DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch table config
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        Preconditions.checkNotNull(tableConfig, "Table config is not available 
for table '%s'", tableNameWithType);
+
         // Fetch the segment lineage metadata
         ZNRecord segmentLineageZNRecord =
             
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
@@ -2794,7 +2798,6 @@ public class PinotHelixResourceManager {
           segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
           expectedVersion = segmentLineageZNRecord.getVersion();
         }
-
         // Check that the segment lineage entry id doesn't exists in the 
segment lineage
         
Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId)
 == null,
             String.format("SegmentLineageEntryId (%s) already exists in the 
segment lineage.", segmentLineageEntryId));
@@ -2811,15 +2814,46 @@ public class PinotHelixResourceManager {
 
           // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
 
-          // When 'forceCleanup' is enabled, we need to proactively revert the 
lineage entry when we find the lineage
-          // entry with the same 'segmentFrom' values.
-          if (forceCleanup && lineageEntry.getState() == 
LineageEntryState.IN_PROGRESS && CollectionUtils
+          // When 'forceCleanup' is enabled, we need to proactively clean up 
in the following cases:
+          // 1. Revert the lineage entry when we find the lineage entry with 
the same 'segmentFrom' values. This is
+          //    used to un-block the segment replacement protocol if the 
previous attempt failed in the middle.
+          // 2. Proactively delete the oldest data snapshot to make sure that 
we only keep at most 2 data snapshots
+          //    at any time in case of REFRESH use case.
+          if (forceCleanup) {
+            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
CollectionUtils
               .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsFrom())) {
-            // Update segment lineage entry to 'REVERTED'
-            updateSegmentLineageEntryToReverted(tableNameWithType, 
segmentLineage, entryId, lineageEntry);
-
-            // Add segments for proactive clean-up.
-            segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+              LOGGER.info(
+                  "Detected the incomplete lineage entry with the same 
'segmentsFrom'. Reverting the lineage "
+                      + "entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, segmentsFrom={}, "
+                      + "segmentsTo={}", tableNameWithType, entryId, 
lineageEntry.getSegmentsFrom(),
+                  lineageEntry.getSegmentsTo());
+
+              // Update segment lineage entry to 'REVERTED'
+              updateSegmentLineageEntryToReverted(tableNameWithType, 
segmentLineage, entryId, lineageEntry);
+
+              // Add segments for proactive clean-up.
+              segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+            } else if (lineageEntry.getState() == LineageEntryState.COMPLETED 
&& IngestionConfigUtils
+                
.getBatchSegmentIngestionType(tableConfig).equalsIgnoreCase("REFRESH") && 
CollectionUtils
+                .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsTo())) {
+              // This part of code assumes that we only allow at most 2 data 
snapshots at a time by proactively
+              // deleting the older snapshots (for REFRESH tables).
+              //
+              // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5)  // 
previous lineage
+              //      (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8)  // 
current lineage to be updated
+              // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to 
keep 2 data snapshots
+              //    (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid 
the disk space waste.
+              //
+              // TODO: make the number of allowed snapshots configurable to 
allow users to keep at most N snapshots
+              //       of data. We need to traverse the lineage by N steps 
instead of 2 steps. We can build the reverse
+              //       hash map (segmentsTo -> segmentsFrom) and traverse up 
to N times before deleting.
+              //
+              LOGGER.info(
+                  "Proactively deleting the replaced segments for REFRESH 
table to avoid the excessive disk waste. "
+                      + "tableNameWithType={}, segmentsToCleanUp={}", 
tableNameWithType,
+                  lineageEntry.getSegmentsFrom());
+              segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom());
+            }
           } else {
             // Check that any segment from 'segmentsFrom' does not appear 
twice.
             
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
 segmentsFrom), String
@@ -2844,6 +2878,7 @@ public class PinotHelixResourceManager {
           // Trigger the proactive segment clean up if needed. Once the 
lineage is updated in the property store, it
           // is safe to physically delete segments.
           if (!segmentsToCleanUp.isEmpty()) {
+            LOGGER.info("Cleaning up the segments while startReplaceSegments: 
{}", segmentsToCleanUp);
             deleteSegments(tableNameWithType, segmentsToCleanUp);
           }
           return true;
@@ -2899,10 +2934,12 @@ public class PinotHelixResourceManager {
 
         // NO-OPS if the entry is already 'COMPLETED' or 'REVERTED'
         if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
-          LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update 
to 'COMPLETED'. (tableNameWithType={}, "
-                  + "segmentLineageEntryId={}, state={})", tableNameWithType, 
segmentLineageEntryId,
-              lineageEntry.getState());
-          return true;
+          String errorMsg = String.format(
+              "The target lineage entry state is not 'IN_PROGRESS'. Cannot 
update to 'COMPLETED' state. "
+                  + "(tableNameWithType=%s, segmentLineageEntryId=%s, 
state=%s)", tableNameWithType,
+              segmentLineageEntryId, lineageEntry.getState());
+          LOGGER.error(errorMsg);
+          throw new RuntimeException(errorMsg);
         }
 
         // Check that all the segments from 'segmentsTo' exist in the table
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9ed7a69..a1679dc 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  */
 public class RetentionManager extends ControllerPeriodicTask<Void> {
   public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(5L);
-
+  private static final long REPLACED_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(1L); // 1 day
   public static final long LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(1L); // 1 day
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
 
@@ -80,13 +80,20 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
   @Override
   protected void processTable(String tableNameWithType) {
+    // Fetch table config
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to get table config for table: {}", 
tableNameWithType);
+      return;
+    }
+
     // Manage normal table retention except segment lineage cleanup.
     // The reason of separating the logic is that REFRESH only table will be 
skipped in the first part,
     // whereas the segment lineage cleanup needs to be handled.
-    manageRetentionForTable(tableNameWithType);
+    manageRetentionForTable(tableConfig);
 
     // Delete segments based on segment lineage and clean up segment lineage 
metadata.
-    manageSegmentLineageCleanupForTable(tableNameWithType);
+    manageSegmentLineageCleanupForTable(tableConfig);
   }
 
   @Override
@@ -95,16 +102,10 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
   }
 
-  private void manageRetentionForTable(String tableNameWithType) {
+  private void manageRetentionForTable(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Start managing retention for table: {}", tableNameWithType);
 
-    // Build retention strategy from table config
-    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig == null) {
-      LOGGER.error("Failed to get table config for table: {}", 
tableNameWithType);
-      return;
-    }
-
     // For offline tables, ensure that the segmentPushType is APPEND.
     SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
     String segmentPushType = 
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
@@ -197,7 +198,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
   }
 
-  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+  private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
+    String tableNameWithType = tableConfig.getTableName();
     try {
       DEFAULT_RETRY_POLICY.attempt(() -> {
         // Fetch segment lineage
@@ -227,8 +229,11 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
               // the lineage entry
               segmentLineage.deleteLineageEntry(lineageEntryId);
             } else {
-              // If the lineage state is 'COMPLETED', it is safe to delete all 
segments from 'segmentsFrom'
-              segmentsToDelete.addAll(sourceSegments);
+              // If the lineage state is 'COMPLETED' and we already preserved 
the original segments for the required
+              // retention, it is safe to delete all segments from 
'segmentsFrom'
+              if (shouldDeleteReplacedSegments(tableConfig, lineageEntry)) {
+                segmentsToDelete.addAll(sourceSegments);
+              }
             }
           } else if (lineageEntry.getState() == LineageEntryState.REVERTED || (
               lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
lineageEntry.getTimestamp()
@@ -271,4 +276,26 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
     LOGGER.info("Segment lineage metadata clean-up is successfully processed 
for table: {}", tableNameWithType);
   }
+
+  /**
+   * Helper function to decide whether we should delete segmentsFrom (replaced 
segments) given a lineage entry.
+   *
+   * The replaced segments are safe to delete if at least one of the following conditions is satisfied:
+   * 1) The table is not a "REFRESH" table (e.g. an "APPEND" table).
+   * 2) The timestamp of the lineage entry is older than the replaced segments retention (currently 24 hours).
+   *
+   * @param tableConfig a table config
+   * @param lineageEntry lineage entry
+   * @return True if we can safely delete the replaced segments. False 
otherwise.
+   */
+  private boolean shouldDeleteReplacedSegments(TableConfig tableConfig, 
LineageEntry lineageEntry) {
+    // TODO: Currently, we preserve the replaced segments for 1 day for 
REFRESH tables only. Once we support
+    // data rollback for APPEND tables, we should remove this check.
+    String batchSegmentIngestionType = 
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
+    if (!batchSegmentIngestionType.equalsIgnoreCase("REFRESH")
+        || lineageEntry.getTimestamp() < System.currentTimeMillis() - 
REPLACED_SEGMENTS_RETENTION_IN_MILLIS) {
+      return true;
+    }
+    return false;
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index faf594a..9448fdf 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,8 @@ import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -86,6 +89,10 @@ public class PinotHelixResourceManagerTest {
   private static final String OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME =
       
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_TABLE_NAME);
 
+  private static final String SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME = 
"segmentsReplaceTestRefreshTable";
+  private static final String OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME 
=
+      
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
   private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
   private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
   private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
@@ -652,6 +659,181 @@ public class PinotHelixResourceManagerTest {
   }
 
   @Test
+  public void testSegmentReplacementForRefresh()
+      throws IOException, InterruptedException {
+    // Create broker tenant on 1 Brokers
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1, 
0, 0);
+    PinotResourceManagerResponse response =
+        
ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant);
+    Assert.assertTrue(response.isSuccessful());
+
+    // Create the table
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
+            
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
+            .setIngestionConfig(
+                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null))
+            .build();
+
+    ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+    for (int i = 0; i < 3; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s" + i),
+          "downloadUrl");
+    }
+    List<String> segmentsForTable = 
ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+
+    List<String> segmentsFrom = Arrays.asList("s0", "s1", "s2");
+    List<String> segmentsTo = Arrays.asList("s3", "s4", "s5");
+
+    String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+    SegmentLineage segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), 
Arrays.asList("s3", "s4", "s5"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false)),
+        new HashSet<>(Arrays.asList("s0", "s1", "s2")));
+
+    // Add new segments
+    for (int i = 3; i < 6; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s" + i),
+          "downloadUrl");
+    }
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s0", "s1", "s2")));
+
+    // Call end segment replacements
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId);
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), 
Arrays.asList("s3", "s4", "s5"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), 
LineageEntryState.COMPLETED);
+
+    // Start the new protocol with "forceCleanup = false" so there will be no 
proactive clean-up happening
+    segmentsFrom = Arrays.asList("s3", "s4", "s5");
+    segmentsTo = Arrays.asList("s6", "s7", "s8");
+
+    String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
+        Arrays.asList("s3", "s4", "s5"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), 
Arrays.asList("s6", "s7", "s8"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Add partial segments
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s6"),
+        "downloadUrl");
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 7);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Start the new protocol with "forceCleanup = true" to check if 2 
different proactive clean-up mechanisms work:
+    // 1. the previous lineage entry (s3, s4, s5) -> (s6, s7, s8) should be 
"REVERTED"
+    // 2. the older snapshot (s0, s1, s2) needs to be cleaned up because we 
are about to upload the 3rd data snapshot
+    segmentsTo = Arrays.asList("s9", "s10", "s11");
+    String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+
+    // Check that the previous entry gets reverted
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), 
LineageEntryState.REVERTED);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(),
+        Arrays.asList("s3", "s4", "s5"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
+        Arrays.asList("s9", "s10", "s11"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), 
LineageEntryState.IN_PROGRESS);
+
+    // Check that the segments from the older lineage get deleted
+    waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
3, MAX_TIMEOUT_IN_MILLISECOND);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 3);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+
+    // Try to invoke end segment replacement for the reverted entry
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId2);
+    } catch (Exception e) {
+      // expected
+    }
+
+    // Add new segments
+    for (int i = 9; i < 12; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s" + i),
+          "downloadUrl");
+    }
+
+    // Call end segment replacements
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId3);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+            .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
+        new HashSet<>(Arrays.asList("s9", "s10", "s11")));
+  }
+
+  private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete,
+      long timeOutInMillis)
+      throws InterruptedException {
+    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
+    do {
+      if 
(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType,
 false).size()
+          == expectedNumSegmentsAfterDelete) {
+        return;
+      } else {
+        Thread.sleep(500L);
+      }
+    } while (System.currentTimeMillis() < endTimeMs);
+    throw new RuntimeException("Timeout while waiting for segments to be 
deleted");
+  }
+
+  @Test
   public void testGetLiveBrokersForTable()
       throws IOException {
     // Create broker tenant
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index b77e211..4a8cb0a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -36,6 +36,8 @@ import 
org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -54,6 +56,7 @@ public class SegmentLineageCleanupTest {
   private static final long MAX_TIMEOUT_IN_MILLISECOND = 10_000L; // 10 seconds
 
   private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE";
+  private static final String REFRESH_OFFLINE_TABLE_NAME = 
"refreshSegmentTable_OFFLINE";
   private static final String BROKER_TENANT_NAME = "brokerTenant";
   private static final String SERVER_TENANT_NAME = "serverTenant";
   private static final String RETENTION_TIME_UNIT = "DAYS";
@@ -90,20 +93,25 @@ public class SegmentLineageCleanupTest {
         controllerMetrics);
 
     // Update table config
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
-        .setBrokerTenant(BROKER_TENANT_NAME)
-        .setServerTenant(SERVER_TENANT_NAME)
-        .setNumReplicas(1)
-        .setRetentionTimeUnit(RETENTION_TIME_UNIT)
-        .setRetentionTimeValue(RETENTION_TIME_VALUE)
-        .build();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
+            
.setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1).setRetentionTimeUnit(RETENTION_TIME_UNIT)
+            .setRetentionTimeValue(RETENTION_TIME_VALUE).build();
     ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+    IngestionConfig ingestionConfig =
+        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null);
+    TableConfig refreshTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
+        
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
+        
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
+        .setIngestionConfig(ingestionConfig).build();
+    ControllerTestUtils.getHelixResourceManager().addTable(refreshTableConfig);
   }
 
   @Test
   public void testSegmentLineageCleanup()
       throws IOException, InterruptedException {
-    // Create metadata for original segments
+    // Create metadata for original segments.
     for (int i = 0; i < 5; i++) {
       
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"segment_" + i), "downloadUrl");
@@ -118,7 +126,7 @@ public class SegmentLineageCleanupTest {
         7);
     long currentTimeInMillis = System.currentTimeMillis();
 
-    // Validate the case when the lineage entry state is 'IN_PROGRESS'
+    // Validate the case when the lineage entry state is 'IN_PROGRESS'.
     SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
     segmentLineage.addLineageEntry("0",
         new LineageEntry(Arrays.asList("segment_0", "segment_1"), 
Arrays.asList("merged_0"),
@@ -131,7 +139,7 @@ public class SegmentLineageCleanupTest {
         
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 7);
 
-    // Validate the case when the lineage entry state is 'COMPLETED'
+    // Validate the case when the lineage entry state is 'COMPLETED'.
     segmentLineage.updateLineageEntry("0",
         new LineageEntry(Arrays.asList("segment_0", "segment_1"), 
Arrays.asList("merged_0"),
             LineageEntryState.COMPLETED, currentTimeInMillis));
@@ -143,7 +151,7 @@ public class SegmentLineageCleanupTest {
     Assert.assertEquals(segmentsForTable.size(), 5);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1")));
 
-    // Validate the case when the lineage entry state is 'COMPLETED' and all 
segments are deleted
+    // Validate the case when the lineage entry state is 'COMPLETED' and all 
segments are deleted.
     
ControllerTestUtils.getHelixResourceManager().deleteSegment(OFFLINE_TABLE_NAME, 
"merged_0");
     waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
@@ -151,12 +159,11 @@ public class SegmentLineageCleanupTest {
     segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 4);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1", "merged_0")));
-    segmentLineage =
-        SegmentLineageAccessHelper
-            
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    segmentLineage = SegmentLineageAccessHelper
+        
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 OFFLINE_TABLE_NAME);
     Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 0);
 
-    // Validate the case when the lineage entry state is 'IN_PROGRESS' and timestamp is old
+    // Validate the case when the lineage entry state is 'IN_PROGRESS' and timestamp is old.
     LineageEntry lineageEntry =
         new LineageEntry(Arrays.asList("segment_2", "segment_3"), 
Arrays.asList("merged_1", "merged_2"),
             LineageEntryState.IN_PROGRESS, currentTimeInMillis - 
TimeUnit.DAYS.toMillis(2L));
@@ -168,15 +175,60 @@ public class SegmentLineageCleanupTest {
     segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false);
     Assert.assertEquals(segmentsForTable.size(), 3);
     Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("merged_1", "merged_2")));
-    segmentLineage =
-        SegmentLineageAccessHelper
-            
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    segmentLineage = SegmentLineageAccessHelper
+        
.getSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 OFFLINE_TABLE_NAME);
     Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
   }
 
+  @Test
+  public void testRefreshTableCleanup()
+      throws InterruptedException {
+    // Create metadata for original segments
+    for (int i = 0; i < 3; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, 
"segment1_" + i), "downloadUrl");
+    }
+
+    // Create metadata for new segments.
+    for (int i = 0; i < 3; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, 
"segment2_" + i), "downloadUrl");
+    }
+
+    Assert.assertEquals(
+        
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 false).size(), 6);
+
+    // Validate the case when the lineage entry state is 'IN_PROGRESS'
+    SegmentLineage segmentLineage = new 
SegmentLineage(REFRESH_OFFLINE_TABLE_NAME);
+    segmentLineage.addLineageEntry("0", new 
LineageEntry(Arrays.asList("segment1_0", "segment1_1", "segment1_2"),
+        Arrays.asList("segment2_0", "segment2_1", "segment2_2"), 
LineageEntryState.IN_PROGRESS,
+        System.currentTimeMillis()));
+    SegmentLineageAccessHelper
+        
.writeSegmentLineage(ControllerTestUtils.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
+    _retentionManager.processTable(REFRESH_OFFLINE_TABLE_NAME);
+    try {
+      waitForSegmentsToDelete(REFRESH_OFFLINE_TABLE_NAME, 3, 1000L);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected since the original segments are not supposed to be immediately erased by the retention manager.
+    }
+    List<String> segmentsForTable =
+        
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 false);
+    Assert.assertEquals(segmentsForTable.size(), 6);
+
+    segmentsForTable = 
ControllerTestUtils.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 true);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+  }
+
   private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete)
       throws InterruptedException {
-    long endTimeMs = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+    waitForSegmentsToDelete(tableNameWithType, expectedNumSegmentsAfterDelete, 
MAX_TIMEOUT_IN_MILLISECOND);
+  }
+
+  private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete,
+      long timeOutInMillis)
+      throws InterruptedException {
+    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
     do {
       if 
(ControllerTestUtils.getHelixResourceManager().getSegmentsFor(tableNameWithType,
 false).size()
           == expectedNumSegmentsAfterDelete) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to