This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a33915822 [client] Fix NPE thrown by MetadataUpdater when getting bucket count (#2054)
a33915822 is described below

commit a339158220a13b6df236b410a70b1ee6dc5fce23
Author: yunhong <[email protected]>
AuthorDate: Sun Nov 30 15:21:12 2025 +0800

    [client] Fix NPE thrown by MetadataUpdater when getting bucket count (#2054)
---
 .../fluss/client/write/RoundRobinBucketAssigner.java       |  7 ++++---
 .../apache/fluss/client/write/StickyBucketAssigner.java    |  6 ++++--
 .../java/org/apache/fluss/client/write/WriterClient.java   |  4 ++--
 .../apache/fluss/client/write/RecordAccumulatorTest.java   |  3 ++-
 .../fluss/client/write/StickyStaticBucketAssignerTest.java | 14 +++++++-------
 .../src/main/java/org/apache/fluss/cluster/Cluster.java    |  4 ----
 6 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
index 69beb3751..d87bb2669 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
@@ -31,10 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Internal
 public class RoundRobinBucketAssigner extends DynamicBucketAssigner {
     private final PhysicalTablePath physicalTablePath;
+    private final int bucketNumber;
     private final AtomicInteger counter = new AtomicInteger(new 
Random().nextInt());
 
-    public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath) {
+    public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath, int 
bucketNumber) {
         this.physicalTablePath = physicalTablePath;
+        this.bucketNumber = bucketNumber;
     }
 
     @Override
@@ -47,8 +49,7 @@ public class RoundRobinBucketAssigner extends 
DynamicBucketAssigner {
             return bucketsForTable.get(bucket).getBucketId();
         } else {
             // no buckets are available, give a non-available bucket.
-            return MathUtils.toPositive(nextValue)
-                    % cluster.getBucketCount(physicalTablePath.getTablePath());
+            return MathUtils.toPositive(nextValue) % bucketNumber;
         }
     }
 
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java b/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
index 4d427127b..12dda3567 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class StickyBucketAssigner extends DynamicBucketAssigner {
 
     private final PhysicalTablePath physicalTablePath;
+    private final int bucketNumber;
     private final AtomicInteger currentBucketId;
 
-    public StickyBucketAssigner(PhysicalTablePath physicalTablePath) {
+    public StickyBucketAssigner(PhysicalTablePath physicalTablePath, int 
bucketNumber) {
         this.physicalTablePath = physicalTablePath;
+        this.bucketNumber = bucketNumber;
         this.currentBucketId = new AtomicInteger(-1);
     }
 
@@ -73,7 +75,7 @@ public class StickyBucketAssigner extends 
DynamicBucketAssigner {
                     
cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath);
             if (availableBuckets.isEmpty()) {
                 int random = 
MathUtils.toPositive(ThreadLocalRandom.current().nextInt());
-                newBucket = random % 
cluster.getBucketCount(physicalTablePath.getTablePath());
+                newBucket = random % bucketNumber;
             } else if (availableBuckets.size() == 1) {
                 newBucket = availableBuckets.get(0).getBucketId();
             } else {
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 962369ef7..f61e6f544 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -354,9 +354,9 @@ public class WriterClient {
             ConfigOptions.NoKeyAssigner noKeyAssigner =
                     
conf.get(ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER);
             if (noKeyAssigner == ROUND_ROBIN) {
-                return new RoundRobinBucketAssigner(physicalTablePath);
+                return new RoundRobinBucketAssigner(physicalTablePath, 
bucketNumber);
             } else if (noKeyAssigner == STICKY) {
-                return new StickyBucketAssigner(physicalTablePath);
+                return new StickyBucketAssigner(physicalTablePath, 
bucketNumber);
             } else {
                 throw new IllegalArgumentException(
                         "Unsupported append only row bucket assigner: " + 
noKeyAssigner);
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index c721e974c..5aeee2d10 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -343,7 +343,8 @@ class RecordAccumulatorTest {
         int batchSize = 100;
         IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
 
-        StickyBucketAssigner bucketAssigner = new 
StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+        StickyBucketAssigner bucketAssigner =
+                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
         RecordAccumulator accum =
                 createTestRecordAccumulator(
                         (int) Duration.ofMinutes(1).toMillis(),
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
index 467fb2af8..5ac05b0fb 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
@@ -65,7 +65,7 @@ class StickyStaticBucketAssignerTest {
         // init cluster.
         Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, 
bucket3));
         StickyBucketAssigner stickyBucketAssigner =
-                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
         int bucketId = stickyBucketAssigner.assignBucket(cluster);
         assertThat(bucketId >= 0 && bucketId < 3).isTrue();
 
@@ -94,7 +94,7 @@ class StickyStaticBucketAssignerTest {
         // init cluster.
         Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, 
bucket3));
         StickyBucketAssigner stickyBucketAssigner =
-                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
         int bucketId = stickyBucketAssigner.assignBucket(cluster);
         for (int i = 0; i < 3; i++) {
             if (i != bucketId) {
@@ -111,7 +111,7 @@ class StickyStaticBucketAssignerTest {
         // init cluster.
         Cluster cluster = updateCluster(Collections.singletonList(bucket1));
         StickyBucketAssigner stickyBucketAssigner =
-                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+                new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
         int bucketId = stickyBucketAssigner.assignBucket(cluster);
 
         for (int i = 0; i < 100; i++) {
@@ -136,7 +136,7 @@ class StickyStaticBucketAssignerTest {
         Cluster cluster = updateCluster(allBuckets);
 
         // Assure we never choose bucket 1 for tp1 because it is unavailable.
-        StickyBucketAssigner stickyBucketAssigner = new 
StickyBucketAssigner(tp1);
+        StickyBucketAssigner stickyBucketAssigner = new 
StickyBucketAssigner(tp1, 3);
         int bucketForTp1 = stickyBucketAssigner.assignBucket(cluster);
         assertThat(bucketForTp1).isNotEqualTo(1);
         for (int i = 0; i < 100; i++) {
@@ -145,7 +145,7 @@ class StickyStaticBucketAssignerTest {
         }
 
         // Assure we always choose bucket 1 for tp2.
-        stickyBucketAssigner = new StickyBucketAssigner(tp2);
+        stickyBucketAssigner = new StickyBucketAssigner(tp2, 3);
         int bucketForTp2 = stickyBucketAssigner.assignBucket(cluster);
         assertThat(bucketForTp2).isEqualTo(1);
         for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ class StickyStaticBucketAssignerTest {
         }
 
         // Assure that we can still choose one bucket even if there are no 
available buckets.
-        stickyBucketAssigner = new StickyBucketAssigner(tp3);
+        stickyBucketAssigner = new StickyBucketAssigner(tp3, 3);
         int bucketForTp3 = stickyBucketAssigner.assignBucket(cluster);
         assertThat(bucketForTp3).isIn(0, 1, 2);
         stickyBucketAssigner.onNewBatch(cluster, bucketForTp3);
@@ -165,7 +165,7 @@ class StickyStaticBucketAssignerTest {
     void testMultiThreadToCallOnNewBatch() {
         Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2, 
bucket3));
         StickyBucketAssigner stickyBucketAssigner =
-                new 
StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH));
+                new 
StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH), 3);
         int bucketId = stickyBucketAssigner.assignBucket(cluster);
         Queue<Integer> bucketIds = new ConcurrentLinkedQueue<>();
         Thread[] threads = new Thread[100];
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index ccd73e22c..1cec4540d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -173,10 +173,6 @@ public final class Cluster {
                                                 + " in cluster"));
     }
 
-    public int getBucketCount(TablePath tablePath) {
-        return tableInfoByPath.get(tablePath).getNumBuckets();
-    }
-
     /** Get the bucket location for this table-bucket. */
     public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) 
{
         return Optional.ofNullable(availableLocationByBucket.get(tableBucket));

Reply via email to