snleee commented on a change in pull request #7995:
URL: https://github.com/apache/pinot/pull/7995#discussion_r783564616
##########
File path:
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
##########
@@ -651,6 +658,169 @@ public void testSegmentReplacement()
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(),
LineageEntryState.COMPLETED);
}
+ @Test
+ public void testSegmentReplacementForRefresh()
+ throws IOException, InterruptedException {
+ // Create broker tenant on 1 Brokers
+ Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1,
0, 0);
+ PinotResourceManagerResponse response =
+
ControllerTestUtils.getHelixResourceManager().createBrokerTenant(brokerTenant);
+ Assert.assertTrue(response.isSuccessful());
+
+ // Create the table
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
+
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
+ .setIngestionConfig(
+ new IngestionConfig(new BatchIngestionConfig(null, "REFRESH",
"DAILY"), null, null, null, null))
+ .build();
+
+ ControllerTestUtils.getHelixResourceManager().addTable(tableConfig);
+
+ for (int i = 0; i < 3; i++) {
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s" + i),
+ "downloadUrl");
+ }
+ List<String> segmentsForTable =
ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false);
+ Assert.assertEquals(segmentsForTable.size(), 3);
+
+ List<String> segmentsFrom = Arrays.asList("s0", "s1", "s2");
+ List<String> segmentsTo = Arrays.asList("s3", "s4", "s5");
+
+ String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
+
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+ SegmentLineage segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+ Arrays.asList("s0", "s1", "s2"));
+ Assert
+
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(),
Arrays.asList("s3", "s4", "s5"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(),
LineageEntryState.IN_PROGRESS);
+ Assert.assertEquals(new
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false)), Set.of("s0", "s1", "s2"));
+
+ // Add new segments
+ for (int i = 3; i < 6; i++) {
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s" + i),
+ "downloadUrl");
+ }
+
+ Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false).size(), 6);
+ Assert.assertEquals(new
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
true)), Set.of("s0", "s1", "s2"));
+
+ // Call end segment replacements
+ ControllerTestUtils.getHelixResourceManager()
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
lineageEntryId);
+
+ Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false).size(), 6);
+ Assert.assertEquals(new
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
true)), Set.of("s3", "s4", "s5"));
+
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
+ Arrays.asList("s0", "s1", "s2"));
+ Assert
+
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(),
Arrays.asList("s3", "s4", "s5"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(),
LineageEntryState.COMPLETED);
+
+ // Start the new protocol with "forceCleanup = false" so there will be no
proactive clean-up happening
+ segmentsFrom = Arrays.asList("s3", "s4", "s5");
+ segmentsTo = Arrays.asList("s6", "s7", "s8");
+
+ String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
+ Arrays.asList("s3", "s4", "s5"));
+ Assert
+
.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
Arrays.asList("s6", "s7", "s8"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.IN_PROGRESS);
+ Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false).size(), 6);
+ Assert.assertEquals(new
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
true)), Set.of("s3", "s4", "s5"));
+
+ // Add partial segments
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
"s6"),
+ "downloadUrl");
+
+ Assert.assertEquals(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
false).size(), 7);
+ Assert.assertEquals(new
HashSet<>(ControllerTestUtils.getHelixResourceManager()
+ .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
true)), Set.of("s3", "s4", "s5"));
+
+ // Start the new protocol with "forceCleanup = true" to check if 2
different proactive clean-up mechanism works:
+ // 1. the previous lineage entry (s3, s4, s5) -> (s6, s7, s8) should be
"REVERTED"
+ // 2. the older snapshot (s0, s1, s2) needs to be cleaned up because we
are about to upload the 3rd data snapshot
+ segmentsTo = Arrays.asList("s9", "s10", "s11");
+ String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+
.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
segmentsFrom, segmentsTo, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME);
+
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+
+ // Check that the previous entry gets reverted
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.REVERTED);
Review comment:
Good catch. Our current behavior when `endReplaceSegments` is called on a
lineage entry in the `REVERTED`/`COMPLETED` state was to return true as a no-op.
This silently returns a 200 response to the user. I changed this to throw
an exception so that the error message gets propagated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]