This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5d0502567e7 [HUDI-6846] Fix a bug of consistent bucket index
clustering (#9679)
5d0502567e7 is described below
commit 5d0502567e7e9f2ef5cf062cf7d1db80ad3057f8
Author: Bingeng Huang <[email protected]>
AuthorDate: Wed Sep 13 10:01:04 2023 +0800
[HUDI-6846] Fix a bug of consistent bucket index clustering (#9679)
---
.../hudi/index/bucket/ConsistentBucketIdentifier.java | 15 +++++++++++++++
...BaseConsistentHashingBucketClusteringPlanStrategy.java | 8 ++++++--
.../TestSparkConsistentBucketClusteringPlanStrategy.java | 13 +++++++++++++
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
index af40ff50055..61fabf99403 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
@@ -115,6 +115,21 @@ public class ConsistentBucketIdentifier extends BucketIdentifier {
     return headMap.isEmpty() ? ring.lastEntry().getValue() : headMap.get(headMap.lastKey());
}
+ /**
+ * Get the latter node of the given node (inferred from file id).
+ */
+ public ConsistentHashingNode getLatterBucket(String fileId) {
+ return getLatterBucket(getBucketByFileId(fileId).getValue());
+ }
+
+ /**
+ * Get the latter node of the given node (inferred from hash value).
+ */
+ public ConsistentHashingNode getLatterBucket(int hashValue) {
+    SortedMap<Integer, ConsistentHashingNode> tailMap = ring.tailMap(hashValue, false);
+    return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey());
+ }
+
public List<ConsistentHashingNode> mergeBucket(List<String> fileIds) {
     ValidationUtils.checkArgument(fileIds.size() >= 2, "At least two file groups should be provided for merging");
// Get nodes using fileIds
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index 49ab5f181ad..af3c00d3d8e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -253,7 +253,9 @@ public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T extend
boolean forward = k == 1;
do {
         int nextIdx = forward ? (rangeIdx[k] + 1 < fileSlices.size() ? rangeIdx[k] + 1 : 0) : (rangeIdx[k] >= 1 ? rangeIdx[k] - 1 : fileSlices.size() - 1);
-        boolean isNeighbour = identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId()) == identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
+        ConsistentHashingNode bucketOfNextFile = identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId());
+        ConsistentHashingNode nextBucket = forward ? identifier.getLatterBucket(fileSlices.get(rangeIdx[k]).getFileId()) : identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
+        boolean isNeighbour = bucketOfNextFile == nextBucket;
/**
* Merge condition:
* 1. there is still slot to merge bucket
@@ -261,7 +263,9 @@ public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T extend
          * 3. the previous file slice and current file slice are neighbour in the hash ring
          * 4. Both the total file size up to now and the previous file slice size are smaller than merge size threshold
*/
-        if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > mergeSize) {
+        if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > mergeSize
+            || nextIdx == rangeIdx[1 - k] // if start equal to end after update range
+        ) {
break;
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
index 598191aa893..38792a13d72 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
@@ -169,6 +169,19 @@ public class TestSparkConsistentBucketClusteringPlanStrategy extends HoodieSpark
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, nodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE, nodes.get(1).getTag());
     Assertions.assertEquals(metadata.getNodes().get(3).getValue(), nodes.get(1).getValue());
+
+    HoodieConsistentHashingMetadata metadata1 = new HoodieConsistentHashingMetadata("partition", 4);
+    ConsistentBucketIdentifier identifier1 = new ConsistentBucketIdentifier(metadata1);
+
+ int[] fsSize1 = {mergeSize / 4, mergeSize / 4, maxFileSize, mergeSize / 4};
+    List<FileSlice> fileSlices1 = IntStream.range(0, metadata1.getNodes().size()).mapToObj(
+        i -> createFileSliceWithSize(metadata1.getNodes().get(i).getFileIdPrefix(), fsSize1[i] / 2, fsSize1[i] / 2)
+    ).collect(Collectors.toList());
+
+    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> res1 = planStrategy.buildMergeClusteringGroup(identifier1,
+        fileSlices1.stream().filter(fs -> fs.getTotalFileSize() < mergeSize).collect(Collectors.toList()), 4);
+    Assertions.assertEquals(1, res1.getLeft().size(), "should have 1 clustering group");
+    Assertions.assertEquals(3, res1.getLeft().get(0).getSlices().size(), "should have 3 input files");
}
   private FileSlice createFileSliceWithSize(String fileIdPfx, long baseFileSize, long totalLogFileSize) {