snleee commented on a change in pull request #7662:
URL: https://github.com/apache/pinot/pull/7662#discussion_r740575489
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2911,83 @@ public void endReplaceSegments(String tableNameWithType,
String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}
+ /**
+ * Revert the segment replacement
+ *
+ * 1. Compute validation
+ * 2. Update the lineage entry state to "REVERTED" and write metadata to the
property store
+ *
+ * Update is done with retry logic along with read-modify-write block for
achieving atomic update of the lineage
+ * metadata.
+ *
+ * @param tableNameWithType
+ * @param segmentLineageEntryId
+ */
+ public void revertReplaceSegments(String tableNameWithType, String
segmentLineageEntryId, boolean forceRevert) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ Preconditions.checkArgument(segmentLineageZNRecord != null, String
+ .format("Segment lineage does not exist. (tableNameWithType =
'%s', segmentLineageEntryId = '%s')",
+ tableNameWithType, segmentLineageEntryId));
+ SegmentLineage segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ int expectedVersion = segmentLineageZNRecord.getVersion();
+
+ // Look up the lineage entry based on the segment lineage entry id
+ LineageEntry lineageEntry =
segmentLineage.getLineageEntry(segmentLineageEntryId);
+ Preconditions.checkArgument(lineageEntry != null, String
+ .format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
+ segmentLineageEntryId));
+
+ if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+ // We do not allow to revert the lineage entry with 'REVERTED'
state. For 'IN_PROGRESS", we only allow to
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS ||
!forceRevert) {
+ LOGGER.warn("Lineage state is not valid. Cannot revert the lineage
entry. (tableNameWithType={}, "
Review comment:
added
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -580,6 +580,30 @@ public Response endReplaceSegments(
}
}
+ @POST
+ @Path("segments/{tableName}/revertReplaceSegments")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Revert segments replacement", notes = "Revert
segments replacement")
+ public Response revertReplaceSegments(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id to revert", required = true)
+ @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Force revert in case the user knows that the lineage
entry is interrupted")
+ @QueryParam("forceRevert") boolean forceRevert) {
Review comment:
added
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -580,6 +580,30 @@ public Response endReplaceSegments(
}
}
+ @POST
+ @Path("segments/{tableName}/revertReplaceSegments")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Revert segments replacement", notes = "Revert
segments replacement")
+ public Response revertReplaceSegments(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "Segment lineage entry id to revert", required = true)
+ @QueryParam("segmentLineageEntryId") String segmentLineageEntryId,
+ @ApiParam(value = "Force revert in case the user knows that the lineage
entry is interrupted")
+ @QueryParam("forceRevert") boolean forceRevert) {
+ try {
+ String tableNameWithType =
+
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
Review comment:
I made `tableType` a required field.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2911,84 @@ public void endReplaceSegments(String tableNameWithType,
String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}
+ /**
+ * Revert the segment replacement
+ *
+ * 1. Compute validation
+ * 2. Update the lineage entry state to "REVERTED" and write metadata to the
property store
+ *
+ * Update is done with retry logic along with read-modify-write block for
achieving atomic update of the lineage
+ * metadata.
+ *
+ * @param tableNameWithType
+ * @param segmentLineageEntryId
+ */
+ public void revertReplaceSegments(String tableNameWithType, String
segmentLineageEntryId, boolean forceRevert) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ Preconditions.checkArgument(segmentLineageZNRecord != null, String
+ .format("Segment lineage does not exist. (tableNameWithType =
'%s', segmentLineageEntryId = '%s')",
+ tableNameWithType, segmentLineageEntryId));
+ SegmentLineage segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ int expectedVersion = segmentLineageZNRecord.getVersion();
+
+ // Look up the lineage entry based on the segment lineage entry id
+ LineageEntry lineageEntry =
segmentLineage.getLineageEntry(segmentLineageEntryId);
+ Preconditions.checkArgument(lineageEntry != null, String
+ .format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
+ segmentLineageEntryId));
+
+ if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+ // We do not allow to revert the lineage entry with 'REVERTED'
state. For 'IN_PROGRESS", we only allow to
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS ||
!forceRevert) {
Review comment:
Oops, I missed your review and had already checked this in. I will address
this when I file the follow-up PR for proactive deletion.
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2911,84 @@ public void endReplaceSegments(String tableNameWithType,
String segmentLineageEn
tableNameWithType, segmentLineageEntryId);
}
+ /**
+ * Revert the segment replacement
+ *
+ * 1. Compute validation
+ * 2. Update the lineage entry state to "REVERTED" and write metadata to the
property store
+ *
+ * Update is done with retry logic along with read-modify-write block for
achieving atomic update of the lineage
+ * metadata.
+ *
+ * @param tableNameWithType
+ * @param segmentLineageEntryId
+ */
+ public void revertReplaceSegments(String tableNameWithType, String
segmentLineageEntryId, boolean forceRevert) {
+ try {
+ DEFAULT_RETRY_POLICY.attempt(() -> {
+ // Fetch the segment lineage metadata
+ ZNRecord segmentLineageZNRecord =
+
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore,
tableNameWithType);
+ Preconditions.checkArgument(segmentLineageZNRecord != null, String
+ .format("Segment lineage does not exist. (tableNameWithType =
'%s', segmentLineageEntryId = '%s')",
+ tableNameWithType, segmentLineageEntryId));
+ SegmentLineage segmentLineage =
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+ int expectedVersion = segmentLineageZNRecord.getVersion();
+
+ // Look up the lineage entry based on the segment lineage entry id
+ LineageEntry lineageEntry =
segmentLineage.getLineageEntry(segmentLineageEntryId);
+ Preconditions.checkArgument(lineageEntry != null, String
+ .format("Invalid segment lineage entry id (tableName='%s',
segmentLineageEntryId='%s')", tableNameWithType,
+ segmentLineageEntryId));
+
+ if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+ // We do not allow to revert the lineage entry with 'REVERTED'
state. For 'IN_PROGRESS", we only allow to
+ // revert when 'forceRevert' is set to true.
+ if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS ||
!forceRevert) {
+ LOGGER.warn("Lineage state is not valid. Cannot revert the lineage
entry. (tableNameWithType={}, "
+ + "segmentLineageEntryId={}, segmentLineageEntrySate={},
forceRevert={})", tableNameWithType,
+ segmentLineageEntryId, lineageEntry.getState(), forceRevert);
+ return false;
+ }
+ }
+
+ // Check that all segments from 'segmentsFrom' are in ONLINE state in
the external view.
+ Set<String> onlineSegments =
getOnlineSegmentsFromExternalView(tableNameWithType);
+
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
String.format(
+ "Not all segments from 'segmentFrom' are in ONLINE state in the
external view. (tableName = '%s', "
+ + "segmentsFrom = '%s', onlineSegments = '%s'",
tableNameWithType, lineageEntry.getSegmentsFrom(),
+ onlineSegments));
+
+ // Update lineage entry
+ LineageEntry newLineageEntry =
+ new LineageEntry(lineageEntry.getSegmentsFrom(),
lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED,
+ System.currentTimeMillis());
+ segmentLineage.updateLineageEntry(segmentLineageEntryId,
newLineageEntry);
+
+ // Write back to the lineage entry
+ if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore,
segmentLineage, expectedVersion)) {
+ // If the segment lineage metadata is successfully updated, we need
to trigger brokers to rebuild the
+ // routing table because it is possible that there has been no EV
change but the routing result may be
+ // different after updating the lineage entry.
+ sendRoutingTableRebuildMessage(tableNameWithType);
+ return true;
Review comment:
Correct. I will file the follow-up PRs for proactive segment deletion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]