This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cb25653f10 [core] Adjust 'dynamic-bucket.max-buckets' random pick logical
cb25653f10 is described below

commit cb25653f10c546eaa71c9fbafa6c0230832ae82c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Feb 19 15:09:08 2025 +0800

    [core] Adjust 'dynamic-bucket.max-buckets' random pick logical
---
 .../java/org/apache/paimon/utils/ListUtils.java    | 34 ++++++++++++++++++++++
 .../apache/paimon/index/HashBucketAssigner.java    |  3 +-
 .../org/apache/paimon/index/PartitionIndex.java    | 19 +++++++-----
 .../paimon/index/SimpleHashBucketAssigner.java     | 18 ++++++++----
 .../paimon/table/sink/KeyAndBucketExtractor.java   | 17 -----------
 .../flink/sink/HashBucketAssignerOperator.java     |  1 -
 6 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
new file mode 100644
index 0000000000..6919ac15f4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Utils for {@link List}. */
+public class ListUtils {
+    /** Returns a randomly picked element; the list is first checked to be non-empty. */
+    public static <T> T pickRandomly(List<T> list) {
+        checkArgument(!list.isEmpty(), "list is empty");
+        int index = ThreadLocalRandom.current().nextInt(list.size());
+        return list.get(index);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index d549f5443b..ab3d125156 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -90,8 +90,7 @@ public class HashBucketAssigner implements BucketAssigner {
 
         int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum, 
maxBucketId);
         if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "Assign " + assigned + " to the partition " + partition + 
" key hash " + hash);
+            LOG.debug("Assign {} to the partition {} key hash {}", assigned, 
partition, hash);
         }
         if (assigned > maxBucketId) {
             maxBucketId = assigned;
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 14c5a9fa74..decc1f12f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -20,13 +20,14 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 import org.apache.paimon.utils.Int2ShortHashMap;
 import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.ListUtils;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -44,7 +45,8 @@ public class PartitionIndex {
 
     public final Map<Integer, Long> nonFullBucketInformation;
 
-    public final Set<Integer> totalBucket;
+    public final Set<Integer> totalBucketSet;
+    public final List<Integer> totalBucketArray;
 
     private final long targetBucketRowNumber;
 
@@ -58,7 +60,8 @@ public class PartitionIndex {
             long targetBucketRowNumber) {
         this.hash2Bucket = hash2Bucket;
         this.nonFullBucketInformation = bucketInformation;
-        this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
+        this.totalBucketSet = new LinkedHashSet<>(bucketInformation.keySet());
+        this.totalBucketArray = new ArrayList<>(totalBucketSet);
         this.targetBucketRowNumber = targetBucketRowNumber;
         this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
         this.accessed = true;
@@ -88,14 +91,15 @@ public class PartitionIndex {
             }
         }
 
-        if (-1 == maxBucketsNum || totalBucket.isEmpty() || maxBucketId < 
maxBucketsNum - 1) {
+        if (-1 == maxBucketsNum || totalBucketSet.isEmpty() || maxBucketId < 
maxBucketsNum - 1) {
             // 3. create a new bucket
             for (int i = 0; i < Short.MAX_VALUE; i++) {
-                if (bucketFilter.test(i) && !totalBucket.contains(i)) {
+                if (bucketFilter.test(i) && !totalBucketSet.contains(i)) {
                     // The new bucketId may still be larger than the upper 
bound
                     if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
                         nonFullBucketInformation.put(i, 1L);
-                        totalBucket.add(i);
+                        totalBucketSet.add(i);
+                        totalBucketArray.add(i);
                         hash2Bucket.put(hash, (short) i);
                         return i;
                     } else {
@@ -113,8 +117,7 @@ public class PartitionIndex {
         }
 
         // 4. exceed buckets upper bound
-        int bucket =
-                KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, 
totalBucket.size());
+        int bucket = ListUtils.pickRandomly(totalBucketArray);
         hash2Bucket.put(hash, (short) bucket);
         return bucket;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
index 4f49841f79..e5249bb0a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -20,11 +20,13 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 import org.apache.paimon.utils.Int2ShortHashMap;
+import org.apache.paimon.utils.ListUtils;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -78,10 +80,12 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
 
         public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap();
         private final Map<Integer, Long> bucketInformation;
+        private final List<Integer> bucketList;
         private int currentBucket;
 
         private SimplePartitionIndex() {
             bucketInformation = new LinkedHashMap<>();
+            bucketList = new ArrayList<>();
             loadNewBucket();
         }
 
@@ -91,7 +95,13 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
                 return hash2Bucket.get(hash);
             }
 
-            Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 
0L);
+            Long num =
+                    bucketInformation.computeIfAbsent(
+                            currentBucket,
+                            bucket -> {
+                                bucketList.add(bucket);
+                                return 0L;
+                            });
 
             if (num >= targetBucketRowNumber) {
                 if (-1 == maxBucketsNum
@@ -99,9 +109,7 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
                         || maxBucketId < maxBucketsNum - 1) {
                     loadNewBucket();
                 } else {
-                    currentBucket =
-                            KeyAndBucketExtractor.bucketWithUpperBound(
-                                    bucketInformation.keySet(), hash, 
bucketInformation.size());
+                    currentBucket = ListUtils.pickRandomly(bucketList);
                 }
             }
             bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L 
: l + 1);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index dcbd02508d..283aa773ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -24,10 +24,6 @@ import org.apache.paimon.types.RowKind;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -59,17 +55,4 @@ public interface KeyAndBucketExtractor<T> {
         checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
         return Math.abs(hashcode % numBuckets);
     }
-
-    static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int 
maxBucketsNum) {
-        checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " + 
maxBucketsNum);
-        LOG.debug(
-                "Assign record (hashcode '{}') to new bucket exceed upper 
bound '{}' defined in '{}', Stop creating new buckets.",
-                hashcode,
-                maxBucketsNum,
-                DYNAMIC_BUCKET_MAX_BUCKETS.key());
-        return bucketsSet.stream()
-                .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
-                .findFirst()
-                .orElse(0);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 5839fc98c2..5c8c1dfe4e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -49,7 +49,6 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
     private final Integer numAssigners;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction;
     private final boolean overwrite;
-    private int[] maxBucketsArr;
 
     private transient BucketAssigner assigner;
     private transient PartitionKeyExtractor<T> extractor;

Reply via email to