nsivabalan commented on code in PR #11553:
URL: https://github.com/apache/hudi/pull/11553#discussion_r1676772962


##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -70,11 +70,67 @@ public class ClusteringUtils {
   public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>> 
getAllPendingClusteringPlans(
       HoodieTableMetaClient metaClient) {
     List<HoodieInstant> pendingClusterInstants =
-        
metaClient.getActiveTimeline().filterPendingClusterTimeline().getInstants();
+        
metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants();
     return pendingClusterInstants.stream().map(instant -> 
getClusteringPlan(metaClient, instant))
         .filter(Option::isPresent).map(Option::get);
   }
 
+  /**
+   * Returns the pending clustering instant. This can be older pending replace 
commit or a new
+   * clustering inflight commit. After HUDI-7905, all the requested and 
inflight clustering instants
+   * use clustering action instead of replacecommit.
+   */
+  public static Option<HoodieInstant> getInflightClusteringInstant(String 
timestamp, HoodieActiveTimeline activeTimeline) {
+    HoodieTimeline pendingReplaceOrClusterTimeline = 
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+    HoodieInstant inflightInstant = 
HoodieTimeline.getClusteringCommitInflightInstant(timestamp);
+    if (pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)) {
+      return Option.of(inflightInstant);
+    }
+    inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(timestamp);
+    return 
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)
 ? inflightInstant : null);
+  }
+
+  /**
+   * Returns the pending clustering instant. This can be older requested 
replace commit or a new
+   * clustering requested commit. After HUDI-7905, all the requested and 
inflight clustering instants
+   * use clustering action instead of replacecommit.
+   */
+  public static Option<HoodieInstant> getRequestedClusteringInstant(String 
timestamp, HoodieActiveTimeline activeTimeline) {
+    HoodieTimeline pendingReplaceOrClusterTimeline = 
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+    HoodieInstant requestedInstant = 
HoodieTimeline.getClusteringCommitRequestedInstant(timestamp);
+    if (pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)) {
+      return Option.of(requestedInstant);
+    }
+    requestedInstant = 
HoodieTimeline.getReplaceCommitRequestedInstant(timestamp);
+    return 
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)
 ? requestedInstant : null);
+  }
+
+  /**
+   * Transitions the provided clustering instant fron inflight to complete 
based on the clustering
+   * action type. After HUDI-7905, the new clustering commits are written with 
clustering action.
+   */
+  public static void transitionClusterInflightToComplete(boolean shouldLock, 
HoodieInstant clusteringInstant,

Review Comment:
   oh 1 thing. if there is a pending replacecommit(just requested) in timeline 
just before upgrade, then we need to use replacecommit (and not clustering) for 
inflight right?
   are we handling this case. 



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -70,11 +70,67 @@ public class ClusteringUtils {
   public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>> 
getAllPendingClusteringPlans(
       HoodieTableMetaClient metaClient) {
     List<HoodieInstant> pendingClusterInstants =
-        
metaClient.getActiveTimeline().filterPendingClusterTimeline().getInstants();
+        
metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants();
     return pendingClusterInstants.stream().map(instant -> 
getClusteringPlan(metaClient, instant))
         .filter(Option::isPresent).map(Option::get);
   }
 
+  /**
+   * Returns the pending clustering instant. This can be older pending replace 
commit or a new
+   * clustering inflight commit. After HUDI-7905, all the requested and 
inflight clustering instants
+   * use clustering action instead of replacecommit.
+   */
+  public static Option<HoodieInstant> getInflightClusteringInstant(String 
timestamp, HoodieActiveTimeline activeTimeline) {
+    HoodieTimeline pendingReplaceOrClusterTimeline = 
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+    HoodieInstant inflightInstant = 
HoodieTimeline.getClusteringCommitInflightInstant(timestamp);
+    if (pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)) {
+      return Option.of(inflightInstant);
+    }
+    inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(timestamp);
+    return 
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)
 ? inflightInstant : null);
+  }
+
+  /**
+   * Returns the pending clustering instant. This can be older requested 
replace commit or a new
+   * clustering requested commit. After HUDI-7905, all the requested and 
inflight clustering instants
+   * use clustering action instead of replacecommit.
+   */
+  public static Option<HoodieInstant> getRequestedClusteringInstant(String 
timestamp, HoodieActiveTimeline activeTimeline) {
+    HoodieTimeline pendingReplaceOrClusterTimeline = 
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+    HoodieInstant requestedInstant = 
HoodieTimeline.getClusteringCommitRequestedInstant(timestamp);
+    if (pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)) {
+      return Option.of(requestedInstant);
+    }
+    requestedInstant = 
HoodieTimeline.getReplaceCommitRequestedInstant(timestamp);
+    return 
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)
 ? requestedInstant : null);
+  }
+
+  /**
+   * Transitions the provided clustering instant fron inflight to complete 
based on the clustering
+   * action type. After HUDI-7905, the new clustering commits are written with 
clustering action.
+   */
+  public static void transitionClusterInflightToComplete(boolean shouldLock, 
HoodieInstant clusteringInstant,
+                                                         Option<byte[]> 
commitMetadata, HoodieActiveTimeline activeTimeline) {
+    if 
(clusteringInstant.getAction().equals(HoodieTimeline.CLUSTERING_ACTION)) {
+      activeTimeline.transitionClusterInflightToComplete(shouldLock, 
clusteringInstant, commitMetadata);
+    } else {
+      activeTimeline.transitionReplaceInflightToComplete(shouldLock, 
clusteringInstant, commitMetadata);
+    }
+  }
+
+  /**
+   * Transitions the provided clustering instant fron requested to inflight 
based on the clustering
+   * action type. After HUDI-7905, the new clustering commits are written with 
clustering action.
+   */
+  public static void transitionClusterRequestedToInflight(HoodieInstant 
requestedClusteringInstant, Option<byte[]> data,
+                                                          HoodieActiveTimeline 
activeTimeline) {
+    if 
(requestedClusteringInstant.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
 {
+      
activeTimeline.transitionClusterRequestedToInflight(requestedClusteringInstant, 
data);

Review Comment:
   ok I see it here. depending on the requested instant's action type, we 
follow suit here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to