yihua commented on a change in pull request #4971:
URL: https://github.com/apache/hudi/pull/4971#discussion_r823175038



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -519,14 +521,19 @@ public abstract HoodieRestoreMetadata 
restore(HoodieEngineContext context,
                                                     String restoreInstantTime,
                                                     String instantToRestore);
 
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
+    rollbackInflightCompaction(inflightInstant, s -> Option.empty());
+  }
+
   /**
    * Rollback failed compactions. Inflight rollbacks for compactions revert 
the .inflight file
    * to the .requested file.
    *
    * @param inflightInstant Inflight Compaction Instant
    */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
-    String commitTime = HoodieActiveTimeline.createNewInstantTime();
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant, 
Function<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInstantFunc) {

Review comment:
       Why is the second argument needed? That is, the func should not change, right?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -954,29 +954,42 @@ private HoodieTimeline 
getInflightTimelineExcludeCompactionAndClustering(HoodieT
     return inflightTimelineExcludeClusteringCommit;
   }
 
-  private Option<HoodiePendingRollbackInfo> 
getPendingRollbackInfo(HoodieTableMetaClient metaClient, String 
commitToRollback) {
-    return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, 
Option.empty());
+  protected Option<HoodiePendingRollbackInfo> 
getPendingRollbackInfo(HoodieTableMetaClient metaClient, String 
commitToRollback) {
+    return getPendingRollbackInfo(metaClient, commitToRollback, true);
+  }
+
+  protected Option<HoodiePendingRollbackInfo> 
getPendingRollbackInfo(HoodieTableMetaClient metaClient, String 
commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+    return getPendingRollbackInfos(metaClient, 
ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, 
Option.empty());
+  }
+
+  protected Map<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+    return getPendingRollbackInfos(metaClient, true);
   }
 
   /**
    * Fetch map of pending commits to be rolled-back to {@link 
HoodiePendingRollbackInfo}.
    * @param metaClient instance of {@link HoodieTableMetaClient} to use.
    * @return map of pending commits to be rolled-back instants to Rollback 
Instant and Rollback plan Pair.
    */
-  protected Map<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
+  protected Map<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean 
ignoreCompactionAndClusteringInstants) {
     List<HoodieInstant> instants = 
metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
     Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
     for (HoodieInstant instant : instants) {
       try {
         HoodieRollbackPlan rollbackPlan = 
RollbackUtils.getRollbackPlan(metaClient, instant);
         String action = rollbackPlan.getInstantToRollback().getAction();
-        if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
-          boolean isClustering = 
HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
-              && ClusteringUtils.getClusteringPlan(metaClient, 
instant).isPresent();
-          if (!isClustering) {
-            String instantToRollback = 
rollbackPlan.getInstantToRollback().getCommitTime();
-            infoMap.putIfAbsent(instantToRollback, Option.of(new 
HoodiePendingRollbackInfo(instant, rollbackPlan)));
+        if (ignoreCompactionAndClusteringInstants) {
+          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+            boolean isClustering = 
HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
+                && ClusteringUtils.getClusteringPlan(metaClient, new 
HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
+                
rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
+            if (!isClustering) {
+              String instantToRollback = 
rollbackPlan.getInstantToRollback().getCommitTime();
+              infoMap.putIfAbsent(instantToRollback, Option.of(new 
HoodiePendingRollbackInfo(instant, rollbackPlan)));
+            }
           }
+        } else {
+          
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), 
Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));

Review comment:
       Do we need to delete pending rollbacks if the instant to roll back has 
already been removed elsewhere?  For example, this can happen with existing 
releases, where a clustering instant referenced by a pending rollback info has 
already been rolled back by a later rollback instant.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1467,6 +1469,34 @@ public void testPendingClusteringRollback(boolean 
populateMetaFields) throws Exc
     metaClient.reloadActiveTimeline();
     // verify there are no pending clustering instants
     assertEquals(0, 
ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
+
+    // delete rollback.completed instant to mimic failed rollback of 
clustering. and then trigger rollback of clustering again. same rollback 
instant should be used.
+    HoodieInstant rollbackInstant = 
metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
+    FileCreateUtils.deleteRollbackCommit(metaClient.getBasePath(), 
rollbackInstant.getTimestamp());
+    metaClient.reloadActiveTimeline();
+
+    // create replace commit requested meta file so that rollback will not 
throw FileNotFoundException
+    // create file slice with instantTime 001 and build clustering plan 
including this created 001 file slice.
+    HoodieClusteringPlan clusteringPlan = 
ClusteringTestUtils.createClusteringPlan(metaClient, 
pendingClusteringInstant.getTimestamp(), "1");
+    // create requested replace commit
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
+        
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+
+    FileCreateUtils.createRequestedReplaceCommit(metaClient.getBasePath(), 
pendingClusteringInstant.getTimestamp(), Option.of(requestedReplaceMetadata));
+    SparkRDDWriteClient client1 = new SparkRDDWriteClient(context, config);

Review comment:
       nit: reuse and reassign the `client` variable?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to