codope commented on code in PR #4958:
URL: https://github.com/apache/hudi/pull/4958#discussion_r917481111
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -67,6 +77,61 @@ public HoodieData<WriteStatus>
updateLocation(HoodieData<WriteStatus> writeStatu
HoodieEngineContext context,
HoodieTable hoodieTable)
throws HoodieIndexException {
+ throw new HoodieIndexException("Consistent hashing index does not support
update location without the instant parameter");
+ }
+
+ /**
+ * Persist hashing metadata to storage. Only clustering operations will modify the metadata.
+ * For example, splitting & merging buckets, or just sorting and producing a new bucket.
+ */
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
+ HoodieEngineContext context,
+ HoodieTable hoodieTable,
+ String instantTime)
+ throws HoodieIndexException {
+ HoodieInstant instant =
hoodieTable.getMetaClient().getActiveTimeline().findInstantsAfterOrEquals(instantTime,
1).firstInstant().get();
+ ValidationUtils.checkState(instant.getTimestamp().equals(instantTime),
"Cannot get the same instant, instantTime: " + instantTime);
+ if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return writeStatuses;
+ }
+
+ // Double-check if it is a clustering operation by trying to obtain the clustering plan
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
+ ClusteringUtils.getClusteringPlan(hoodieTable.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime));
+ if (!instantPlanPair.isPresent()) {
+ return writeStatuses;
+ }
+
+ HoodieClusteringPlan plan = instantPlanPair.get().getRight();
+
HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
+ .mapToPair(m -> new
Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
m)
+ ).groupByKey().foreach((input) -> {
+ // Process each partition
+ String partition = input._1();
+ List<ConsistentHashingNode> childNodes = new ArrayList<>();
+ int seqNo = 0;
Review Comment:
Got it 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]