This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 12dd211dda Wait segment deletion in startReplaceSegment api to make
sure there will be at most 2 data snapshots. (#8838)
12dd211dda is described below
commit 12dd211dda3ec360914fc51acd4763b8a6b17641
Author: Jiapeng Tao <[email protected]>
AuthorDate: Tue Jun 21 14:06:56 2022 -0700
Wait segment deletion in startReplaceSegment api to make sure there will be
at most 2 data snapshots. (#8838)
---
.../helix/core/PinotHelixResourceManager.java | 19 +++++++++++++++++++
.../helix/core/PinotHelixResourceManagerTest.java | 8 ++++++--
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d5a0fa9550..970d27efb8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -173,6 +173,8 @@ public class PinotHelixResourceManager {
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 *
60_000L; // 10 minutes
public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1
second
+ public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20
minutes
+ public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1
second
private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new
SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
@@ -3165,6 +3167,7 @@ public class PinotHelixResourceManager {
if (!segmentsToCleanUp.isEmpty()) {
LOGGER.info("Cleaning up the segments while startReplaceSegments:
{}", segmentsToCleanUp);
deleteSegments(tableNameWithType, segmentsToCleanUp);
+ waitForSegmentsToDelete(tableNameWithType, segmentsToCleanUp,
SEGMENT_CLEANUP_TIMEOUT_MS);
}
return true;
} else {
@@ -3185,6 +3188,22 @@ public class PinotHelixResourceManager {
return segmentLineageEntryId;
}
+ private void waitForSegmentsToDelete(String tableNameWithType, List<String>
segments, long timeOutInMillis)
+ throws InterruptedException { // InterruptedException comes from the Thread.sleep call in the polling loop
+ LOGGER.info("Waiting for {} segments to delete for table: {}. timeout =
{}ms, segments = {}", segments.size(),
+ tableNameWithType, timeOutInMillis, segments);
+ long endTimeMs = System.currentTimeMillis() + timeOutInMillis; // absolute deadline, computed once before the first poll
+ do { // do-while: the segment list is checked at least once, even when the timeout is zero
+ if (Collections.disjoint(getSegmentsFor(tableNameWithType, false),
segments)) { // true when the list returned by getSegmentsFor shares no element with `segments`
+ return;
+ } else {
+ Thread.sleep(SEGMENT_CLEANUP_CHECK_INTERVAL_MS); // at least one requested segment is still listed: pause, then poll again
+ }
+ } while (System.currentTimeMillis() < endTimeMs); // deadline is evaluated only after the sleep, so the wait can overrun it by about one interval
+ throw new RuntimeException("Timeout while waiting for segments to be
deleted for table: " + tableNameWithType
+ + ", timeout: " + timeOutInMillis + "ms"); // reached only when the loop exits without finding the lists disjoint
+ }
+
/**
* Computes the end segment replace phase
*
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index bd2f91a12b..85ae08c4e6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -915,7 +915,9 @@ public class PinotHelixResourceManagerTest {
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(),
LineageEntryState.IN_PROGRESS);
// Check that the segments from the older lineage gets deleted
- waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
3, TIMEOUT_IN_MS);
+ Collections.disjoint(
+
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false),
+ Arrays.asList("s6", "s7", "s8"));
Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager()
.getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false).size(), 3);
Assert.assertEquals(new HashSet<>(TEST_INSTANCE.getHelixResourceManager()
@@ -973,7 +975,9 @@ public class PinotHelixResourceManagerTest {
segmentsTo = Arrays.asList("s12", "s13", "s14");
String lineageEntryId4 = TEST_INSTANCE.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, true);
- waitForSegmentsToDelete(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
3, TIMEOUT_IN_MS);
+ Collections.disjoint(
+
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false),
+ Arrays.asList("s9", "s10", "s11"));
Assert.assertEquals(new HashSet<>(TEST_INSTANCE.getHelixResourceManager()
.getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false)),
new HashSet<>(Arrays.asList("s3", "s4", "s5")));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]