This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc1e059fb Allow rerunning startReplaceSegment with the same
segmentsTo if the previous protocol fails in the middle (lineage entry in
IN_PROGRESS status). (#8639)
9bc1e059fb is described below
commit 9bc1e059fb5974a8c49523d4e9be1bd47bd88181
Author: Jiapeng Tao <[email protected]>
AuthorDate: Wed May 4 13:43:30 2022 -0700
Allow rerunning startReplaceSegment with the same segmentsTo if the
previous protocol fails in the middle (lineage entry in IN_PROGRESS status).
(#8639)
---
.../helix/core/PinotHelixResourceManager.java | 18 +++----
.../helix/core/PinotHelixResourceManagerTest.java | 56 +++++++++++++---------
2 files changed, 44 insertions(+), 30 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f32e2e75e9..0e4d25c9ab 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3010,7 +3010,7 @@ public class PinotHelixResourceManager {
String segmentLineageEntryId =
SegmentLineageUtils.generateLineageEntryId();
// Check that all the segments from 'segmentsFrom' exist in the table
- Set<String> segmentsForTable = new
HashSet<>(getSegmentsFor(tableNameWithType, false));
+ Set<String> segmentsForTable = new
HashSet<>(getSegmentsFor(tableNameWithType, true));
Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom),
String.format(
"Not all segments from 'segmentsFrom' are available in the table.
(tableName = '%s', segmentsFrom = '%s', "
+ "segmentsTo = '%s', segmentsFromTable = '%s')",
tableNameWithType, segmentsFrom, segmentsTo,
@@ -3060,17 +3060,19 @@ public class PinotHelixResourceManager {
// By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
// When 'forceCleanup' is enabled, we need to proactively clean up
at the following cases:
- // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentFrom' values. This is
- // used to un-block the segment replacement protocol if the
previous attempt failed in the middle.
+ // 1. Revert the lineage entry when we find the lineage entry with
overlapped 'segmentsFrom' or 'segmentsTo'
+ // values. This is used to un-block the segment replacement
protocol if the previous attempt failed in the
+ // middle.
// 2. Proactively delete the oldest data snapshot to make sure that
we only keep at most 2 data snapshots
// at any time in case of REFRESH use case.
if (forceCleanup) {
- if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
!Collections
- .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) {
+ if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
(!Collections
+ .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) ||
!Collections
+ .disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) {
LOGGER.info(
- "Detected the incomplete lineage entry with the same
'segmentsFrom'. Reverting the lineage "
- + "entry to unblock the new segment protocol.
tableNameWithType={}, entryId={}, segmentsFrom={}, "
- + "segmentsTo={}", tableNameWithType, entryId,
lineageEntry.getSegmentsFrom(),
+ "Detected the incomplete lineage entry with the same
'segmentsFrom' or 'segmentsTo'. Reverting the "
+ + "lineage entry to unblock the new segment protocol.
tableNameWithType={}, entryId={}, "
+ + "segmentsFrom={}, segmentsTo={}", tableNameWithType,
entryId, lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
// Update segment lineage entry to 'REVERTED'
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 996f5b521c..8d4c4c2489 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -691,6 +691,18 @@ public class PinotHelixResourceManagerTest {
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsTo(),
segmentsTo);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(),
LineageEntryState.IN_PROGRESS);
+ // Assuming the replacement fails in the middle, rerunning the protocol
with the same segmentsTo will go through,
+ // and revert the previous lineage entry.
+ String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(),
LineageEntryState.REVERTED);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(),
segmentsFrom);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
segmentsTo);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(),
LineageEntryState.IN_PROGRESS);
+
// Upload partial data
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"s7"), "downloadUrl");
@@ -698,12 +710,12 @@ public class PinotHelixResourceManagerTest {
// Start another new segment replacement with empty segmentsFrom,
// and check that previous lineages with empty segmentsFrom are not
reverted.
segmentsTo = Arrays.asList("s9", "s10");
- String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
+ String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, true);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(),
LineageEntryState.IN_PROGRESS);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(),
LineageEntryState.IN_PROGRESS);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(),
LineageEntryState.IN_PROGRESS);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(),
LineageEntryState.COMPLETED);
// Finish the replacement
@@ -713,26 +725,26 @@ public class PinotHelixResourceManagerTest {
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"s10"), "downloadUrl");
ControllerTestUtils.getHelixResourceManager()
- .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId6);
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId7);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
-
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom().isEmpty());
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
Arrays.asList("s9", "s10"));
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(),
LineageEntryState.COMPLETED);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
+
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom().isEmpty());
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
Arrays.asList("s9", "s10"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(),
LineageEntryState.COMPLETED);
// Check partial overlap reverts previous lineage
// Start a new segment replacement with non-empty segmentsFrom.
segmentsFrom = Arrays.asList("s9", "s10");
segmentsTo = Arrays.asList("s11", "s12");
- String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager()
+ String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom(),
segmentsFrom);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
segmentsTo);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(),
LineageEntryState.IN_PROGRESS);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
segmentsFrom);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
segmentsTo);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.IN_PROGRESS);
// Upload partial data
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
@@ -742,12 +754,12 @@ public class PinotHelixResourceManagerTest {
// and check that previous lineages with overlapped segmentsFrom are
reverted.
segmentsFrom = Arrays.asList("s0", "s9");
segmentsTo = Arrays.asList("s13", "s14");
- String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager()
+ String lineageEntryId9 = ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, true);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(),
LineageEntryState.REVERTED);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.IN_PROGRESS);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.REVERTED);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.IN_PROGRESS);
// Finish the replacement
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
@@ -756,18 +768,18 @@ public class PinotHelixResourceManagerTest {
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"s14"), "downloadUrl");
ControllerTestUtils.getHelixResourceManager()
- .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId8);
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId9);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
Arrays.asList("s0", "s9"));
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
Arrays.asList("s13", "s14"));
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.COMPLETED);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
Arrays.asList("s0", "s9"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
Arrays.asList("s13", "s14"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.COMPLETED);
// Check endReplaceSegments is idempotent
ControllerTestUtils.getHelixResourceManager()
- .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId8);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.COMPLETED);
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId9);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.COMPLETED);
}
private void testSegmentReplacementForRefresh()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]