snleee commented on a change in pull request #7995:
URL: https://github.com/apache/pinot/pull/7995#discussion_r783564616



##########
File path: 
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
##########
@@ -651,6 +658,169 @@ public void testSegmentReplacement()
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), 
LineageEntryState.COMPLETED);
   }
 
+  @Test
+  public void testSegmentReplacementForRefresh()
+      throws IOException, InterruptedException {
+    // Create broker tenant on 1 Brokers
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1, 
0, 0);
+    PinotResourceManagerResponse response =
+        
ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant);
+    Assert.assertTrue(response.isSuccessful());
+
+    // Create the table
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
+            
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
+            .setIngestionConfig(
+                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null))
+            .build();
+
+    ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+    for (int i = 0; i < 3; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s" + i),
+          "downloadUrl");
+    }
+    List<String> segmentsForTable = 
ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false);
+    Assert.assertEquals(segmentsForTable.size(), 3);
+
+    List<String> segmentsFrom = Arrays.asList("s0", "s1", "s2");
+    List<String> segmentsTo = Arrays.asList("s3", "s4", "s5");
+
+    String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+    SegmentLineage segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), 
Arrays.asList("s3", "s4", "s5"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false)), Set.of("s0", "s1", "s2"));
+
+    // Add new segments
+    for (int i = 3; i < 6; i++) {
+      
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+          
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s" + i),
+          "downloadUrl");
+    }
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)), Set.of("s0", "s1", "s2"));
+
+    // Call end segment replacements
+    ControllerTestUtils.getHelixResourceManager()
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId);
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)), Set.of("s3", "s4", "s5"));
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+        Arrays.asList("s0", "s1", "s2"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), 
Arrays.asList("s3", "s4", "s5"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), 
LineageEntryState.COMPLETED);
+
+    // Start the new protocol with "forceCleanup = false" so there will be no 
proactive clean-up happening
+    segmentsFrom = Arrays.asList("s3", "s4", "s5");
+    segmentsTo = Arrays.asList("s6", "s7", "s8");
+
+    String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
+        Arrays.asList("s3", "s4", "s5"));
+    Assert
+        
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), 
Arrays.asList("s6", "s7", "s8"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 6);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)), Set.of("s3", "s4", "s5"));
+
+    // Add partial segments
+    
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+        
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
 "s6"),
+        "downloadUrl");
+
+    Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
false).size(), 7);
+    Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+        .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)), Set.of("s3", "s4", "s5"));
+
+    // Start the new protocol with "forceCleanup = true" to check if 2 
different proactive clean-up mechanism works:
+    // 1. the previous lineage entry (s3, s4, s5) -> (s6, s7, s8) should be 
"REVERTED"
+    // 2. the older snapshot (s0, s1, s2) needs to be cleaned up because we 
are about to upload the 3rd data snapshot
+    segmentsTo = Arrays.asList("s9", "s10", "s11");
+    String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+        
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+
+    // Check that the previous entry gets reverted
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), 
LineageEntryState.REVERTED);

Review comment:
       Good catch. Our current behavior when we try to run `endReplaceSegment` 
to the lineage entry with `REVERTED/COMPLTED` was `return true but it's no-op`. 
This will silently return 200 responses to the user. I changed this to throw 
the exception so that the error message gets propagated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to