This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f9e55ce7a6 Handle the race condition for common dest segments in startReplaceSegments(). (#8667)
f9e55ce7a6 is described below

commit f9e55ce7a60fcdfb0fee688059d50af6b6cd112d
Author: Jiapeng Tao <[email protected]>
AuthorDate: Wed May 11 23:18:40 2022 -0700

    Handle the race condition for common dest segments in startReplaceSegments(). (#8667)
---
 .../helix/core/PinotHelixResourceManager.java      | 59 +++++++++++++++-----
 .../helix/core/PinotHelixResourceManagerTest.java  | 62 +++++++++++++++++++---
 2 files changed, 100 insertions(+), 21 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 0334721571..4c9f61fc4a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1716,7 +1716,7 @@ public class PinotHelixResourceManager {
       throws IOException {
     String usernameWithComponent = userConfig.getUsernameWithComponent();
     ZKMetadataProvider.setUserConfig(_propertyStore, usernameWithComponent,
-      AccessControlUserConfigUtils.toZNRecord(userConfig));
+        AccessControlUserConfigUtils.toZNRecord(userConfig));
   }
 
   /**
@@ -3084,16 +3084,18 @@ public class PinotHelixResourceManager {
                 .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || 
!Collections
                 .disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) {
               LOGGER.info(
-                  "Detected the incomplete lineage entry with the same 
'segmentsFrom' or 'segmentsTo'. Reverting the "
-                      + "lineage entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, "
-                      + "segmentsFrom={}, segmentsTo={}", tableNameWithType, 
entryId, lineageEntry.getSegmentsFrom(),
-                  lineageEntry.getSegmentsTo());
+                  "Detected the incomplete lineage entry with overlapped 
'segmentsFrom' or 'segmentsTo'. Deleting or "
+                      + "reverting the lineage entry to unblock the new 
segment protocol. tableNameWithType={}, "
+                      + "entryId={}, segmentsFrom={}, segmentsTo={}", 
tableNameWithType, entryId,
+                  lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo());
 
-              // Update segment lineage entry to 'REVERTED'
-              updateSegmentLineageEntryToReverted(tableNameWithType, 
segmentLineage, entryId, lineageEntry);
+              // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED'
+              List<String> segmentsToForRevertedEntry =
+                  
deleteOrUpdateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, 
entryId, lineageEntry,
+                      segmentsTo);
 
               // Add segments for proactive clean-up.
-              segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+              segmentsToCleanUp.addAll(segmentsToForRevertedEntry);
             } else if (lineageEntry.getState() == LineageEntryState.COMPLETED 
&& IngestionConfigUtils
                 
.getBatchSegmentIngestionType(tableConfig).equalsIgnoreCase("REFRESH") && 
CollectionUtils
                 .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsTo())) {
@@ -3343,12 +3345,14 @@ public class PinotHelixResourceManager {
 
   private void updateSegmentLineageEntryToReverted(String tableNameWithType, 
SegmentLineage segmentLineage,
       String segmentLineageEntryId, LineageEntry lineageEntry) {
-    // Check that all segments from 'segmentsFrom' are in ONLINE state in the 
external view.
-    Set<String> onlineSegments = 
getOnlineSegmentsFromExternalView(tableNameWithType);
-    
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
 String.format(
-        "Failed to update the lineage to be 'REVERTED'. Not all segments from 
'segmentFrom' are in ONLINE state "
-            + "in the external view. (tableName = '%s', segmentsFrom = '%s', 
onlineSegments = '%s'", tableNameWithType,
-        lineageEntry.getSegmentsFrom(), onlineSegments));
+    if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+      // Check that all segments from 'segmentsFrom' are in ONLINE state in 
the external view.
+      Set<String> onlineSegments = 
getOnlineSegmentsFromExternalView(tableNameWithType);
+      
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
 String.format(
+          "Failed to update the lineage to be 'REVERTED'. Not all segments 
from 'segmentFrom' are in ONLINE state "
+              + "in the external view. (tableName = '%s', segmentsFrom = '%s', 
onlineSegments = '%s'",
+          tableNameWithType, lineageEntry.getSegmentsFrom(), onlineSegments));
+    }
 
     // Update lineage entry
     segmentLineage.updateLineageEntry(segmentLineageEntryId,
@@ -3356,6 +3360,33 @@ public class PinotHelixResourceManager {
             System.currentTimeMillis()));
   }
 
+  private List<String> deleteOrUpdateSegmentLineageEntryToReverted(String 
tableNameWithType,
+      SegmentLineage segmentLineage, String segmentLineageEntryId, 
LineageEntry lineageEntry,
+      List<String> newSegments) {
+    // Delete or update segmentsTo of the entry to revert to handle the case 
of rerunning the protocol:
+    // Initial state:
+    //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: 
IN_PROGRESS}
+    // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to 
avoid race conditions of concurrent data
+    // pushes and deletions:
+    //   Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED}
+    //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status: 
IN_PROGRESS}
+    // 2. Rerunning the protocol with s3 and s4, we can simply remove the 
'IN_PROGRESS' entry:
+    //   Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: 
IN_PROGRESS}
+    List<String> segmentsToForEntryToRevert = new 
ArrayList<>(lineageEntry.getSegmentsTo());
+    segmentsToForEntryToRevert.removeAll(newSegments);
+
+    if (segmentsToForEntryToRevert.isEmpty()) {
+      // Delete 'IN_PROGRESS' entry if the segmentsTo is empty
+      segmentLineage.deleteLineageEntry(segmentLineageEntryId);
+    } else {
+      // Update the lineage entry to 'REVERTED'
+      segmentLineage.updateLineageEntry(segmentLineageEntryId,
+          new LineageEntry(lineageEntry.getSegmentsFrom(), 
segmentsToForEntryToRevert, LineageEntryState.REVERTED,
+              System.currentTimeMillis()));
+    }
+    return segmentsToForEntryToRevert;
+  }
+
   private void waitForSegmentsBecomeOnline(String tableNameWithType, 
Set<String> segmentsToCheck)
       throws InterruptedException, TimeoutException {
     long endTimeMs = System.currentTimeMillis() + 
EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 8d4c4c2489..4cb3d5c89d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -692,13 +692,12 @@ public class PinotHelixResourceManagerTest {
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), 
LineageEntryState.IN_PROGRESS);
 
     // Assuming the replacement fails in the middle, rerunning the protocol 
with the same segmentsTo will go through,
-    // and revert the previous lineage entry.
+    // and remove the previous lineage entry.
     String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), 
LineageEntryState.REVERTED);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 5);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(),
 segmentsFrom);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
 segmentsTo);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), 
LineageEntryState.IN_PROGRESS);
@@ -728,7 +727,7 @@ public class PinotHelixResourceManagerTest {
         .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId7);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
     
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom().isEmpty());
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
 Arrays.asList("s9", "s10"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), 
LineageEntryState.COMPLETED);
@@ -741,7 +740,7 @@ public class PinotHelixResourceManagerTest {
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
 segmentsFrom);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
 segmentsTo);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.IN_PROGRESS);
@@ -771,7 +770,7 @@ public class PinotHelixResourceManagerTest {
         .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId9);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
 Arrays.asList("s0", "s9"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
 Arrays.asList("s13", "s14"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.COMPLETED);
@@ -1038,7 +1037,7 @@ public class PinotHelixResourceManagerTest {
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
 Arrays.asList("s17", "s18"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), 
LineageEntryState.COMPLETED);
 
-    // Check partial overlap reverts previous lineage
+    // Check partial overlap of segmentsFrom reverts previous lineage
     // Start a new segment replacement with non-empty segmentsFrom.
     segmentsFrom = Arrays.asList("s17", "s18");
     segmentsTo = Arrays.asList("s19", "s20");
@@ -1081,6 +1080,55 @@ public class PinotHelixResourceManagerTest {
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
 Arrays.asList("s14", "s17"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
 Arrays.asList("s21", "s22"));
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.COMPLETED);
+
+    // Check partial overlap of segmentsTo reverts previous lineage
+    // Start a new segment replacement with non-empty segmentsFrom.
+    segmentsFrom = Arrays.asList("s21", "s22");
+    segmentsTo = Arrays.asList("s23", "s24");
+    String lineageEntryId9 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
 segmentsFrom);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
 segmentsTo);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.IN_PROGRESS);
+
+    // Upload data
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s23"),
+        "downloadUrl");
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s24"),
+        "downloadUrl");
+
+    // Start another new segment replacement with segmentsTo overlapping with 
previous lineage
+    // and check that previous lineages with overlapped segmentsTo are 
reverted.
+    segmentsFrom = Arrays.asList("s21", "s22");
+    segmentsTo = Arrays.asList("s24", "s25");
+    String lineageEntryId10 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.REVERTED);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
 Arrays.asList("s23"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(),
 LineageEntryState.IN_PROGRESS);
+
+    // Finish the replacement
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s24"),
+        "downloadUrl");
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s25"),
+        "downloadUrl");
+
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId10);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsFrom(),
 Arrays.asList("s21", "s22"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsTo(),
 Arrays.asList("s24", "s25"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getState(),
 LineageEntryState.COMPLETED);
   }
 
   private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to