This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3e662f9828d3 feat (flink): improvement bucket assignment for MOR with bucket index (#18444)
3e662f9828d3 is described below
commit 3e662f9828d34d516adbc42c1d78326d93e7699e
Author: Peter Huang <[email protected]>
AuthorDate: Wed Apr 1 19:02:36 2026 -0700
feat (flink): improvement bucket assignment for MOR with bucket index (#18444)
---
.../source/split/assign/HoodieSplitAssigners.java | 2 +-
.../split/assign/HoodieSplitBucketAssigner.java | 23 +-
.../split/TestDefaultHoodieSplitProvider.java | 3 +-
.../assign/TestHoodieSplitBucketAssigner.java | 401 +++++++++++----------
4 files changed, 223 insertions(+), 206 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
index e50751f79ab4..a6954c977f0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitAssigners.java
@@ -31,7 +31,7 @@ public class HoodieSplitAssigners {
int parallelism) {
if (OptionsResolver.isMorWithBucketIndexUpsert(config)) {
- return new HoodieSplitBucketAssigner(parallelism);
+ return new HoodieSplitBucketAssigner(parallelism, config);
} else if (OptionsResolver.isAppendMode(config)) {
return new HoodieSplitNumberAssigner(parallelism);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
index c13d28b03e75..a5d675b35102 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/assign/HoodieSplitBucketAssigner.java
@@ -18,7 +18,12 @@
package org.apache.hudi.source.split.assign;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.hash.BucketIndexUtil;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
import org.apache.hudi.source.split.HoodieSourceSplit;
/**
@@ -26,24 +31,30 @@ import org.apache.hudi.source.split.HoodieSourceSplit;
* source splits to task IDs using bucket index information for co-location.
*/
public class HoodieSplitBucketAssigner implements HoodieSplitAssigner {
- private final int parallelism;
+ private final NumBucketsFunction numBucketsFunction;
+ private Functions.Function3<Integer, String, Integer, Integer>
partitionIndexFunc;
/**
- * Creates a new HoodieSplitBucketAssigner.
+ * Creates a new HoodieSplitBucketAssigner using bucket index configuration.
*
* @param parallelism the number of parallel tasks (must be positive)
+ * @param conf Flink configuration containing bucket index settings
* @throws IllegalArgumentException if parallelism is less than or equal to 0
*/
- public HoodieSplitBucketAssigner(int parallelism) {
+ public HoodieSplitBucketAssigner(int parallelism, Configuration conf) {
if (parallelism <= 0) {
throw new IllegalArgumentException("Parallelism must be positive, but
was: " + parallelism);
}
- this.parallelism = parallelism;
+ this.numBucketsFunction = new
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+ conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+ this.partitionIndexFunc =
BucketIndexUtil.getPartitionIndexFunc(parallelism);
}
@Override
public int assign(HoodieSourceSplit split) {
- int bucketId = BucketIdentifier.bucketIdFromFileId(split.getFileId());
- return bucketId % parallelism;
+
+ int curBucket = BucketIdentifier.bucketIdFromFileId(split.getFileId());
+ int numBuckets =
numBucketsFunction.getNumBuckets(split.getPartitionPath());
+ return this.partitionIndexFunc.apply(numBuckets, split.getPartitionPath(),
curBucket);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
index 5571e471819c..6ea01d26e5d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitProvider.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.split;
+import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.split.assign.HoodieSplitBucketAssigner;
@@ -564,7 +565,7 @@ public class TestDefaultHoodieSplitProvider {
public void testGetNextWithBucketAssigner() {
// Test with HoodieSplitBucketAssigner
HoodieSplitBucketAssigner assigner =
- new HoodieSplitBucketAssigner(4);
+ new HoodieSplitBucketAssigner(4, new Configuration());
DefaultHoodieSplitProvider provider = new
DefaultHoodieSplitProvider(assigner);
// Create splits with bucket-encoded file IDs
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
index 2d3da5b143a3..482d0aada983 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/assign/TestHoodieSplitBucketAssigner.java
@@ -18,277 +18,282 @@
package org.apache.hudi.source.split.assign;
+import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.split.HoodieSourceSplit;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link HoodieSplitBucketAssigner}.
+ *
+ * <p>The assignment formula is:
+ * <pre>
+ * partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism
* numBuckets
+ * taskId = (partitionIndex + curBucket) % parallelism
+ * </pre>
+ * When the partition path is empty (hashCode = 0), this simplifies to {@code
curBucket % parallelism}.
*/
public class TestHoodieSplitBucketAssigner {
- @Test
- public void testAssignWithSingleParallelism() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(1);
-
- HoodieSourceSplit split1 = createTestSplitWithBucket(0, 0);
- HoodieSourceSplit split2 = createTestSplitWithBucket(1, 5);
- HoodieSourceSplit split3 = createTestSplitWithBucket(2, 10);
-
- assertEquals(0, assigner.assign(split1), "All splits should be assigned to
task 0");
- assertEquals(0, assigner.assign(split2), "All splits should be assigned to
task 0");
- assertEquals(0, assigner.assign(split3), "All splits should be assigned to
task 0");
- }
+ // -------------------------------------------------------------------------
+ // Constructor validation
+ // -------------------------------------------------------------------------
@Test
- public void testAssignWithMultipleParallelism() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(3);
-
- HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
- HoodieSourceSplit split1 = createTestSplitWithBucket(1, 1);
- HoodieSourceSplit split2 = createTestSplitWithBucket(2, 2);
- HoodieSourceSplit split3 = createTestSplitWithBucket(3, 3);
- HoodieSourceSplit split4 = createTestSplitWithBucket(4, 4);
- HoodieSourceSplit split5 = createTestSplitWithBucket(5, 5);
-
- assertEquals(0, assigner.assign(split0), "Bucket 0 should be assigned to
task 0");
- assertEquals(1, assigner.assign(split1), "Bucket 1 should be assigned to
task 1");
- assertEquals(2, assigner.assign(split2), "Bucket 2 should be assigned to
task 2");
- assertEquals(0, assigner.assign(split3), "Bucket 3 should be assigned to
task 0 (round-robin)");
- assertEquals(1, assigner.assign(split4), "Bucket 4 should be assigned to
task 1 (round-robin)");
- assertEquals(2, assigner.assign(split5), "Bucket 5 should be assigned to
task 2 (round-robin)");
+ public void testConstructorWithZeroParallelismThrows() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> new HoodieSplitBucketAssigner(0, new Configuration()));
+ assertEquals("Parallelism must be positive, but was: 0", ex.getMessage());
}
@Test
- public void testAssignBucketBasedCoLocation() {
- int parallelism = 4;
- HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism);
-
- // Splits with the same bucket ID should always be assigned to the same
task
- HoodieSourceSplit split1a = createTestSplitWithBucket(0, 5);
- HoodieSourceSplit split1b = createTestSplitWithBucket(1, 5);
- HoodieSourceSplit split1c = createTestSplitWithBucket(2, 5);
-
- int taskId = assigner.assign(split1a);
- assertEquals(taskId, assigner.assign(split1b),
- "Splits with same bucket should be co-located on same task");
- assertEquals(taskId, assigner.assign(split1c),
- "Splits with same bucket should be co-located on same task");
- assertEquals(1, taskId, "Bucket 5 % 4 = 1");
+ public void testConstructorWithNegativeParallelismThrows() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> new HoodieSplitBucketAssigner(-1, new Configuration()));
+ assertEquals("Parallelism must be positive, but was: -1", ex.getMessage());
}
@Test
- public void testAssignDifferentBucketsDistributed() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
- // Different buckets should be distributed across different tasks
- HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
- HoodieSourceSplit split1 = createTestSplitWithBucket(1, 1);
- HoodieSourceSplit split2 = createTestSplitWithBucket(2, 2);
-
- assertNotEquals(assigner.assign(split0), assigner.assign(split1),
- "Different buckets should typically be on different tasks");
- assertNotEquals(assigner.assign(split1), assigner.assign(split2),
- "Different buckets should typically be on different tasks");
+ public void testConstructorWithLargeNegativeParallelismThrows() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> new HoodieSplitBucketAssigner(-100, new Configuration()));
+ assertEquals("Parallelism must be positive, but was: -100",
ex.getMessage());
}
- @Test
- public void testAssignWithHighParallelism() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(100);
-
- HoodieSourceSplit split0 = createTestSplitWithBucket(0, 0);
- HoodieSourceSplit split50 = createTestSplitWithBucket(1, 50);
- HoodieSourceSplit split99 = createTestSplitWithBucket(2, 99);
- HoodieSourceSplit split100 = createTestSplitWithBucket(3, 100);
-
- assertEquals(0, assigner.assign(split0));
- assertEquals(50, assigner.assign(split50));
- assertEquals(99, assigner.assign(split99));
- assertEquals(0, assigner.assign(split100), "Bucket 100 should wrap around
to task 0");
- }
+ // -------------------------------------------------------------------------
+ // Single parallelism — everything maps to task 0
+ // -------------------------------------------------------------------------
@Test
- public void testAssignSameBucketMultipleTimes() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
- HoodieSourceSplit split1 = createTestSplitWithBucket(0, 7);
- HoodieSourceSplit split2 = createTestSplitWithBucket(1, 7);
-
- // Assigning splits with the same bucket multiple times should return the
same task ID
- int taskId = assigner.assign(split1);
- assertEquals(2, taskId, "Bucket 7 % 5 = 2");
- assertEquals(taskId, assigner.assign(split2),
- "Same bucket should always return same task ID");
- assertEquals(taskId, assigner.assign(split1),
- "Same bucket should always return same task ID");
- }
+ public void testSingleParallelismAlwaysAssignsToTaskZero() {
+ HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(1, new
Configuration());
- @Test
- public void testConstructorWithZeroParallelism() {
- IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> new HoodieSplitBucketAssigner(0),
- "Should throw exception for zero parallelism"
- );
- assertEquals("Parallelism must be positive, but was: 0",
exception.getMessage());
+ for (int bucket = 0; bucket < 10; bucket++) {
+ assertEquals(0, assigner.assign(splitWithBucket(bucket, "")),
+ "All splits must go to task 0 when parallelism is 1");
+ }
}
- @Test
- public void testConstructorWithNegativeParallelism() {
- IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> new HoodieSplitBucketAssigner(-1),
- "Should throw exception for negative parallelism"
- );
- assertEquals("Parallelism must be positive, but was: -1",
exception.getMessage());
- }
+ // -------------------------------------------------------------------------
+ // Empty partition path: taskId = curBucket % parallelism
+ // When partition hashCode is 0, partitionIndex = 0, so the formula
+ // reduces to a simple modulo on the bucket ID.
+ // -------------------------------------------------------------------------
@Test
- public void testConstructorWithNegativeLargeParallelism() {
- IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> new HoodieSplitBucketAssigner(-100),
- "Should throw exception for large negative parallelism"
- );
- assertEquals("Parallelism must be positive, but was: -100",
exception.getMessage());
+ public void testAssignWithEmptyPartitionUsesSimpleModulo() {
+ int parallelism = 5;
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+ for (int bucket = 0; bucket < 20; bucket++) {
+ int expected = bucket % parallelism;
+ assertEquals(expected, assigner.assign(splitWithBucket(bucket, "")),
+ "With empty partition, taskId must equal curBucket % parallelism");
+ }
}
@Test
- public void testAssignmentConsistency() {
- HoodieSplitBucketAssigner assigner1 = new HoodieSplitBucketAssigner(5);
- HoodieSplitBucketAssigner assigner2 = new HoodieSplitBucketAssigner(5);
-
- // Two assigners with same parallelism should assign splits identically
- for (int i = 0; i < 20; i++) {
- HoodieSourceSplit split = createTestSplitWithBucket(i, i);
- assertEquals(
- assigner1.assign(split),
- assigner2.assign(split),
- "Different assigner instances should assign same bucket to same task"
- );
+ public void testTaskIdIsAlwaysInValidRange() {
+ int parallelism = 7;
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+ for (int bucket = 0; bucket < 50; bucket++) {
+ int taskId = assigner.assign(splitWithBucket(bucket, ""));
+ assertTrue(taskId >= 0 && taskId < parallelism,
+ "taskId must be in [0, parallelism): got " + taskId);
}
}
+ // -------------------------------------------------------------------------
+ // Co-location: same (partition, bucket) always lands on the same task
+ // -------------------------------------------------------------------------
+
@Test
- public void testBucketCoLocationAcrossMultipleSplits() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(8);
-
- // Create multiple splits for the same bucket
- int bucketId = 15;
- HoodieSourceSplit[] splits = new HoodieSourceSplit[10];
- for (int i = 0; i < 10; i++) {
- splits[i] = createTestSplitWithBucket(i, bucketId);
+ public void testSameBucketSamePartitionAlwaysCoLocated() {
+ int parallelism = 4;
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+ int[] bucketsToTest = {0, 3, 7, 15};
+ String[] partitions = {"", "2024/01/01", "country=US"};
+
+ for (String partition : partitions) {
+ for (int bucket : bucketsToTest) {
+ int first = assigner.assign(splitWithBucket(bucket, partition));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(first, assigner.assign(splitWithBucket(bucket,
partition)),
+ "Same bucket+partition must always map to the same task");
+ }
+ }
}
+ }
- // All splits with the same bucket should be assigned to the same task
- int expectedTaskId = bucketId % 8;
- for (HoodieSourceSplit split : splits) {
- assertEquals(expectedTaskId, assigner.assign(split),
- "All splits for bucket " + bucketId + " should be on task " +
expectedTaskId);
- }
+ @Test
+ public void testCoLocationHoldsAcrossDifferentSplitNumbers() {
+ HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(6, new
Configuration());
+ int bucket = 11;
+ String partition = "region=EU";
+
+ // Different split numbers, same bucket and partition
+ int expected = assigner.assign(split(0, bucket, partition));
+ assertEquals(expected, assigner.assign(split(100, bucket, partition)));
+ assertEquals(expected, assigner.assign(split(999, bucket, partition)));
}
+ // -------------------------------------------------------------------------
+ // Custom numBuckets via config
+ // -------------------------------------------------------------------------
+
@Test
- public void testManyBucketsDistribution() {
- int parallelism = 4;
- HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism);
+ public void testCustomNumBucketsIsUsed() {
+ // With a non-trivial partition hash, changing numBuckets changes the
assignment.
+ // Partition "p1" has hashCode 3521.
+ // parallelism=3, numBuckets=4:
+ // partitionIndex = (3521 % 3) * 4 = 2 * 4 = 8
+ // taskId(bucket=0) = (8 + 0) % 3 = 2
+ // taskId(bucket=1) = (8 + 1) % 3 = 0
+ // taskId(bucket=2) = (8 + 2) % 3 = 1
+ int parallelism = 3;
+ int numBuckets = 4;
+ String partition = "p1"; // hashCode = 112*31 + 49 = 3521
+
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, numBuckets);
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, conf);
+
+ assertEquals(expectedTaskId(partition, 0, numBuckets, parallelism),
assigner.assign(split(0, 0, partition)));
+ assertEquals(expectedTaskId(partition, 1, numBuckets, parallelism),
assigner.assign(split(1, 1, partition)));
+ assertEquals(expectedTaskId(partition, 2, numBuckets, parallelism),
assigner.assign(split(2, 2, partition)));
+ }
- // Test distribution of many buckets
- int[] taskCounts = new int[parallelism];
- int totalBuckets = 100;
+ @Test
+ public void testDifferentNumBucketsProduceDifferentAssignments() {
+ // Use a partition with non-zero hash so numBuckets affects the result.
+ String partition = "country=DE"; // non-zero hashCode guaranteed
+ int parallelism = 5;
+ int bucket = 3;
- for (int bucketId = 0; bucketId < totalBuckets; bucketId++) {
- HoodieSourceSplit split = createTestSplitWithBucket(bucketId, bucketId);
- int taskId = assigner.assign(split);
- taskCounts[taskId]++;
+ Configuration conf4 = new Configuration();
+ conf4.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ Configuration conf8 = new Configuration();
+ conf8.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);
- // Verify task ID is within valid range
- assertEquals(bucketId % parallelism, taskId,
- "Bucket should be assigned using modulo operation");
- }
+ int taskWith4 = new HoodieSplitBucketAssigner(parallelism,
conf4).assign(split(0, bucket, partition));
+ int taskWith8 = new HoodieSplitBucketAssigner(parallelism,
conf8).assign(split(0, bucket, partition));
- // Verify even distribution
- for (int count : taskCounts) {
- assertEquals(totalBuckets / parallelism, count,
- "Buckets should be evenly distributed");
- }
+ // Both must be valid task IDs, and at least verify they're computed from
the right formula
+ assertEquals(expectedTaskId(partition, bucket, 4, parallelism), taskWith4);
+ assertEquals(expectedTaskId(partition, bucket, 8, parallelism), taskWith8);
}
+ // -------------------------------------------------------------------------
+ // Full formula verification with non-trivial partition hash
+ // -------------------------------------------------------------------------
+
@Test
- public void testFileIdParsing() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(10);
+ public void testFullFormulaWithNonTrivialPartitionHash() {
+ int parallelism = 4;
+ int numBuckets = 4;
+ String partition = "dt=2024-01-15";
- // Test with various file ID formats that have bucket ID in first 8
characters
- HoodieSourceSplit split1 = createTestSplit(0,
"00000005-0000-0000-0000-000000000000");
- HoodieSourceSplit split2 = createTestSplit(1,
"00000012-1234-5678-abcd-ef0123456789");
- HoodieSourceSplit split3 = createTestSplit(2,
"00000099-aaaa-bbbb-cccc-dddddddddddd");
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, numBuckets);
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, conf);
- assertEquals(5, assigner.assign(split1), "Should extract bucket 5 from
file ID");
- assertEquals(2, assigner.assign(split2), "Should extract bucket 12 and
assign to task 2 (12 % 10)");
- assertEquals(9, assigner.assign(split3), "Should extract bucket 99 and
assign to task 9 (99 % 10)");
+ for (int bucket = 0; bucket < numBuckets; bucket++) {
+ int expected = expectedTaskId(partition, bucket, numBuckets,
parallelism);
+ assertEquals(expected, assigner.assign(split(bucket, bucket, partition)),
+ "Task ID must match the formula for bucket " + bucket);
+ }
}
+ // -------------------------------------------------------------------------
+ // Bucket ID parsing from file ID
+ // -------------------------------------------------------------------------
+
@Test
- public void testDifferentSplitNumbersSameBucket() {
- HoodieSplitBucketAssigner assigner = new HoodieSplitBucketAssigner(5);
-
- // Different split numbers but same bucket should go to same task
- HoodieSourceSplit split1 = createTestSplitWithBucket(0, 8);
- HoodieSourceSplit split2 = createTestSplitWithBucket(100, 8);
- HoodieSourceSplit split3 = createTestSplitWithBucket(999, 8);
-
- int taskId = assigner.assign(split1);
- assertEquals(taskId, assigner.assign(split2),
- "Split number should not affect bucket-based assignment");
- assertEquals(taskId, assigner.assign(split3),
- "Split number should not affect bucket-based assignment");
- assertEquals(3, taskId, "Bucket 8 % 5 = 3");
+ public void testBucketIdParsedFromFirst8CharsOfFileId() {
+ // bucketIdFromFileId parses the first 8 characters as an integer
+ int parallelism = 10;
+ HoodieSplitBucketAssigner assigner = new
HoodieSplitBucketAssigner(parallelism, new Configuration());
+
+ // File ID "00000005-..." → bucket 5 → task 5 % 10 = 5 (empty partition)
+ HoodieSourceSplit split5 = createTestSplit(0,
"00000005-0000-0000-0000-000000000000", "");
+ assertEquals(5, assigner.assign(split5));
+
+ // File ID "00000012-..." → bucket 12 → task 12 % 10 = 2
+ HoodieSourceSplit split12 = createTestSplit(1,
"00000012-1234-5678-abcd-ef0123456789", "");
+ assertEquals(2, assigner.assign(split12));
+
+ // File ID "00000099-..." → bucket 99 → task 99 % 10 = 9
+ HoodieSourceSplit split99 = createTestSplit(2,
"00000099-aaaa-bbbb-cccc-dddddddddddd", "");
+ assertEquals(9, assigner.assign(split99));
}
- @Test
- public void testComparisonWithDefaultAssigner() {
- int parallelism = 5;
- HoodieSplitBucketAssigner bucketAssigner = new
HoodieSplitBucketAssigner(parallelism);
- HoodieSplitNumberAssigner defaultAssigner = new
HoodieSplitNumberAssigner(parallelism);
+ // -------------------------------------------------------------------------
+ // Consistency between assigner instances
+ // -------------------------------------------------------------------------
- // Create splits where bucket ID != split number
- HoodieSourceSplit split1 = createTestSplitWithBucket(0, 3);
- HoodieSourceSplit split2 = createTestSplitWithBucket(3, 0);
+ @Test
+ public void testTwoAssignersWithSameConfigProduceSameResults() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 8);
- // Bucket assigner uses bucket ID from file ID
- assertEquals(3, bucketAssigner.assign(split1), "Should use bucket 3");
- assertEquals(0, bucketAssigner.assign(split2), "Should use bucket 0");
+ HoodieSplitBucketAssigner a1 = new HoodieSplitBucketAssigner(5, conf);
+ HoodieSplitBucketAssigner a2 = new HoodieSplitBucketAssigner(5, conf);
- // Default assigner uses split number
- assertEquals(0, defaultAssigner.assign(split1), "Should use split number
0");
- assertEquals(3, defaultAssigner.assign(split2), "Should use split number
3");
+ for (int bucket = 0; bucket < 30; bucket++) {
+ HoodieSourceSplit split = splitWithBucket(bucket, "region=APAC");
+ assertEquals(a1.assign(split), a2.assign(split),
+ "Two assigners with same config must produce identical results");
+ }
}
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
/**
- * Creates a test split with a specific bucket ID encoded in the file ID.
- * File ID format: "{8-digit bucket ID}-{rest of UUID}"
+ * Computes the expected task ID using the same formula as {@link
HoodieSplitBucketAssigner}:
+ * <pre>
+ * partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) %
parallelism * numBuckets
+ * taskId = (partitionIndex + curBucket) % parallelism
+ * </pre>
*/
- private HoodieSourceSplit createTestSplitWithBucket(int splitNum, int
bucketId) {
+ private static int expectedTaskId(String partition, int curBucket, int
numBuckets, int parallelism) {
+ long partitionIndex = (long) ((partition.hashCode() & Integer.MAX_VALUE) %
parallelism) * numBuckets;
+ return (int) ((partitionIndex + curBucket) % parallelism);
+ }
+
+ /** Creates a split whose file ID encodes {@code bucketId} in the first 8
characters. */
+ private static HoodieSourceSplit splitWithBucket(int bucketId, String
partitionPath) {
+ return split(bucketId, bucketId, partitionPath);
+ }
+
+ /** Creates a split with an explicit split number, bucket ID, and partition
path. */
+ private static HoodieSourceSplit split(int splitNum, int bucketId, String
partitionPath) {
String fileId = String.format("%08d-0000-0000-0000-000000000000",
bucketId);
- return createTestSplit(splitNum, fileId);
+ return createTestSplit(splitNum, fileId, partitionPath);
}
- private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+ private static HoodieSourceSplit createTestSplit(int splitNum, String
fileId, String partitionPath) {
return new HoodieSourceSplit(
splitNum,
"basePath_" + splitNum,
Option.empty(),
"/table/path",
- "/table/path/partition1",
+ partitionPath,
"read_optimized",
"19700101000000000",
fileId,
- Option.empty()
- );
+ Option.empty());
}
}