This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e3d238a Add forceCleanup option for 'startReplaceSegments' API (#7744)
e3d238a is described below
commit e3d238ac1d8633331d9507713266e41e6b40f870
Author: Seunghyun Lee <[email protected]>
AuthorDate: Thu Nov 11 18:52:49 2021 -0800
Add forceCleanup option for 'startReplaceSegments' API (#7744)
---
.../PinotSegmentUploadDownloadRestletResource.java | 3 +-
.../helix/core/PinotHelixResourceManager.java | 133 ++++++++++++--------
.../helix/core/PinotHelixResourceManagerTest.java | 134 ++++++++++++++++-----
3 files changed, 189 insertions(+), 81 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 78de2fd..1a75bfb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -545,6 +545,7 @@ public class PinotSegmentUploadDownloadRestletResource {
public Response startReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup")
@DefaultValue("false") boolean forceCleanup,
StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
try {
TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -555,7 +556,7 @@ public class PinotSegmentUploadDownloadRestletResource {
String tableNameWithType =
TableNameBuilder.forType(tableType).tableNameWithType(tableName);
String segmentLineageEntryId = _pinotHelixResourceManager
.startReplaceSegments(tableNameWithType,
startReplaceSegmentsRequest.getSegmentsFrom(),
- startReplaceSegmentsRequest.getSegmentsTo());
+ startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup);
return
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 22b6872..e32506b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2747,11 +2747,13 @@ public class PinotHelixResourceManager {
* @param tableNameWithType Table name with type
* @param segmentsFrom a list of segments to be merged
* @param segmentsTo a list of merged segments
+ * @param forceCleanup True to revert an 'IN_PROGRESS' lineage entry that has the same 'segmentsFrom' and clean up its 'segmentsTo' segments
* @return Segment lineage entry id
*
* @throws InvalidConfigException
*/
- public String startReplaceSegments(String tableNameWithType, List<String>
segmentsFrom, List<String> segmentsTo) {
+ public String startReplaceSegments(String tableNameWithType, List<String>
segmentsFrom, List<String> segmentsTo,
+ boolean forceCleanup) {
// Create a segment lineage entry id
String segmentLineageEntryId =
SegmentLineageUtils.generateLineageEntryId();
@@ -2786,37 +2788,61 @@ public class PinotHelixResourceManager {
Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId)
== null,
String.format("SegmentLineageEntryId (%s) already exists in the
segment lineage.", segmentLineageEntryId));
+ List<String> segmentsToCleanUp = new ArrayList<>();
for (String entryId : segmentLineage.getLineageEntryIds()) {
LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
- // If segment entry is in 'REVERTED' state, no need to check for
'segmentsFrom'.
- if (lineageEntry.getState() != LineageEntryState.REVERTED) {
+ // If the lineage entry is in 'REVERTED' state, no need to go
through the validation because we can regard
+ // the entry as not existing.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED) {
+ continue;
+ }
+
+ // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
+
+ // When 'forceCleanup' is enabled, we need to proactively revert the
lineage entry when we find the lineage
+ // entry with the same 'segmentsFrom' values.
+ if (forceCleanup && lineageEntry.getState() ==
LineageEntryState.IN_PROGRESS && CollectionUtils
+ .isEqualCollection(segmentsFrom,
lineageEntry.getSegmentsFrom())) {
+ // Update segment lineage entry to 'REVERTED'
+ updateSegmentLineageEntryToReverted(tableNameWithType,
segmentLineage, entryId, lineageEntry);
+
+ // Add segments for proactive clean-up.
+ segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo());
+ } else {
// Check that any segment from 'segmentsFrom' does not appear
twice.
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
segmentsFrom), String
- .format(
- "It is not allowed to merge segments that are already
merged. (tableName = %s, segmentsFrom from "
- + "existing lineage entry = %s, requested segmentsFrom
= %s)", tableNameWithType,
- lineageEntry.getSegmentsFrom(), segmentsFrom));
+ .format("It is not allowed to replace segments that are
already replaced. (tableName = %s, "
+ + "segmentsFrom from the existing lineage entry = %s,
requested segmentsFrom = %s)",
+ tableNameWithType, lineageEntry.getSegmentsFrom(),
segmentsFrom));
+
+ // Check that any segment from 'segmentsTo' does not appear twice.
+
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(),
segmentsTo), String.format(
+ "It is not allowed to have the same segment name for segments
in 'segmentsTo'. (tableName = %s, "
+ + "segmentsTo from the existing lineage entry = %s,
requested segmentsTo = %s)", tableNameWithType,
+ lineageEntry.getSegmentsTo(), segmentsTo));
}
-
- // Check that merged segments name cannot be the same.
-
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(),
segmentsTo), String.format(
- "It is not allowed to have the same segment name for merged
segments. (tableName = %s, segmentsTo from "
- + "existing lineage entry = %s, requested segmentsTo = %s)",
tableNameWithType,
- lineageEntry.getSegmentsTo(), segmentsTo));
}
// Update lineage entry
segmentLineage.addLineageEntry(segmentLineageEntryId,
new LineageEntry(segmentsFrom, segmentsTo,
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
- // Write back to the lineage entry
- return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion);
+ // Write the segment lineage back to the property store
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
+ // Trigger the proactive segment clean up if needed. Once the
lineage is updated in the property store, it
+ // is safe to physically delete segments.
+ if (!segmentsToCleanUp.isEmpty()) {
+ deleteSegments(tableNameWithType, segmentsToCleanUp);
+ }
+ return true;
+ } else {
+ return false;
+ }
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed while updating the segment lineage. (tableName = %s,
segmentsFrom = %s, segmentsTo = %s)",
- tableNameWithType, segmentsFrom, segmentsTo);
+ String errorMsg = String.format("Failed to update the segment lineage
during startReplaceSegments. "
+ + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
tableNameWithType, segmentsFrom, segmentsTo);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2860,10 +2886,11 @@ public class PinotHelixResourceManager {
.format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
segmentLineageEntryId));
- // NO-OPS if the entry is already completed
- if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
- LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to
update. (tableNameWithType={}, "
- + "segmentLineageEntryId={})", tableNameWithType,
segmentLineageEntryId);
+ // NO-OPS if the entry is already 'COMPLETED' or 'REVERTED'
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) {
+ LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update
to 'COMPLETED'. (tableNameWithType={}, "
+ + "segmentLineageEntryId={}, state={})", tableNameWithType,
segmentLineageEntryId,
+ lineageEntry.getState());
return true;
}
@@ -2901,9 +2928,8 @@ public class PinotHelixResourceManager {
}
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed to update the segment lineage. (tableName = %s,
segmentLineageEntryId = %s)",
- tableNameWithType, segmentLineageEntryId);
+ String errorMsg = String.format("Failed to update the segment lineage
during endReplaceSegments. "
+ + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType,
segmentLineageEntryId);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2943,29 +2969,19 @@ public class PinotHelixResourceManager {
.format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
segmentLineageEntryId));
- if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
- // We do not allow to revert the lineage entry with 'REVERTED'
state. For 'IN_PROGRESS", we only allow to
- // revert when 'forceRevert' is set to true.
- if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS ||
!forceRevert) {
- LOGGER.warn("Lineage state is not valid. Cannot revert the lineage
entry. (tableNameWithType={}, "
- + "segmentLineageEntryId={}, segmentLineageEntrySate={},
forceRevert={})", tableNameWithType,
- segmentLineageEntryId, lineageEntry.getState(), forceRevert);
- return false;
- }
+ // We do not allow reverting a lineage entry that is in 'REVERTED' state.
For 'IN_PROGRESS', we only allow the
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() == LineageEntryState.REVERTED || (
+ lineageEntry.getState() == LineageEntryState.IN_PROGRESS &&
!forceRevert)) {
+ String errorMsg = String.format(
+ "Lineage state is not valid. Cannot update the lineage entry to
be 'REVERTED'. (tableNameWithType=%s, "
+ + "segmentLineageEntryId=%s, segmentLineageEntryState=%s,
forceRevert=%s)", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+ throw new RuntimeException(errorMsg);
}
- // Check that all segments from 'segmentsFrom' are in ONLINE state in
the external view.
- Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
-
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
- "Not all segments from 'segmentFrom' are in ONLINE state in the
external view. (tableName = '%s', "
- + "segmentsFrom = '%s', onlineSegments = '%s'",
tableNameWithType, lineageEntry.getSegmentsFrom(),
- onlineSegments));
-
- // Update lineage entry
- LineageEntry newLineageEntry =
- new LineageEntry(lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
- System.currentTimeMillis());
- segmentLineage.updateLineageEntry(segmentLineageEntryId,
newLineageEntry);
+ // Update segment lineage entry to 'REVERTED'
+ updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage,
segmentLineageEntryId, lineageEntry);
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
@@ -2973,15 +2989,19 @@ public class PinotHelixResourceManager {
// routing table because it is possible that there has been no EV
change but the routing result may be
// different after updating the lineage entry.
sendRoutingTableRebuildMessage(tableNameWithType);
+
+ // Invoke the proactive clean-up for segments that we no longer
need in case 'forceRevert' is enabled
+ if (forceRevert) {
+ deleteSegments(tableNameWithType, lineageEntry.getSegmentsTo());
+ }
return true;
} else {
return false;
}
});
} catch (Exception e) {
- String errorMsg = String
- .format("Failed to update the segment lineage. (tableName = %s,
segmentLineageEntryId = %s)",
- tableNameWithType, segmentLineageEntryId);
+ String errorMsg = String.format("Failed to update the segment lineage
during revertReplaceSegments. "
+ + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType,
segmentLineageEntryId);
LOGGER.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
@@ -2991,6 +3011,21 @@ public class PinotHelixResourceManager {
tableNameWithType, segmentLineageEntryId);
}
+ private void updateSegmentLineageEntryToReverted(String tableNameWithType,
SegmentLineage segmentLineage,
+ String segmentLineageEntryId, LineageEntry lineageEntry) {
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in the
external view.
+ Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
+
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
+ "Failed to update the lineage to be 'REVERTED'. Not all segments from
'segmentFrom' are in ONLINE state "
+ + "in the external view. (tableName = '%s', segmentsFrom = '%s',
onlineSegments = '%s'", tableNameWithType,
+ lineageEntry.getSegmentsFrom(), onlineSegments));
+
+ // Update lineage entry
+ segmentLineage.updateLineageEntry(segmentLineageEntryId,
+ new LineageEntry(lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+ System.currentTimeMillis()));
+ }
+
private void waitForSegmentsBecomeOnline(String tableNameWithType,
Set<String> segmentsToCheck)
throws InterruptedException, TimeoutException {
long endTimeMs = System.currentTimeMillis() +
EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 89e98b0..ae524a6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -471,7 +471,7 @@ public class PinotHelixResourceManagerTest {
List<String> segmentsTo = Arrays.asList("s5", "s6");
String lineageEntryId = ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
SegmentLineage segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
@@ -484,7 +484,7 @@ public class PinotHelixResourceManagerTest {
segmentsTo = Arrays.asList("s3", "s4");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
@@ -492,34 +492,16 @@ public class PinotHelixResourceManagerTest {
segmentsTo = Arrays.asList("s2");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
// Check invalid segmentsFrom
segmentsFrom = Arrays.asList("s1", "s6");
- segmentsTo = Arrays.asList("merged1", "merged2");
try {
ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
- } catch (Exception e) {
- // expected
- }
-
- segmentsFrom = Arrays.asList("s1", "s2");
- String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
- segmentLineage = SegmentLineageAccessHelper
- .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
segmentsFrom);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
segmentsTo);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.IN_PROGRESS);
-
- try {
- ControllerTestUtils.getHelixResourceManager()
- .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo);
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
} catch (Exception e) {
// expected
}
@@ -556,27 +538,117 @@ public class PinotHelixResourceManagerTest {
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(),
new ArrayList<>());
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(),
Arrays.asList("s5", "s6"));
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(),
LineageEntryState.COMPLETED);
+ // Start the new segment replacement
+ segmentsFrom = Arrays.asList("s1", "s2");
+ segmentsTo = Arrays.asList("merged_t1_0", "merged_t1_1");
+ String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
Arrays.asList("s1", "s2"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
+ Arrays.asList("merged_t1_0", "merged_t1_1"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.IN_PROGRESS);
+
+ // Upload partial data
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
-
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged1"),
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged_t1_0"),
+ "downloadUrl");
+
+ IdealState idealState =
+
ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(!idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+ // Try to revert the entry with partial data uploaded without forceRevert
+ try {
+ ControllerTestUtils.getHelixResourceManager()
+ .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId2, false);
+ } catch (Exception e) {
+ // expected
+ }
+
+ // Try to revert the entry with partial data uploaded with forceRevert
+ ControllerTestUtils.getHelixResourceManager()
+ .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId2, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.REVERTED);
+
+ // 'merged_t1_0' segment should be proactively cleaned up
+ idealState =
+
ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(idealState.getInstanceSet("merged_t1_0").isEmpty());
+
+ // Start new segment replacement since the above entry is reverted
+ segmentsFrom = Arrays.asList("s1", "s2");
+ segmentsTo = Arrays.asList("merged_t2_0", "merged_t2_1");
+ String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(),
segmentsFrom);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
segmentsTo);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(),
LineageEntryState.IN_PROGRESS);
+
+ // Upload partial data
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged_t2_0"),
+ "downloadUrl");
+
+ // Without force cleanup, 'startReplaceSegments' should fail because of
duplicate segments in 'segmentsFrom'.
+ segmentsTo = Arrays.asList("merged_t3_0", "merged_t3_1");
+ try {
+ ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, false);
+ } catch (Exception e) {
+ // expected
+ }
+
+ // Test force clean up case
+ String lineageEntryId4 = ControllerTestUtils.getHelixResourceManager()
+ .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
segmentsFrom, segmentsTo, true);
+ segmentLineage = SegmentLineageAccessHelper
+ .getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(),
Arrays.asList("s1", "s2"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(),
+ Arrays.asList("merged_t2_0", "merged_t2_1"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(),
LineageEntryState.REVERTED);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(),
Arrays.asList("s1", "s2"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+ Arrays.asList("merged_t3_0", "merged_t3_1"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(),
LineageEntryState.IN_PROGRESS);
+
+ // 'merged_t2_0' segment should be proactively cleaned up
+ idealState =
+
ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+ Assert.assertTrue(idealState.getInstanceSet("merged_t2_0").isEmpty());
+
+ // Upload segments again
+
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged_t3_0"),
"downloadUrl");
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
-
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged2"),
+
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
"merged_t3_1"),
"downloadUrl");
+ // Finish the replacement
ControllerTestUtils.getHelixResourceManager()
- .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId2);
+ .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
lineageEntryId4);
segmentLineage = SegmentLineageAccessHelper
.getSegmentLineage(ControllerTestUtils.getPropertyStore(),
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
- Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(),
Arrays.asList("s1", "s2"));
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
- Arrays.asList("merged1", "merged2"));
-
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(),
LineageEntryState.COMPLETED);
+ Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4);
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(),
Arrays.asList("s1", "s2"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(),
+ Arrays.asList("merged_t3_0", "merged_t3_1"));
+
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(),
LineageEntryState.COMPLETED);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]