Jackie-Jiang commented on a change in pull request #7662:
URL: https://github.com/apache/pinot/pull/7662#discussion_r739573305



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2907,88 @@ public void endReplaceSegments(String tableNameWithType, 
String segmentLineageEn
         tableNameWithType, segmentLineageEntryId);
   }
 
+  /**
+   * Revert the segment replacement
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "REVERTED" and write metadata to the 
property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param segmentLineageEntryId
+   */
+  public void revertReplaceSegments(String tableNameWithType, String 
segmentLineageEntryId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = 
'%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();

Review comment:
       (minor) Declare the variable here
   ```suggestion
           SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
           int expectedVersion = segmentLineageZNRecord.getVersion();
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2907,88 @@ public void endReplaceSegments(String tableNameWithType, 
String segmentLineageEn
         tableNameWithType, segmentLineageEntryId);
   }
 
+  /**
+   * Revert the segment replacement
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "REVERTED" and write metadata to the 
property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param segmentLineageEntryId
+   */
+  public void revertReplaceSegments(String tableNameWithType, String 
segmentLineageEntryId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = 
'%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the segment lineage entry id
+        LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(segmentLineageEntryId);
+        Preconditions.checkArgument(lineageEntry != null, String
+            .format("Invalid segment lineage entry id (tableName='%s', 
segmentLineageEntryId='%s')", tableNameWithType,
+                segmentLineageEntryId));
+
+        // Revert is not allowed if the state is not 'COMPLETED'
+        if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+          LOGGER.warn("Lineage entry state is not COMPLETED. Cannot revert the 
lineage entry. (tableNameWithType={}, "
+              + "segmentLineageEntryId={})", tableNameWithType, 
segmentLineageEntryId);
+          return false;
+        }
+
+        // Check that all the segments from 'segmentsFrom' exist in the table

Review comment:
       I feel this check can be skipped because we will check the external view

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2907,88 @@ public void endReplaceSegments(String tableNameWithType, 
String segmentLineageEn
         tableNameWithType, segmentLineageEntryId);
   }
 
+  /**
+   * Revert the segment replacement
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "REVERTED" and write metadata to the 
property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param segmentLineageEntryId
+   */
+  public void revertReplaceSegments(String tableNameWithType, String 
segmentLineageEntryId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = 
'%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the segment lineage entry id
+        LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(segmentLineageEntryId);
+        Preconditions.checkArgument(lineageEntry != null, String
+            .format("Invalid segment lineage entry id (tableName='%s', 
segmentLineageEntryId='%s')", tableNameWithType,
+                segmentLineageEntryId));
+
+        // Revert is not allowed if the state is not 'COMPLETED'
+        if (lineageEntry.getState() != LineageEntryState.COMPLETED) {

Review comment:
       +1. We should allow explicitly revert `IN_PROGRESS` lineage (add a flag 
is fine) in case user knows the the segment upload is already interrupted

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2907,6 +2907,88 @@ public void endReplaceSegments(String tableNameWithType, 
String segmentLineageEn
         tableNameWithType, segmentLineageEntryId);
   }
 
+  /**
+   * Revert the segment replacement
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "REVERTED" and write metadata to the 
property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param segmentLineageEntryId
+   */
+  public void revertReplaceSegments(String tableNameWithType, String 
segmentLineageEntryId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = 
'%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the segment lineage entry id
+        LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(segmentLineageEntryId);
+        Preconditions.checkArgument(lineageEntry != null, String
+            .format("Invalid segment lineage entry id (tableName='%s', 
segmentLineageEntryId='%s')", tableNameWithType,
+                segmentLineageEntryId));
+
+        // Revert is not allowed if the state is not 'COMPLETED'
+        if (lineageEntry.getState() != LineageEntryState.COMPLETED) {
+          LOGGER.warn("Lineage entry state is not COMPLETED. Cannot revert the 
lineage entry. (tableNameWithType={}, "
+              + "segmentLineageEntryId={})", tableNameWithType, 
segmentLineageEntryId);
+          return false;
+        }
+
+        // Check that all the segments from 'segmentsFrom' exist in the table
+        Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType));
+        
Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsFrom()),
 String.format(
+            "Not all segments from 'segmentsFrom' are available in the table. 
(tableName = '%s', segmentsFrom = '%s', "
+                + "segmentsForTable = '%s')", tableNameWithType, 
lineageEntry.getSegmentsFrom(), segmentsForTable));
+
+        // Check that all segments from 'segmentsFrom' are in ONLINE state in 
the external view.
+        Set<String> onlineSegments = 
getOnlineSegmentsFromExternalView(tableNameWithType);
+        
Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()),
 String.format(
+            "Not all segments from 'segmentFrom' are in ONLINE state in the 
external view. (tableName = '%s', "
+                + "segmentsFrom = '%s', onlineSegments = '%s'", 
tableNameWithType, lineageEntry.getSegmentsFrom(),
+            segmentsForTable));

Review comment:
       ```suggestion
               onlineSegments));
   ```

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -229,6 +229,17 @@ private void manageSegmentLineageCleanupForTable(String 
tableNameWithType) {
               // If the lineage state is 'COMPLETED', it is safe to delete all 
segments from 'segmentsFrom'
               segmentsToDelete.addAll(sourceSegments);
             }
+          } else if (lineageEntry.getState() == LineageEntryState.REVERTED) {

Review comment:
       We can simplify it because `REVERTED` should have the same behavior as 
expired `IN_PROGRESS`
   ```suggestion
             } else if (lineageEntry.getState() == LineageEntryState.REVERTED 
|| (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
lineageEntry.getTimestamp() < System.currentTimeMillis() - 
LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS)) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to