This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f9e55ce7a6 Handle the race condition for common dest segments in
startReplaceSegments(). (#8667)
f9e55ce7a6 is described below
commit f9e55ce7a60fcdfb0fee688059d50af6b6cd112d
Author: Jiapeng Tao <[email protected]>
AuthorDate: Wed May 11 23:18:40 2022 -0700
Handle the race condition for common dest segments in
startReplaceSegments(). (#8667)
---
.../helix/core/PinotHelixResourceManager.java | 59 +++++++++++++++-----
.../helix/core/PinotHelixResourceManagerTest.java | 62 +++++++++++++++++++---
2 files changed, 100 insertions(+), 21 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0334721571..4c9f61fc4a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1716,7 +1716,7 @@ public class PinotHelixResourceManager {
throws IOException {
String usernameWithComponent = userConfig.getUsernameWithComponent();
ZKMetadataProvider.setUserConfig(_propertyStore, usernameWithComponent,
- AccessControlUserConfigUtils.toZNRecord(userConfig));
+ AccessControlUserConfigUtils.toZNRecord(userConfig));
}
/**
@@ -3084,16 +3084,18 @@ public class PinotHelixResourceManager {
.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) ||
!Collections
.disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) {
LOGGER.info(
- "Detected the incomplete lineage entry with the same
'segmentsFrom' or 'segmentsTo'. Reverting the "
- + "lineage entry to unblock the new segment protocol.
tableNameWithType={}, entryId={}, "
- + "segmentsFrom={}, segmentsTo={}", tableNameWithType,
entryId, lineageEntry.getSegmentsFrom(),
- lineageEntry.getSegmentsTo());
+ "Detected the incomplete lineage entry with overlapped
'segmentsFrom' or 'segmentsTo'. Deleting or "
+ + "reverting the lineage entry to unblock the new
segment protocol. tableNameWithType={}, "
+ + "entryId={}, segmentsFrom={}, segmentsTo={}",
tableNameWithType, entryId,
+ lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo());
- // Update segment lineage entry to 'REVERTED'
- updateSegmentLineageEntryToReverted(tableNameWithType,
segmentLineage, entryId, lineageEntry);
+ // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+ List<String> segmentsToForRevertedEntry =
+
deleteOrUpdateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage,
entryId, lineageEntry,
+ segmentsTo);
// Add segments for proactive clean-up.
- segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ segmentsToCleanUp.addAll(segmentsToForRevertedEntry);
} else if (lineageEntry.getState() == LineageEntryState.COMPLETED
&& IngestionConfigUtils
.getBatchSegmentIngestionType(tableConfig).equalsIgnoreCase("REFRESH") &&
CollectionUtils
.isEqualCollection(segmentsFrom,
lineageEntry.getSegmentsTo())) {
@@ -3343,12 +3345,14 @@ public class PinotHelixResourceManager {
private void updateSegmentLineageEntryToReverted(String tableNameWithType,
SegmentLineage segmentLineage,
String segmentLineageEntryId, LineageEntry lineageEntry) {
- // Check that all segments from 'segmentsFrom' are in ONLINE state in the
external view.
- Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
-
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
- "Failed to update the lineage to be 'REVERTED'. Not all segments from
'segmentFrom' are in ONLINE state "
- + "in the external view. (tableName = '%s', segmentsFrom = '%s',
onlineSegments = '%s'", tableNameWithType,
- lineageEntry.getSegmentsFrom(), onlineSegments));
+ if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in
the external view.
+ Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
+
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
+ "Failed to update the lineage to be 'REVERTED'. Not all segments
from 'segmentFrom' are in ONLINE state "
+ + "in the external view. (tableName = '%s', segmentsFrom = '%s',
onlineSegments = '%s'",
+ tableNameWithType, lineageEntry.getSegmentsFrom(), onlineSegments));
+ }
// Update lineage entry
segmentLineage.updateLineageEntry(segmentLineageEntryId,
@@ -3356,6 +3360,33 @@ public class PinotHelixResourceManager {
System.currentTimeMillis()));
}
+ private List<String> deleteOrUpdateSegmentLineageEntryToReverted(String
tableNameWithType,
+ SegmentLineage segmentLineage, String segmentLineageEntryId,
LineageEntry lineageEntry,
+ List<String> newSegments) {
+ // Delete or update segmentsTo of the entry to revert to handle the case
of rerunning the protocol:
+ // Initial state:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status:
IN_PROGRESS}
+ // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to
avoid race conditions of concurrent data
+ // pushes and deletions:
+ // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED}
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status:
IN_PROGRESS}
+ // 2. Rerunning the protocol with s3 and s4, we can simply remove the
'IN_PROGRESS' entry:
+ // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status:
IN_PROGRESS}
+ List<String> segmentsToForEntryToRevert = new
ArrayList<>(lineageEntry.getSegmentsTo());
+ segmentsToForEntryToRevert.removeAll(newSegments);
+
+ if (segmentsToForEntryToRevert.isEmpty()) {
+ // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+ segmentLineage.deleteLineageEntry(segmentLineageEntryId);
+ } else {
+ // Update the lineage entry to 'REVERTED'
+ segmentLineage.updateLineageEntry(segmentLineageEntryId,
+ new LineageEntry(lineageEntry.getSegmentsFrom(),
segmentsToForEntryToRevert, LineageEntryState.REVERTED,
+ System.currentTimeMillis()));
+ }
+ return segmentsToForEntryToRevert;
+ }
+
private void waitForSegmentsBecomeOnline(String tableNameWithType,
Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() +
EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 8d4c4c2489..4cb3d5c89d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -692,13 +692,12 @@ public class PinotHelixResourceManagerTest {
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(),
LineageEntryState.IN_PROGRESS);
// Assuming the replacement fails in the middle, rerunning the protocol
with the same segmentsTo will go through,
- // and revert the previous lineage entry.
+ // and remove the previous lineage entry.
String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, true);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(),
LineageEntryState.REVERTED);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 5);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(),
segmentsFrom);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
segmentsTo);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(),
LineageEntryState.IN_PROGRESS);
@@ -728,7 +727,7 @@ public class PinotHelixResourceManagerTest {
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId7);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom().isEmpty());
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
Arrays.asList("s9", "s10"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(),
LineageEntryState.COMPLETED);
@@ -741,7 +740,7 @@ public class PinotHelixResourceManagerTest {
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
segmentsFrom);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
segmentsTo);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.IN_PROGRESS);
@@ -771,7 +770,7 @@ public class PinotHelixResourceManagerTest {
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId9);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
Arrays.asList("s0", "s9"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
Arrays.asList("s13", "s14"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.COMPLETED);
@@ -1038,7 +1037,7 @@ public class PinotHelixResourceManagerTest {
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
Arrays.asList("s17", "s18"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(),
LineageEntryState.COMPLETED);
- // Check partial overlap reverts previous lineage
+ // Check partial overlap of segmentsFrom reverts previous lineage
// Start a new segment replacement with non-empty segmentsFrom.
segmentsFrom = Arrays.asList("s17", "s18");
segmentsTo = Arrays.asList("s19", "s20");
@@ -1081,6 +1080,55 @@ public class PinotHelixResourceManagerTest {
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
Arrays.asList("s14", "s17"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
Arrays.asList("s21", "s22"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(),
LineageEntryState.COMPLETED);
+
+ // Check partial overlap of segmentsTo reverts previous lineage
+ // Start a new segment replacement with non-empty segmentsFrom.
+ segmentsFrom = Arrays.asList("s21", "s22");
+ segmentsTo = Arrays.asList("s23", "s24");
+ String lineageEntryId9 = ControllerTestUtils.getHelixResourceManager()
+
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
segmentsFrom);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
segmentsTo);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.IN_PROGRESS);
+
+ // Upload data
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s23"),
+ "downloadUrl");
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s24"),
+ "downloadUrl");
+
+ // Start another new segment replacement with segmentsTo overlapping with
previous lineage
+ // and check that previous lineages with overlapped segmentsTo are
reverted.
+ segmentsFrom = Arrays.asList("s21", "s22");
+ segmentsTo = Arrays.asList("s24", "s25");
+ String lineageEntryId10 = ControllerTestUtils.getHelixResourceManager()
+
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(),
LineageEntryState.REVERTED);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
Arrays.asList("s23"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(),
LineageEntryState.IN_PROGRESS);
+
+ // Finish the replacement
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s24"),
+ "downloadUrl");
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s25"),
+ "downloadUrl");
+
+ ControllerTestUtils.getHelixResourceManager()
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
lineageEntryId10);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+ Assert
+
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsFrom(),
Arrays.asList("s21", "s22"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsTo(),
Arrays.asList("s24", "s25"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(),
LineageEntryState.COMPLETED);
}
private void waitForSegmentsToDelete(String tableNameWithType, int
expectedNumSegmentsAfterDelete,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]