kbuci commented on code in PR #18302:
URL: https://github.com/apache/hudi/pull/18302#discussion_r2963314247


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java:
##########
@@ -289,6 +293,74 @@ private int doPurgePendingInstant(JavaSparkContext jsc) 
throws Exception {
     return 0;
   }
 
+  /**
+   * Returns the instant times of all pending clustering plans that target any 
of the given partitions.
+   *
+   * @param metaClient the table meta client
+   * @param partitions list of partition paths to check against pending 
clustering plans
+   * @return list of clustering instant times targeting the given partitions
+   */
+  public static List<String> getPendingClusteringInstantsForPartitions(
+      HoodieTableMetaClient metaClient,
+      List<String> partitions) {
+    Set<String> partitionSet = partitions.stream().collect(Collectors.toSet());
+    return ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+        .filter(planPair -> {
+          HoodieClusteringPlan plan = planPair.getRight();
+          return plan.getInputGroups().stream()
+              .flatMap(group -> group.getSlices().stream())
+              .map(slice -> slice.getPartitionPath())
+              .anyMatch(partitionSet::contains);
+        })
+        .map(planPair -> planPair.getLeft().requestedTime())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Rolls back pending clustering instants that target any of the given 
partitions,
+   * are eligible for rollback (config enabled, old enough, and a clustering 
instant),
+   * and whose heartbeat has expired (indicating the clustering job is no 
longer alive).
+   *
+   * @param client     the write client to use for rollback operations
+   * @param metaClient the table meta client
+   * @param partitions list of partition paths to check against pending 
clustering plans
+   */
+  public static void rollbackFailedClusteringForPartitions(
+      SparkRDDWriteClient<?> client,
+      HoodieTableMetaClient metaClient,
+      List<String> partitions) {
+    long maxAllowableHeartbeatIntervalInMs = 
client.getConfig().getHoodieClientHeartbeatIntervalInMs()
+        * client.getConfig().getHoodieClientHeartbeatTolerableMisses();
+    String basePath = metaClient.getBasePath().toString();
+
+    getPendingClusteringInstantsForPartitions(metaClient, partitions).stream()
+        .filter(instantTime -> {
+          HoodieInstant instant = metaClient.getInstantGenerator()
+              .createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.CLUSTERING_ACTION, instantTime);

Review Comment:
   Based on other changes, I avoided some redundant heartbeat-related checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to