This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cf22950c16 [core] Support upper bound in dynamic bucket mode (#4974)
cf22950c16 is described below
commit cf22950c1670b2167994c6d50193b09745a0ddc3
Author: Yubin Li <[email protected]>
AuthorDate: Wed Feb 19 14:49:19 2025 +0800
[core] Support upper bound in dynamic bucket mode (#4974)
---
.../content/primary-key-table/data-distribution.md | 1 +
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++
.../apache/paimon/index/HashBucketAssigner.java | 11 +-
.../org/apache/paimon/index/PartitionIndex.java | 48 +++++---
.../paimon/index/SimpleHashBucketAssigner.java | 34 +++++-
.../paimon/table/sink/KeyAndBucketExtractor.java | 21 ++++
.../paimon/index/HashBucketAssignerTest.java | 136 ++++++++++++++++++++-
.../paimon/index/SimpleHashBucketAssignerTest.java | 79 +++++++++++-
.../flink/sink/HashBucketAssignerOperator.java | 12 +-
.../paimon/spark/commands/BucketProcessor.scala | 5 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 6 +-
12 files changed, 330 insertions(+), 41 deletions(-)
diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index 3a066031d4..baf3327ed9 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -48,6 +48,7 @@ Paimon will automatically expand the number of buckets.
- Option1: `'dynamic-bucket.target-row-num'`: controls the target row number for one bucket.
- Option2: `'dynamic-bucket.initial-buckets'`: controls the number of initialized buckets.
+- Option3: `'dynamic-bucket.max-buckets'`: controls the maximum number of buckets.
{{< hint info >}}
Dynamic Bucket only supports a single write job. Please do not start multiple jobs to write to the same partition.
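[Editor's note] For reference, a minimal sketch of how the three options above fit together as plain table properties. The values are illustrative; only the -1 default for max-buckets comes from this commit:

    import java.util.HashMap;
    import java.util.Map;

    public class DynamicBucketOptionsExample {
        public static void main(String[] args) {
            // Illustrative values; 'dynamic-bucket.max-buckets' defaults to -1 (unlimited).
            Map<String, String> options = new HashMap<>();
            options.put("dynamic-bucket.target-row-num", "2000000"); // target rows per bucket
            options.put("dynamic-bucket.initial-buckets", "4");      // buckets created up front
            options.put("dynamic-bucket.max-buckets", "8");          // hard cap added by this commit
            options.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }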
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d650670bf8..e3e1c0f673 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -290,6 +290,12 @@ under the License.
<td>Integer</td>
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
</tr>
+ <tr>
+ <td><h5>dynamic-bucket.max-buckets</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>Max buckets for a partition in dynamic bucket mode. It should either be -1 (unlimited) or greater than 0 (a fixed upper bound).</td>
+ </tr>
<tr>
<td><h5>dynamic-bucket.target-row-num</h5></td>
<td style="word-wrap: break-word;">2000000</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index ab84aed36f..380cf84744 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1083,6 +1083,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Initial buckets for a partition in assigner
operator for dynamic bucket mode.");
+ public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS =
+ key("dynamic-bucket.max-buckets")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "Max buckets for a partition in dynamic bucket
mode, It should "
+ + "either be equal to -1 (unlimited), or
it must be greater than 0 (fixed upper bound).");
+
public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
key("dynamic-bucket.assigner-parallelism")
.intType()
@@ -2226,6 +2234,10 @@ public class CoreOptions implements Serializable {
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
}
+ public Integer dynamicBucketMaxBuckets() {
+ return options.get(DYNAMIC_BUCKET_MAX_BUCKETS);
+ }
+
public Integer dynamicBucketAssignerParallelism() {
return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
}
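[Editor's note] A quick sketch of reading the new option through the accessor above. It assumes the public Options#set method and CoreOptions(Options) constructor that exist elsewhere in the codebase (not shown in this diff):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class MaxBucketsReadExample {
        public static void main(String[] args) {
            Options options = new Options();
            // Assumed API: Options#set and the CoreOptions(Options) constructor.
            options.set(CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS, 8);
            CoreOptions coreOptions = new CoreOptions(options);
            System.out.println(coreOptions.dynamicBucketMaxBuckets()); // 8; -1 means unlimited
        }
    }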
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 60bade8177..d549f5443b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -45,6 +45,8 @@ public class HashBucketAssigner implements BucketAssigner {
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
+ private final int maxBucketsNum;
+ private int maxBucketId;
private final Map<BinaryRow, PartitionIndex> partitionIndex;
@@ -55,7 +57,8 @@ public class HashBucketAssigner implements BucketAssigner {
int numChannels,
int numAssigners,
int assignId,
- long targetBucketRowNumber) {
+ long targetBucketRowNumber,
+ int maxBucketsNum) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.indexFileHandler = indexFileHandler;
@@ -64,6 +67,7 @@ public class HashBucketAssigner implements BucketAssigner {
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
+ this.maxBucketsNum = maxBucketsNum;
}
/** Assign a bucket for key hash of a record. */
@@ -84,11 +88,14 @@ public class HashBucketAssigner implements BucketAssigner {
this.partitionIndex.put(partition, index);
}
- int assigned = index.assign(hash, this::isMyBucket);
+ int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum, maxBucketId);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assign " + assigned + " to the partition " + partition +
" key hash " + hash);
}
+ if (assigned > maxBucketId) {
+ maxBucketId = assigned;
+ }
return assigned;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index bace2c1ac1..14c5a9fa74 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -20,6 +20,7 @@ package org.apache.paimon.index;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;
@@ -27,8 +28,8 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,13 +58,13 @@ public class PartitionIndex {
long targetBucketRowNumber) {
this.hash2Bucket = hash2Bucket;
this.nonFullBucketInformation = bucketInformation;
- this.totalBucket = new HashSet<>(bucketInformation.keySet());
+ this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
this.targetBucketRowNumber = targetBucketRowNumber;
this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
this.accessed = true;
}
- public int assign(int hash, IntPredicate bucketFilter) {
+ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum, int maxBucketId) {
accessed = true;
// 1. is it a key that has appeared before
@@ -80,29 +81,42 @@ public class PartitionIndex {
Long number = entry.getValue();
if (number < targetBucketRowNumber) {
entry.setValue(number + 1);
- hash2Bucket.put(hash, bucket.shortValue());
+ hash2Bucket.put(hash, (short) bucket.intValue());
return bucket;
} else {
iterator.remove();
}
}
- // 3. create a new bucket
- for (int i = 0; i < Short.MAX_VALUE; i++) {
- if (bucketFilter.test(i) && !totalBucket.contains(i)) {
- hash2Bucket.put(hash, (short) i);
- nonFullBucketInformation.put(i, 1L);
- totalBucket.add(i);
- return i;
+ if (-1 == maxBucketsNum || totalBucket.isEmpty() || maxBucketId < maxBucketsNum - 1) {
+ // 3. create a new bucket
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ if (bucketFilter.test(i) && !totalBucket.contains(i)) {
+ // The new bucketId may still be larger than the upper bound
+ if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+ nonFullBucketInformation.put(i, 1L);
+ totalBucket.add(i);
+ hash2Bucket.put(hash, (short) i);
+ return i;
+ } else {
+ // No need to enter the next iteration when upper bound exceeded
+ break;
+ }
+ }
+ }
+ if (-1 == maxBucketsNum) {
+ throw new RuntimeException(
+ String.format(
+ "Too more bucket %s, you should increase
target bucket row number %s.",
+ maxBucketId, targetBucketRowNumber));
}
}
- @SuppressWarnings("OptionalGetWithoutIsPresent")
- int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
- throw new RuntimeException(
- String.format(
- "Too more bucket %s, you should increase target bucket
row number %s.",
- maxBucket, targetBucketRowNumber));
+ // 4. exceed buckets upper bound
+ int bucket =
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, totalBucket.size());
+ hash2Bucket.put(hash, (short) bucket);
+ return bucket;
}
public static PartitionIndex loadIndex(
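[Editor's note] To make the new control flow easier to follow, here is a hedged, standalone paraphrase of PartitionIndex#assign. It is a simplification, not the actual class: the bucketFilter predicate and the cross-assigner maxBucketId tracking are omitted, buckets are numbered densely from 0, and maxBuckets is assumed to be -1 or > 0 as the option requires:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ThreadLocalRandom;

    final class AssignFlowSketch {
        private final Map<Integer, Short> hash2Bucket = new HashMap<>();
        private final Map<Integer, Long> nonFull = new LinkedHashMap<>(); // bucket -> row count
        private final Set<Integer> allBuckets = new LinkedHashSet<>();
        private final long targetRows;
        private final int maxBuckets; // -1 means unlimited, otherwise > 0

        AssignFlowSketch(long targetRows, int maxBuckets) {
            this.targetRows = targetRows;
            this.maxBuckets = maxBuckets;
        }

        int assign(int hash) {
            Short cached = hash2Bucket.get(hash);
            if (cached != null) {
                return cached; // 1. key hash seen before: sticky mapping
            }
            Iterator<Map.Entry<Integer, Long>> it = nonFull.entrySet().iterator();
            while (it.hasNext()) { // 2. top up a bucket that is not yet full
                Map.Entry<Integer, Long> e = it.next();
                if (e.getValue() < targetRows) {
                    e.setValue(e.getValue() + 1);
                    hash2Bucket.put(hash, e.getKey().shortValue());
                    return e.getKey();
                }
                it.remove(); // full buckets drop out of the candidate map
            }
            int next = allBuckets.size();
            if (maxBuckets == -1 || next < maxBuckets) { // 3. still below the cap
                nonFull.put(next, 1L);
                allBuckets.add(next);
                hash2Bucket.put(hash, (short) next);
                return next;
            }
            // 4. cap reached: spread overflow over a random existing bucket;
            //    the hash2Bucket cache keeps the choice stable per key hash.
            int skip = ThreadLocalRandom.current().nextInt(allBuckets.size());
            int bucket = allBuckets.stream().skip(skip).findFirst().orElse(0);
            hash2Bucket.put(hash, (short) bucket);
            return bucket;
        }
    }

The key point of step 4 is that overflow keys land on a randomly chosen existing bucket, but the cached hash-to-bucket mapping keeps each key hash in the bucket it was first assigned to.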
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
index 5f7599370d..4f49841f79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -20,9 +20,11 @@ package org.apache.paimon.index;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@@ -32,14 +34,18 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
private final int numAssigners;
private final int assignId;
private final long targetBucketRowNumber;
+ private final int maxBucketsNum;
+ private int maxBucketId;
private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;
- public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) {
+ public SimpleHashBucketAssigner(
+ int numAssigners, int assignId, long targetBucketRowNumber, int maxBucketsNum) {
this.numAssigners = numAssigners;
this.assignId = assignId;
this.targetBucketRowNumber = targetBucketRowNumber;
this.partitionIndex = new HashMap<>();
+ this.maxBucketsNum = maxBucketsNum;
}
@Override
@@ -50,7 +56,11 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
index = new SimplePartitionIndex();
this.partitionIndex.put(partition, index);
}
- return index.assign(hash);
+ int assigned = index.assign(hash);
+ if (assigned > maxBucketId) {
+ maxBucketId = assigned;
+ }
+ return assigned;
}
@Override
@@ -71,7 +81,7 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
private int currentBucket;
private SimplePartitionIndex() {
- bucketInformation = new HashMap<>();
+ bucketInformation = new LinkedHashMap<>();
loadNewBucket();
}
@@ -82,8 +92,17 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
}
Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
+
if (num >= targetBucketRowNumber) {
- loadNewBucket();
+ if (-1 == maxBucketsNum
+ || bucketInformation.isEmpty()
+ || maxBucketId < maxBucketsNum - 1) {
+ loadNewBucket();
+ } else {
+ currentBucket =
+ KeyAndBucketExtractor.bucketWithUpperBound(
+ bucketInformation.keySet(), hash, bucketInformation.size());
+ }
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1);
hash2Bucket.put(hash, (short) currentBucket);
@@ -93,7 +112,12 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
private void loadNewBucket() {
for (int i = 0; i < Short.MAX_VALUE; i++) {
if (isMyBucket(i) && !bucketInformation.containsKey(i)) {
- currentBucket = i;
+ // The new bucketId may still be larger than the upper bound
+ if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+ currentBucket = i;
+ return;
+ }
+ // No need to enter the next iteration when upper bound exceeded
return;
}
}
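[Editor's note] A small usage sketch of the capped simple assigner, mirroring what the tests below exercise. Here a single assigner owns every bucket, so the first two buckets are 0 and 1 rather than the 0 and 2 seen in the two-assigner tests:

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.index.SimpleHashBucketAssigner;

    public class SimpleAssignerCapExample {
        public static void main(String[] args) {
            // One assigner (owns all buckets), 100 rows per bucket, at most 2 buckets.
            SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 100, 2);
            for (int hash = 0; hash < 300; hash++) {
                int bucket = assigner.assign(BinaryRow.EMPTY_ROW, hash);
                // First 100 hashes land in bucket 0, the next 100 in bucket 1;
                // the final 100 overflow onto a random one of the two.
                assert bucket == 0 || bucket == 1;
            }
        }
    }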
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index 0b0b1a154b..dcbd02508d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -21,6 +21,13 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -31,6 +38,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
* @param <T> type of record
*/
public interface KeyAndBucketExtractor<T> {
+ Logger LOG = LoggerFactory.getLogger(KeyAndBucketExtractor.class);
void setRecord(T record);
@@ -51,4 +59,17 @@ public interface KeyAndBucketExtractor<T> {
checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
return Math.abs(hashcode % numBuckets);
}
+
+ static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int maxBucketsNum) {
+ checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " + maxBucketsNum);
+ LOG.debug(
+ "Assign record (hashcode '{}') to new bucket exceed upper
bound '{}' defined in '{}', Stop creating new buckets.",
+ hashcode,
+ maxBucketsNum,
+ DYNAMIC_BUCKET_MAX_BUCKETS.key());
+ return bucketsSet.stream()
+ .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
+ .findFirst()
+ .orElse(0);
+ }
}
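[Editor's note] The new static helper can also be exercised in isolation; a minimal sketch:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    import org.apache.paimon.table.sink.KeyAndBucketExtractor;

    public class BucketWithUpperBoundExample {
        public static void main(String[] args) {
            // Buckets 0 and 2 already exist; the cap has been reached.
            Set<Integer> existing = new LinkedHashSet<>(Arrays.asList(0, 2));
            int bucket = KeyAndBucketExtractor.bucketWithUpperBound(existing, 42, existing.size());
            // A random member of the set; callers cache hash -> bucket, so the
            // same key hash keeps landing in the same bucket afterwards.
            System.out.println(bucket); // 0 or 2
        }
    }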
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 1b4a7b1be5..b9c6a28378 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -27,9 +27,12 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -63,7 +66,21 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
numChannels,
numAssigners,
assignId,
- 5);
+ 5,
+ -1);
+ }
+
+ private HashBucketAssigner createAssigner(
+ int numChannels, int numAssigners, int assignId, int maxBucketsNum) {
+ return new HashBucketAssigner(
+ table.snapshotManager(),
+ commitUser,
+ fileHandler,
+ numChannels,
+ numAssigners,
+ assignId,
+ 5,
+ maxBucketsNum);
}
@Test
@@ -92,8 +109,93 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
}
@Test
- public void testPartitionCopy() {
- HashBucketAssigner assigner = createAssigner(1, 1, 0);
+ public void testAssignWithUpperBound() {
+ HashBucketAssigner assigner = createAssigner(2, 2, 0, 2);
+
+ // assign
+ assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 2)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 4)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 8)).isEqualTo(0);
+
+ // full
+ assertThat(assigner.assign(row(1), 10)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 12)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 14)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 16)).isEqualTo(0);
+ assertThat(assigner.assign(row(1), 18)).isEqualTo(0);
+
+ // another partition
+ assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
+
+ // read assigned
+ assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+
+ // not mine
+ assertThatThrownBy(() -> assigner.assign(row(1), 1))
+ .hasMessageContaining("This is a bug, record assign id");
+
+ // exceed buckets upper bound
+ // partition 1
+ int hash = 18;
+ for (int i = 0; i < 200; i++) {
+ int bucket = assigner.assign(row(1), hash += 2);
+ Assertions.assertThat(bucket).isIn(0, 2);
+ }
+ // partition 2
+ hash = 12;
+ for (int i = 0; i < 200; i++) {
+ int bucket = assigner.assign(row(2), hash += 2);
+ Assertions.assertThat(bucket).isIn(0, 2);
+ }
+ }
+
+ @Test
+ public void testAssignWithUpperBoundMultiAssigners() {
+ HashBucketAssigner assigner0 = createAssigner(2, 2, 0, 3);
+ HashBucketAssigner assigner1 = createAssigner(2, 2, 1, 3);
+
+ // assigner0: assign
+ assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 4)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+
+ // assigner0: full
+ assertThat(assigner0.assign(row(1), 10)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 12)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 14)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 16)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 18)).isEqualTo(2);
+
+ // assigner0: exceed buckets upper bound
+ int hash = 18;
+ for (int i = 0; i < 200; i++) {
+ int bucket = assigner0.assign(row(2), hash += 2);
+ Assertions.assertThat(bucket).isIn(0, 2);
+ }
+
+ // assigner1: assign
+ assertThat(assigner1.assign(row(1), 1)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 3)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 5)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 7)).isEqualTo(1);
+ assertThat(assigner1.assign(row(1), 9)).isEqualTo(1);
+
+ // assigner1: exceed buckets upper bound
+ hash = 9;
+ for (int i = 0; i < 200; i++) {
+ int bucket = assigner1.assign(row(2), hash += 2);
+ Assertions.assertThat(bucket).isIn(1);
+ }
+ }
+
+ @ParameterizedTest(name = "maxBuckets: {0}")
+ @ValueSource(ints = {-1, 1, 2})
+ public void testPartitionCopy(int maxBucketsNum) {
+ HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum);
BinaryRow partition = row(1);
assertThat(assigner.assign(partition, 0)).isEqualTo(0);
@@ -144,6 +246,34 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
assertThat(assigner0.assign(row(1), 17)).isEqualTo(3);
}
+ @Test
+ public void testAssignRestoreWithUpperBound() {
+ IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
+ IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+ commit.commit(
+ 0,
+ Arrays.asList(
+ createCommitMessage(row(1), 0, bucket0),
+ createCommitMessage(row(1), 2, bucket2)));
+
+ HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
+ HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
+
+ // read assigned
+ assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 4)).isEqualTo(2);
+ assertThat(assigner0.assign(row(1), 5)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 7)).isEqualTo(2);
+
+ // new assign
+ assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
+ assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
+ assertThat(assigner2.assign(row(1), 16)).isEqualTo(2);
+ // exceed buckets upper bound
+ assertThat(assigner0.assign(row(1), 17)).isEqualTo(0);
+ }
+
@Test
public void testAssignDecoupled() {
HashBucketAssigner assigner1 = createAssigner(3, 2, 1);
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
index d1b26019fc..2e2e53b7ef 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.BinaryRow;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +33,8 @@ public class SimpleHashBucketAssignerTest {
@Test
public void testAssign() {
- SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);
+ SimpleHashBucketAssigner simpleHashBucketAssigner =
+ new SimpleHashBucketAssigner(2, 0, 100, -1);
BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;
@@ -51,8 +54,71 @@ public class SimpleHashBucketAssignerTest {
}
@Test
- public void testAssignWithSameHash() {
- SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100);
+ public void testAssignWithUpperBound() {
+ SimpleHashBucketAssigner simpleHashBucketAssigner =
+ new SimpleHashBucketAssigner(2, 0, 100, 3);
+
+ BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+ int hash = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(0);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(2);
+ }
+
+ // exceed upper bound
+ for (int i = 0; i < 200; i++) {
+ int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isIn(0, 2);
+ }
+ }
+
+ @Test
+ public void testAssignWithUpperBoundMultiAssigners() {
+ SimpleHashBucketAssigner simpleHashBucketAssigner0 =
+ new SimpleHashBucketAssigner(2, 0, 100, 3);
+ SimpleHashBucketAssigner simpleHashBucketAssigner1 =
+ new SimpleHashBucketAssigner(2, 1, 100, 3);
+
+ BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+ int hash = 0;
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(0);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(1);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isEqualTo(2);
+ }
+
+ // exceed upper bound
+ for (int i = 0; i < 200; i++) {
+ int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isIn(0, 2);
+ }
+ for (int i = 0; i < 200; i++) {
+ int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
+ Assertions.assertThat(bucket).isIn(1);
+ }
+ }
+
+ @ParameterizedTest(name = "maxBuckets: {0}")
+ @ValueSource(ints = {-1, 1, 2})
+ public void testAssignWithSameHash(int maxBucketsNum) {
+ SimpleHashBucketAssigner simpleHashBucketAssigner =
+ new SimpleHashBucketAssigner(2, 0, 100, maxBucketsNum);
BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;
@@ -70,9 +136,10 @@ public class SimpleHashBucketAssignerTest {
}
}
- @Test
- public void testPartitionCopy() {
- SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5);
+ @ParameterizedTest(name = "maxBuckets: {0}")
+ @ValueSource(ints = {-1, 1, 2})
+ public void testPartitionCopy(int maxBucketsNum) {
+ SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5, maxBucketsNum);
BinaryRow partition = row(1);
assertThat(assigner.assign(partition, 0)).isEqualTo(0);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 0c101c6d1e..5839fc98c2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -49,6 +49,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
private final Integer numAssigners;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
private final boolean overwrite;
+ private int[] maxBucketsArr;
private transient BucketAssigner assigner;
private transient PartitionKeyExtractor<T> extractor;
@@ -70,8 +71,8 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- // Each job can only have one user name and this name must be consistent across restarts.
- // We cannot use job id as commit user name here because user may change job id by creating
+ // Each job can only have one username and this name must be consistent across restarts.
+ // We cannot use job id as commit username here because user may change job id by creating
// a savepoint, stop the job and then resume from savepoint.
String commitUser =
StateUtils.getSingleValueFromState(
@@ -80,9 +81,11 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
+ Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
this.assigner =
overwrite
- ? new SimpleHashBucketAssigner(numberTasks, taskId, targetRowNum)
+ ? new SimpleHashBucketAssigner(
+ numberTasks, taskId, targetRowNum, maxBucketsNum)
: new HashBucketAssigner(
table.snapshotManager(),
commitUser,
@@ -90,7 +93,8 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
numberTasks,
MathUtils.min(numAssigners, numberTasks),
taskId,
- targetRowNum);
+ targetRowNum,
+ maxBucketsNum);
this.extractor = extractorFunction.apply(table.schema());
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
index 57a8a8e4ab..19494fc88d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow}
import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => PaimonInternalRow, JoinedRow}
import org.apache.paimon.disk.IOManager
-import org.apache.paimon.index.HashBucketAssigner
+import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex}
import org.apache.paimon.spark.{DataConverter, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.util.EncoderUtils
@@ -111,7 +111,8 @@ case class DynamicBucketProcessor(
numSparkPartitions,
numAssigners,
TaskContext.getPartitionId(),
- targetBucketRowNumber
+ targetBucketRowNumber,
+ fileStoreTable.coreOptions.dynamicBucketMaxBuckets
)
new Iterator[Row]() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 7d56fe867a..061337b56f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
import org.apache.paimon.data.serializer.InternalSerializers
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer
-import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
+import org.apache.paimon.index.{BucketAssigner, PartitionIndex, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
@@ -196,7 +196,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
new SimpleHashBucketAssigner(
numAssigners,
TaskContext.getPartitionId(),
- table.coreOptions.dynamicBucketTargetRowNum)
+ table.coreOptions.dynamicBucketTargetRowNum,
+ table.coreOptions.dynamicBucketMaxBuckets
+ )
row => {
val sparkRow = new SparkRow(rowType, row)
assigner.assign(