This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e662f9828d3 feat (flink): improvement bucket assignment for MOR with bucket index (#18444)
3e662f9828d3 is described below

commit 3e662f9828d34d516adbc42c1d78326d93e7699e
Author: Peter Huang <[email protected]>
AuthorDate: Wed Apr 1 19:02:36 2026 -0700

    feat (flink): improvement bucket assignment for MOR with bucket index (#18444)
---
 .../source/split/assign/HoodieSplitAssigners.java  |   2 +-
 .../split/assign/HoodieSplitBucketAssigner.java    |  23 +-
 .../split/TestDefaultHoodieSplitProvider.java      |   3 +-
 .../assign/TestHoodieSplitBucketAssigner.java      | 401 +++++++++++----------
 4 files changed, 223 insertions(+), 206 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
index e50751f79ab4..a6954c977f0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
@@ -31,7 +31,7 @@ public class HoodieSplitAssigners {
           int parallelism) {
 
     if (OptionsResolver.isMorWithBucketIndexUpsert(config)) {
-      return new HoodieSplitBucketAssigner(parallelism);
+      return new HoodieSplitBucketAssigner(parallelism, config);
     } else if (OptionsResolver.isAppendMode(config)) {
       return new HoodieSplitNumberAssigner(parallelism);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
index c13d28b03e75..a5d675b35102 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
@@ -18,7 +18,12 @@
 
 package org.apache.hudi.source.split.assign;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 /**
@@ -26,24 +31,30 @@ import org.apache.hudi.source.split.HoodieSourceSplit;
  * source splits to task IDs using bucket index information for co-location.
  */
 public class HoodieSplitBucketAssigner implements HoodieSplitAssigner {
-  private final int parallelism;
+  private final NumBucketsFunction numBucketsFunction;
+  private Functions.Function3<Integer, String, Integer, Integer> 
partitionIndexFunc;
 
   /**
-   * Creates a new HoodieSplitBucketAssigner.
+   * Creates a new HoodieSplitBucketAssigner using bucket index configuration.
    *
    * @param parallelism the number of parallel tasks (must be positive)
+   * @param conf        Flink configuration containing bucket index settings
    * @throws IllegalArgumentException if parallelism is less than or equal to 0
    */
-  public HoodieSplitBucketAssigner(int parallelism) {
+  public HoodieSplitBucketAssigner(int parallelism, Configuration conf) {
     if (parallelism <= 0) {
       throw new IllegalArgumentException("Parallelism must be positive, but 
was: " + parallelism);
     }
-    this.parallelism = parallelism;
+    this.numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+            conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), 
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+    this.partitionIndexFunc = 
BucketIndexUtil.getPartitionIndexFunc(parallelism);
   }
 
   @Override
   public int assign(HoodieSourceSplit split) {
-    int bucketId = BucketIdentifier.bucketIdFromFileId(split.getFileId());
-    return bucketId % parallelism;
+
+    int curBucket = BucketIdentifier.bucketIdFromFileId(split.getFileId());
+    int numBuckets = 
numBucketsFunction.getNumBuckets(split.getPartitionPath());
+    return this.partitionIndexFunc.apply(numBuckets, split.getPartitionPath(), 
curBucket);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 5571e471819c..6ea01d26e5d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.split;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.hudi.source.split.assign.HoodieSplitBucketAssigner;
@@ -564,7 +565,7 @@ public class TestDefaultHoodieSplitProvider {
   public void testGetNextWithBucketAssigner() {
     // Test with HoodieSplitBucketAssigner
     HoodieSplitBucketAssigner assigner =
-        new HoodieSplitBucketAssigner(4);
+        new HoodieSplitBucketAssigner(4, new Configuration());
     DefaultHoodieSplitProvider provider = new 
DefaultHoodieSplitProvider(assigner);
 
     // Create splits with bucket-encoded file IDs
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
index 2d3da5b143a3..482d0aada983 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
@@ -18,277 +18,282 @@
 
 package org.apache.hudi.source.split.assign;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.split.HoodieSourceSplit;
 
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link HoodieSplitBucketAssigner}.
+ *
+ * <p>The assignment formula is:
+ * <pre>
+ *   partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism 
* numBuckets
+ *   taskId = (partitionIndex + curBucket) % parallelism
+ * </pre>
+ * When the partition path is empty (hashCode = 0), this simplifies to {@code 
curBucket % parallelism}.
  */
 public class TestHoodieSplitBucketAssigner {
 
-  @Test
-  public void testAssignWithSingleParallelism() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(1);
-
-    HoodieSourceSplit split1 = createTestSplitWithBucket(0, 0);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(1, 5);
-    HoodieSourceSplit split3 = createTestSplitWithBucket(2, 10);
-
-    assertEquals(0, assigner.assign(split1), "All splits should be assigned to 
task 0");
-    assertEquals(0, assigner.assign(split2), "All splits should be assigned to 
task 0");
-    assertEquals(0, assigner.assign(split3), "All splits should be assigned to 
task 0");
-  }
+  // -------------------------------------------------------------------------
+  // Constructor validation
+  // -------------------------------------------------------------------------
 
   @Test
-  public void testAssignWithMultipleParallelism() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(3);
-
-    HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
-    HoodieSourceSplit split1 = createTestSplitWithBucket(1, 1);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(2, 2);
-    HoodieSourceSplit split3 = createTestSplitWithBucket(3, 3);
-    HoodieSourceSplit split4 = createTestSplitWithBucket(4, 4);
-    HoodieSourceSplit split5 = createTestSplitWithBucket(5, 5);
-
-    assertEquals(0, assigner.assign(split0), "Bucket 0 should be assigned to 
task 0");
-    assertEquals(1, assigner.assign(split1), "Bucket 1 should be assigned to 
task 1");
-    assertEquals(2, assigner.assign(split2), "Bucket 2 should be assigned to 
task 2");
-    assertEquals(0, assigner.assign(split3), "Bucket 3 should be assigned to 
task 0 (round-robin)");
-    assertEquals(1, assigner.assign(split4), "Bucket 4 should be assigned to 
task 1 (round-robin)");
-    assertEquals(2, assigner.assign(split5), "Bucket 5 should be assigned to 
task 2 (round-robin)");
+  public void testConstructorWithZeroParallelismThrows() {
+    IllegalArgumentException ex = assertThrows(
+        IllegalArgumentException.class,
+        () -> new HoodieSplitBucketAssigner(0, new Configuration()));
+    assertEquals("Parallelism must be positive, but was: 0", ex.getMessage());
   }
 
   @Test
-  public void testAssignBucketBasedCoLocation() {
-    int parallelism = 4;
-    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism);
-
-    // Splits with the same bucket ID should always be assigned to the same 
task
-    HoodieSourceSplit split1a = createTestSplitWithBucket(0, 5);
-    HoodieSourceSplit split1b = createTestSplitWithBucket(1, 5);
-    HoodieSourceSplit split1c = createTestSplitWithBucket(2, 5);
-
-    int taskId = assigner.assign(split1a);
-    assertEquals(taskId, assigner.assign(split1b),
-        "Splits with same bucket should be co-located on same task");
-    assertEquals(taskId, assigner.assign(split1c),
-        "Splits with same bucket should be co-located on same task");
-    assertEquals(1, taskId, "Bucket 5 % 4 = 1");
+  public void testConstructorWithNegativeParallelismThrows() {
+    IllegalArgumentException ex = assertThrows(
+        IllegalArgumentException.class,
+        () -> new HoodieSplitBucketAssigner(-1, new Configuration()));
+    assertEquals("Parallelism must be positive, but was: -1", ex.getMessage());
   }
 
   @Test
-  public void testAssignDifferentBucketsDistributed() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
-    // Different buckets should be distributed across different tasks
-    HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
-    HoodieSourceSplit split1 = createTestSplitWithBucket(1, 1);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(2, 2);
-
-    assertNotEquals(assigner.assign(split0), assigner.assign(split1),
-        "Different buckets should typically be on different tasks");
-    assertNotEquals(assigner.assign(split1), assigner.assign(split2),
-        "Different buckets should typically be on different tasks");
+  public void testConstructorWithLargeNegativeParallelismThrows() {
+    IllegalArgumentException ex = assertThrows(
+        IllegalArgumentException.class,
+        () -> new HoodieSplitBucketAssigner(-100, new Configuration()));
+    assertEquals("Parallelism must be positive, but was: -100", 
ex.getMessage());
   }
 
-  @Test
-  public void testAssignWithHighParallelism() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(100);
-
-    HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
-    HoodieSourceSplit split50 = createTestSplitWithBucket(1, 50);
-    HoodieSourceSplit split99 = createTestSplitWithBucket(2, 99);
-    HoodieSourceSplit split100 = createTestSplitWithBucket(3, 100);
-
-    assertEquals(0, assigner.assign(split0));
-    assertEquals(50, assigner.assign(split50));
-    assertEquals(99, assigner.assign(split99));
-    assertEquals(0, assigner.assign(split100), "Bucket 100 should wrap around 
to task 0");
-  }
+  // -------------------------------------------------------------------------
+  // Single parallelism — everything maps to task 0
+  // -------------------------------------------------------------------------
 
   @Test
-  public void testAssignSameBucketMultipleTimes() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
-    HoodieSourceSplit split1 = createTestSplitWithBucket(0, 7);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(1, 7);
-
-    // Assigning splits with the same bucket multiple times should return the 
same task ID
-    int taskId = assigner.assign(split1);
-    assertEquals(2, taskId, "Bucket 7 % 5 = 2");
-    assertEquals(taskId, assigner.assign(split2),
-        "Same bucket should always return same task ID");
-    assertEquals(taskId, assigner.assign(split1),
-        "Same bucket should always return same task ID");
-  }
+  public void testSingleParallelismAlwaysAssignsToTaskZero() {
+    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(1, new 
Configuration());
 
-  @Test
-  public void testConstructorWithZeroParallelism() {
-    IllegalArgumentException exception = assertThrows(
-        IllegalArgumentException.class,
-        () -> new HoodieSplitBucketAssigner(0),
-        "Should throw exception for zero parallelism"
-    );
-    assertEquals("Parallelism must be positive, but was: 0", 
exception.getMessage());
+    for (int bucket = 0; bucket < 10; bucket++) {
+      assertEquals(0, assigner.assign(splitWithBucket(bucket, "")),
+          "All splits must go to task 0 when parallelism is 1");
+    }
   }
 
-  @Test
-  public void testConstructorWithNegativeParallelism() {
-    IllegalArgumentException exception = assertThrows(
-        IllegalArgumentException.class,
-        () -> new HoodieSplitBucketAssigner(-1),
-        "Should throw exception for negative parallelism"
-    );
-    assertEquals("Parallelism must be positive, but was: -1", 
exception.getMessage());
-  }
+  // -------------------------------------------------------------------------
+  // Empty partition path: taskId = curBucket % parallelism
+  // When partition hashCode is 0, partitionIndex = 0, so the formula
+  // reduces to a simple modulo on the bucket ID.
+  // -------------------------------------------------------------------------
 
   @Test
-  public void testConstructorWithNegativeLargeParallelism() {
-    IllegalArgumentException exception = assertThrows(
-        IllegalArgumentException.class,
-        () -> new HoodieSplitBucketAssigner(-100),
-        "Should throw exception for large negative parallelism"
-    );
-    assertEquals("Parallelism must be positive, but was: -100", 
exception.getMessage());
+  public void testAssignWithEmptyPartitionUsesSimpleModulo() {
+    int parallelism = 5;
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+    for (int bucket = 0; bucket < 20; bucket++) {
+      int expected = bucket % parallelism;
+      assertEquals(expected, assigner.assign(splitWithBucket(bucket, "")),
+          "With empty partition, taskId must equal curBucket % parallelism");
+    }
   }
 
   @Test
-  public void testAssignmentConsistency() {
-    HoodieSplitBucketAssigner assigner1 = new HoodieSplitBucketAssigner(5);
-    HoodieSplitBucketAssigner assigner2 = new HoodieSplitBucketAssigner(5);
-
-    // Two assigners with same parallelism should assign splits identically
-    for (int i = 0; i < 20; i++) {
-      HoodieSourceSplit split = createTestSplitWithBucket(i, i);
-      assertEquals(
-          assigner1.assign(split),
-          assigner2.assign(split),
-          "Different assigner instances should assign same bucket to same task"
-      );
+  public void testTaskIdIsAlwaysInValidRange() {
+    int parallelism = 7;
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+    for (int bucket = 0; bucket < 50; bucket++) {
+      int taskId = assigner.assign(splitWithBucket(bucket, ""));
+      assertTrue(taskId >= 0 && taskId < parallelism,
+          "taskId must be in [0, parallelism): got " + taskId);
     }
   }
 
+  // -------------------------------------------------------------------------
+  // Co-location: same (partition, bucket) always lands on the same task
+  // -------------------------------------------------------------------------
+
   @Test
-  public void testBucketCoLocationAcrossMultipleSplits() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(8);
-
-    // Create multiple splits for the same bucket
-    int bucketId = 15;
-    HoodieSourceSplit[] splits = new HoodieSourceSplit[10];
-    for (int i = 0; i < 10; i++) {
-      splits[i] = createTestSplitWithBucket(i, bucketId);
+  public void testSameBucketSamePartitionAlwaysCoLocated() {
+    int parallelism = 4;
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+    int[] bucketsToTest = {0, 3, 7, 15};
+    String[] partitions = {"", "2024/01/01", "country=US"};
+
+    for (String partition : partitions) {
+      for (int bucket : bucketsToTest) {
+        int first = assigner.assign(splitWithBucket(bucket, partition));
+        for (int i = 0; i < 5; i++) {
+          assertEquals(first, assigner.assign(splitWithBucket(bucket, 
partition)),
+              "Same bucket+partition must always map to the same task");
+        }
+      }
     }
+  }
 
-    // All splits with the same bucket should be assigned to the same task
-    int expectedTaskId = bucketId % 8;
-    for (HoodieSourceSplit split : splits) {
-      assertEquals(expectedTaskId, assigner.assign(split),
-          "All splits for bucket " + bucketId + " should be on task " + 
expectedTaskId);
-    }
+  @Test
+  public void testCoLocationHoldsAcrossDifferentSplitNumbers() {
+    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(6, new 
Configuration());
+    int bucket = 11;
+    String partition = "region=EU";
+
+    // Different split numbers, same bucket and partition
+    int expected = assigner.assign(split(0, bucket, partition));
+    assertEquals(expected, assigner.assign(split(100, bucket, partition)));
+    assertEquals(expected, assigner.assign(split(999, bucket, partition)));
   }
 
+  // -------------------------------------------------------------------------
+  // Custom numBuckets via config
+  // -------------------------------------------------------------------------
+
   @Test
-  public void testManyBucketsDistribution() {
-    int parallelism = 4;
-    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism);
+  public void testCustomNumBucketsIsUsed() {
+    // With a non-trivial partition hash, changing numBuckets changes the 
assignment.
+    // Partition "p1" has hashCode 3521.
+    // parallelism=3, numBuckets=4:
+    //   partitionIndex = (3521 % 3) * 4 = 2 * 4 = 8
+    //   taskId(bucket=0) = (8 + 0) % 3 = 2
+    //   taskId(bucket=1) = (8 + 1) % 3 = 0
+    //   taskId(bucket=2) = (8 + 2) % 3 = 1
+    int parallelism = 3;
+    int numBuckets = 4;
+    String partition = "p1"; // hashCode = 112*31 + 49 = 3521
+
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, numBuckets);
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, conf);
+
+    assertEquals(expectedTaskId(partition, 0, numBuckets, parallelism), 
assigner.assign(split(0, 0, partition)));
+    assertEquals(expectedTaskId(partition, 1, numBuckets, parallelism), 
assigner.assign(split(1, 1, partition)));
+    assertEquals(expectedTaskId(partition, 2, numBuckets, parallelism), 
assigner.assign(split(2, 2, partition)));
+  }
 
-    // Test distribution of many buckets
-    int[] taskCounts = new int[parallelism];
-    int totalBuckets = 100;
+  @Test
+  public void testDifferentNumBucketsProduceDifferentAssignments() {
+    // Use a partition with non-zero hash so numBuckets affects the result.
+    String partition = "country=DE"; // non-zero hashCode guaranteed
+    int parallelism = 5;
+    int bucket = 3;
 
-    for (int bucketId = 0; bucketId < totalBuckets; bucketId++) {
-      HoodieSourceSplit split = createTestSplitWithBucket(bucketId, bucketId);
-      int taskId = assigner.assign(split);
-      taskCounts[taskId]++;
+    Configuration conf4 = new Configuration();
+    conf4.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+    Configuration conf8 = new Configuration();
+    conf8.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);
 
-      // Verify task ID is within valid range
-      assertEquals(bucketId % parallelism, taskId,
-          "Bucket should be assigned using modulo operation");
-    }
+    int taskWith4 = new HoodieSplitBucketAssigner(parallelism, 
conf4).assign(split(0, bucket, partition));
+    int taskWith8 = new HoodieSplitBucketAssigner(parallelism, 
conf8).assign(split(0, bucket, partition));
 
-    // Verify even distribution
-    for (int count : taskCounts) {
-      assertEquals(totalBuckets / parallelism, count,
-          "Buckets should be evenly distributed");
-    }
+    // Both must be valid task IDs, and at least verify they're computed from 
the right formula
+    assertEquals(expectedTaskId(partition, bucket, 4, parallelism), taskWith4);
+    assertEquals(expectedTaskId(partition, bucket, 8, parallelism), taskWith8);
   }
 
+  // -------------------------------------------------------------------------
+  // Full formula verification with non-trivial partition hash
+  // -------------------------------------------------------------------------
+
   @Test
-  public void testFileIdParsing() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(10);
+  public void testFullFormulaWithNonTrivialPartitionHash() {
+    int parallelism = 4;
+    int numBuckets = 4;
+    String partition = "dt=2024-01-15";
 
-    // Test with various file ID formats that have bucket ID in first 8 
characters
-    HoodieSourceSplit split1 = createTestSplit(0, 
"00000005-0000-0000-0000-000000000000");
-    HoodieSourceSplit split2 = createTestSplit(1, 
"00000012-1234-5678-abcd-ef0123456789");
-    HoodieSourceSplit split3 = createTestSplit(2, 
"00000099-aaaa-bbbb-cccc-dddddddddddd");
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, numBuckets);
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, conf);
 
-    assertEquals(5, assigner.assign(split1), "Should extract bucket 5 from 
file ID");
-    assertEquals(2, assigner.assign(split2), "Should extract bucket 12 and 
assign to task 2 (12 % 10)");
-    assertEquals(9, assigner.assign(split3), "Should extract bucket 99 and 
assign to task 9 (99 % 10)");
+    for (int bucket = 0; bucket < numBuckets; bucket++) {
+      int expected = expectedTaskId(partition, bucket, numBuckets, 
parallelism);
+      assertEquals(expected, assigner.assign(split(bucket, bucket, partition)),
+          "Task ID must match the formula for bucket " + bucket);
+    }
   }
 
+  // -------------------------------------------------------------------------
+  // Bucket ID parsing from file ID
+  // -------------------------------------------------------------------------
+
   @Test
-  public void testDifferentSplitNumbersSameBucket() {
-    HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
-    // Different split numbers but same bucket should go to same task
-    HoodieSourceSplit split1 = createTestSplitWithBucket(0, 8);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(100, 8);
-    HoodieSourceSplit split3 = createTestSplitWithBucket(999, 8);
-
-    int taskId = assigner.assign(split1);
-    assertEquals(taskId, assigner.assign(split2),
-        "Split number should not affect bucket-based assignment");
-    assertEquals(taskId, assigner.assign(split3),
-        "Split number should not affect bucket-based assignment");
-    assertEquals(3, taskId, "Bucket 8 % 5 = 3");
+  public void testBucketIdParsedFromFirst8CharsOfFileId() {
+    // bucketIdFromFileId parses the first 8 characters as an integer
+    int parallelism = 10;
+    HoodieSplitBucketAssigner assigner = new 
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+    // File ID "00000005-..." → bucket 5 → task 5 % 10 = 5 (empty partition)
+    HoodieSourceSplit split5 = createTestSplit(0, 
"00000005-0000-0000-0000-000000000000", "");
+    assertEquals(5, assigner.assign(split5));
+
+    // File ID "00000012-..." → bucket 12 → task 12 % 10 = 2
+    HoodieSourceSplit split12 = createTestSplit(1, 
"00000012-1234-5678-abcd-ef0123456789", "");
+    assertEquals(2, assigner.assign(split12));
+
+    // File ID "00000099-..." → bucket 99 → task 99 % 10 = 9
+    HoodieSourceSplit split99 = createTestSplit(2, 
"00000099-aaaa-bbbb-cccc-dddddddddddd", "");
+    assertEquals(9, assigner.assign(split99));
   }
 
-  @Test
-  public void testComparisonWithDefaultAssigner() {
-    int parallelism = 5;
-    HoodieSplitBucketAssigner bucketAssigner = new 
HoodieSplitBucketAssigner(parallelism);
-    HoodieSplitNumberAssigner defaultAssigner = new 
HoodieSplitNumberAssigner(parallelism);
+  // -------------------------------------------------------------------------
+  // Consistency between assigner instances
+  // -------------------------------------------------------------------------
 
-    // Create splits where bucket ID != split number
-    HoodieSourceSplit split1 = createTestSplitWithBucket(0, 3);
-    HoodieSourceSplit split2 = createTestSplitWithBucket(3, 0);
+  @Test
+  public void testTwoAssignersWithSameConfigProduceSameResults() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);
 
-    // Bucket assigner uses bucket ID from file ID
-    assertEquals(3, bucketAssigner.assign(split1), "Should use bucket 3");
-    assertEquals(0, bucketAssigner.assign(split2), "Should use bucket 0");
+    HoodieSplitBucketAssigner a1 = new HoodieSplitBucketAssigner(5, conf);
+    HoodieSplitBucketAssigner a2 = new HoodieSplitBucketAssigner(5, conf);
 
-    // Default assigner uses split number
-    assertEquals(0, defaultAssigner.assign(split1), "Should use split number 
0");
-    assertEquals(3, defaultAssigner.assign(split2), "Should use split number 
3");
+    for (int bucket = 0; bucket < 30; bucket++) {
+      HoodieSourceSplit split = splitWithBucket(bucket, "region=APAC");
+      assertEquals(a1.assign(split), a2.assign(split),
+          "Two assigners with same config must produce identical results");
+    }
   }
 
+  // -------------------------------------------------------------------------
+  // Helpers
+  // -------------------------------------------------------------------------
+
   /**
-   * Creates a test split with a specific bucket ID encoded in the file ID.
-   * File ID format: "{8-digit bucket ID}-{rest of UUID}"
+   * Computes the expected task ID using the same formula as {@link 
HoodieSplitBucketAssigner}:
+   * <pre>
+   *   partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % 
parallelism * numBuckets
+   *   taskId = (partitionIndex + curBucket) % parallelism
+   * </pre>
    */
-  private HoodieSourceSplit createTestSplitWithBucket(int splitNum, int 
bucketId) {
+  private static int expectedTaskId(String partition, int curBucket, int 
numBuckets, int parallelism) {
+    long partitionIndex = (long) ((partition.hashCode() & Integer.MAX_VALUE) % 
parallelism) * numBuckets;
+    return (int) ((partitionIndex + curBucket) % parallelism);
+  }
+
+  /** Creates a split whose file ID encodes {@code bucketId} in the first 8 
characters. */
+  private static HoodieSourceSplit splitWithBucket(int bucketId, String 
partitionPath) {
+    return split(bucketId, bucketId, partitionPath);
+  }
+
+  /** Creates a split with an explicit split number, bucket ID, and partition 
path. */
+  private static HoodieSourceSplit split(int splitNum, int bucketId, String 
partitionPath) {
     String fileId = String.format("%08d-0000-0000-0000-000000000000", 
bucketId);
-    return createTestSplit(splitNum, fileId);
+    return createTestSplit(splitNum, fileId, partitionPath);
   }
 
-  private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+  private static HoodieSourceSplit createTestSplit(int splitNum, String 
fileId, String partitionPath) {
     return new HoodieSourceSplit(
         splitNum,
         "basePath_" + splitNum,
         Option.empty(),
         "/table/path",
-        "/table/path/partition1",
+        partitionPath,
         "read_optimized",
         "19700101000000000",
         fileId,
-        Option.empty()
-    );
+        Option.empty());
   }
 }

Reply via email to