codope commented on code in PR #4958:
URL: https://github.com/apache/hudi/pull/4958#discussion_r917481111


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -67,6 +77,61 @@ public HoodieData<WriteStatus> 
updateLocation(HoodieData<WriteStatus> writeStatu
                                                 HoodieEngineContext context,
                                                 HoodieTable hoodieTable)
       throws HoodieIndexException {
+    throw new HoodieIndexException("Consistent hashing index does not support 
update location without the instant parameter");
+  }
+
+  /**
+   * Persist hashing metadata to storage. Only clustering operations will 
modify the metadata.
+   * For example, splitting & merging buckets, or just sorting and producing a 
new bucket.
+   */
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses,
+                                                HoodieEngineContext context,
+                                                HoodieTable hoodieTable,
+                                                String instantTime)
+      throws HoodieIndexException {
+    HoodieInstant instant = 
hoodieTable.getMetaClient().getActiveTimeline().findInstantsAfterOrEquals(instantTime,
 1).firstInstant().get();
+    ValidationUtils.checkState(instant.getTimestamp().equals(instantTime), 
"Cannot get the same instant, instantTime: " + instantTime);
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return writeStatuses;
+    }
+
+    // Double-check if it is a clustering operation by trying to obtain the 
clustering plan
+    Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+        ClusteringUtils.getClusteringPlan(hoodieTable.getMetaClient(), 
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime));
+    if (!instantPlanPair.isPresent()) {
+      return writeStatuses;
+    }
+
+    HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+    
HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
+        .mapToPair(m -> new 
Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
 m)
+    ).groupByKey().foreach((input) -> {
+      // Process each partition
+      String partition = input._1();
+      List<ConsistentHashingNode> childNodes = new ArrayList<>();
+      int seqNo = 0;

Review Comment:
   Got it 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to