This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d0502567e7 [HUDI-6846] Fix a bug of consistent bucket index 
clustering (#9679)
5d0502567e7 is described below

commit 5d0502567e7e9f2ef5cf062cf7d1db80ad3057f8
Author: Bingeng Huang <[email protected]>
AuthorDate: Wed Sep 13 10:01:04 2023 +0800

    [HUDI-6846] Fix a bug of consistent bucket index clustering (#9679)
---
 .../hudi/index/bucket/ConsistentBucketIdentifier.java     | 15 +++++++++++++++
 ...BaseConsistentHashingBucketClusteringPlanStrategy.java |  8 ++++++--
 .../TestSparkConsistentBucketClusteringPlanStrategy.java  | 13 +++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
index af40ff50055..61fabf99403 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
@@ -115,6 +115,21 @@ public class ConsistentBucketIdentifier extends 
BucketIdentifier {
     return headMap.isEmpty() ? ring.lastEntry().getValue() : 
headMap.get(headMap.lastKey());
   }
 
+  /**
+   * Get the latter node of the given node (inferred from file id).
+   */
+  public ConsistentHashingNode getLatterBucket(String fileId) {
+    return getLatterBucket(getBucketByFileId(fileId).getValue());
+  }
+
+  /**
+   * Get the latter node of the given node (inferred from hash value).
+   */
+  public ConsistentHashingNode getLatterBucket(int hashValue) {
+    SortedMap<Integer, ConsistentHashingNode> tailMap = 
ring.tailMap(hashValue, false);
+    return tailMap.isEmpty() ? ring.firstEntry().getValue() : 
tailMap.get(tailMap.firstKey());
+  }
+
   public List<ConsistentHashingNode> mergeBucket(List<String> fileIds) {
     ValidationUtils.checkArgument(fileIds.size() >= 2, "At least two file 
groups should be provided for merging");
     // Get nodes using fileIds
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index 49ab5f181ad..af3c00d3d8e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -253,7 +253,9 @@ public abstract class 
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
         boolean forward = k == 1;
         do {
           int nextIdx = forward ? (rangeIdx[k] + 1 < fileSlices.size() ? 
rangeIdx[k] + 1 : 0) : (rangeIdx[k] >= 1 ? rangeIdx[k] - 1 : fileSlices.size() 
- 1);
-          boolean isNeighbour = 
identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId()) == 
identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
+          ConsistentHashingNode bucketOfNextFile = 
identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId());
+          ConsistentHashingNode nextBucket = forward ? 
identifier.getLatterBucket(fileSlices.get(rangeIdx[k]).getFileId()) : 
identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
+          boolean isNeighbour = bucketOfNextFile == nextBucket;
           /**
            * Merge condition:
            * 1. there is still slot to merge bucket
@@ -261,7 +263,9 @@ public abstract class 
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
            * 3. the previous file slice and current file slice are neighbour 
in the hash ring
            * 4. Both the total file size up to now and the previous file slice 
size are smaller than merge size threshold
            */
-          if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || 
totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > 
mergeSize) {
+          if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour || 
totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() > mergeSize
+              || nextIdx == rangeIdx[1 - k] // if start equal to end after 
update range
+          ) {
             break;
           }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
index 598191aa893..38792a13d72 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
@@ -169,6 +169,19 @@ public class 
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieSpark
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE, 
nodes.get(0).getTag());
     Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE, 
nodes.get(1).getTag());
     Assertions.assertEquals(metadata.getNodes().get(3).getValue(), 
nodes.get(1).getValue());
+
+    HoodieConsistentHashingMetadata metadata1 = new 
HoodieConsistentHashingMetadata("partition", 4);
+    ConsistentBucketIdentifier identifier1 = new 
ConsistentBucketIdentifier(metadata1);
+
+    int[] fsSize1 = {mergeSize / 4, mergeSize / 4, maxFileSize, mergeSize / 4};
+    List<FileSlice> fileSlices1 = IntStream.range(0, 
metadata1.getNodes().size()).mapToObj(
+        i -> 
createFileSliceWithSize(metadata1.getNodes().get(i).getFileIdPrefix(), 
fsSize1[i] / 2, fsSize1[i] / 2)
+    ).collect(Collectors.toList());
+
+    Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> res1 = 
planStrategy.buildMergeClusteringGroup(identifier1,
+        fileSlices1.stream().filter(fs -> fs.getTotalFileSize() < 
mergeSize).collect(Collectors.toList()), 4);
+    Assertions.assertEquals(1, res1.getLeft().size(), "should have 1 
clustering group");
+    Assertions.assertEquals(3, res1.getLeft().get(0).getSlices().size(), 
"should have 3 input files");
   }
 
   private FileSlice createFileSliceWithSize(String fileIdPfx, long 
baseFileSize, long totalLogFileSize) {

Reply via email to