This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 12dd211dda Wait segment deletion in startReplaceSegment api to make sure there will be at most 2 data snapshots. (#8838)
12dd211dda is described below

commit 12dd211dda3ec360914fc51acd4763b8a6b17641
Author: Jiapeng Tao <[email protected]>
AuthorDate: Tue Jun 21 14:06:56 2022 -0700

    Wait segment deletion in startReplaceSegment api to make sure there will be at most 2 data snapshots. (#8838)
---
 .../helix/core/PinotHelixResourceManager.java         | 19 +++++++++++++++++++
 .../helix/core/PinotHelixResourceManagerTest.java     |  8 ++++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d5a0fa9550..970d27efb8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -173,6 +173,10 @@ public class PinotHelixResourceManager {
   // TODO: make this configurable
   public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes
   public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
+  // Max time to wait for the segments deleted during startReplaceSegments() cleanup to disappear from the table
+  public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 minutes
+  // Interval between checks while waiting for segment cleanup
+  public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 second
 
   private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
 
@@ -3165,6 +3169,7 @@ public class PinotHelixResourceManager {
           if (!segmentsToCleanUp.isEmpty()) {
             LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp);
             deleteSegments(tableNameWithType, segmentsToCleanUp);
+            waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp, SEGMENT_CLEANUP_TIMEOUT_MS);
           }
           return true;
         } else {
@@ -3185,6 +3190,32 @@ public class PinotHelixResourceManager {
     return segmentLineageEntryId;
   }
 
+  /**
+   * Waits until none of the given segments is returned by {@code getSegmentsFor(tableNameWithType, false)}.
+   *
+   * <p>The segment list is re-read after every sleep before the deadline is checked, so a deletion that completes
+   * during the final check interval is not reported as a timeout.
+   *
+   * @param tableNameWithType table to check
+   * @param segments segments whose deletion has been requested
+   * @param timeOutInMillis maximum time to wait, in milliseconds
+   * @throws InterruptedException if the thread is interrupted while sleeping between checks
+   * @throws RuntimeException if any of the segments is still present once the timeout has elapsed
+   */
+  private void waitForSegmentsToDelete(String tableNameWithType, List<String> segments, long timeOutInMillis)
+      throws InterruptedException {
+    LOGGER.info("Waiting for {} segments to delete for table: {}. timeout = {}ms, segments = {}", segments.size(),
+        tableNameWithType, timeOutInMillis, segments);
+    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
+    while (!Collections.disjoint(getSegmentsFor(tableNameWithType, false), segments)) {
+      if (System.currentTimeMillis() >= endTimeMs) {
+        throw new RuntimeException("Timeout while waiting for segments to be deleted for table: " + tableNameWithType
+            + ", timeout: " + timeOutInMillis + "ms");
+      }
+      Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS);
+    }
+  }
+
   /**
    * Computes the end segment replace phase
    *
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index bd2f91a12b..85ae08c4e6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -915,7 +915,10 @@ public class PinotHelixResourceManagerTest {
     Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.IN_PROGRESS);
 
     // Check that the segments from the older lineage gets deleted
-    waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 3, TIMEOUT_IN_MS);
+    // startReplaceSegments() now waits for its cleanup to finish, so the old segments must already be gone here.
+    Assert.assertTrue(Collections.disjoint(
+        TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false),
+        Arrays.asList("s6", "s7", "s8")));
     Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
         .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false).size(), 3);
     Assert.assertEquals(new HashSet<>(TEST_INSTANCE.getHelixResourceManager()
@@ -973,7 +976,10 @@ public class PinotHelixResourceManagerTest {
     segmentsTo = Arrays.asList("s12", "s13", "s14");
     String lineageEntryId4 = TEST_INSTANCE.getHelixResourceManager()
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, segmentsFrom, segmentsTo, true);
-    waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 3, TIMEOUT_IN_MS);
+    // startReplaceSegments() now waits for its cleanup to finish, so the old segments must already be gone here.
+    Assert.assertTrue(Collections.disjoint(
+        TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false),
+        Arrays.asList("s9", "s10", "s11")));
     Assert.assertEquals(new HashSet<>(TEST_INSTANCE.getHelixResourceManager()
             .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, false)),
         new HashSet<>(Arrays.asList("s3", "s4", "s5")));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to