This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 05458a39a [common] Change Cluster#getRandomTabletServer() to really random retrieve tabletServer (#1719)
05458a39a is described below

commit 05458a39afd37b01b5110c4f6d6d87a28aa047fb
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 21 21:38:57 2025 +0800

    [common] Change Cluster#getRandomTabletServer() to really random retrieve tabletServer (#1719)
---
 .../java/org/apache/fluss/cluster/Cluster.java     |  8 +++++-
 .../java/org/apache/fluss/cluster/ClusterTest.java | 29 +++++++++++++++++++---
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index c5528c8f2..9aef86261 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * An immutable representation of a subset of the server nodes, tables, and buckets and schemas in
@@ -209,7 +210,12 @@ public final class Cluster {
     public ServerNode getRandomTabletServer() {
         // TODO this method need to get one tablet server according to the load.
         List<ServerNode> serverNodes = new ArrayList<>(aliveTabletServersById.values());
-        return !serverNodes.isEmpty() ? serverNodes.get(0) : null;
+        if (serverNodes.isEmpty()) {
+            return null;
+        }
+
+        int index = ThreadLocalRandom.current().nextInt(serverNodes.size());
+        return serverNodes.get(index);
     }
 
     /** Get the list of available buckets for this table/partition. */
diff --git a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
index ff810c3dc..943163e44 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
@@ -28,8 +28,10 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -68,7 +70,7 @@ class ClusterTest {
 
     @Test
     void testReturnModifiableCollections() {
-        Cluster cluster = createCluster();
+        Cluster cluster = createCluster(aliveTabletServersById);
         assertThatThrownBy(() -> cluster.getAliveTabletServers().put(1, NODES[3]))
                 .isInstanceOf(UnsupportedOperationException.class);
         assertThatThrownBy(
@@ -87,7 +89,7 @@ class ClusterTest {
 
     @Test
     void testGetTable() {
-        Cluster cluster = createCluster();
+        Cluster cluster = createCluster(aliveTabletServersById);
         assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO);
         assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO);
         assertThat(cluster.getSchema(DATA1_TABLE_PATH).get())
@@ -98,7 +100,7 @@ class ClusterTest {
 
     @Test
     void testInvalidMetaAndUpdate() {
-        Cluster cluster = createCluster();
+        Cluster cluster = createCluster(aliveTabletServersById);
         for (int i = 0; i < 10000; i++) {
             // mock invalid meta
             cluster =
@@ -130,7 +132,26 @@ class ClusterTest {
                                         NODES_IDS)));
     }
 
-    private Cluster createCluster() {
+    @Test
+    void testGetRandomTabletServer() {
+        Map<Integer, ServerNode> aliveTabletServersById = new HashMap<>();
+        for (int i = 0; i < 10; i++) {
+            aliveTabletServersById.put(
+                    i, new ServerNode(i, "localhost", 99 + i, ServerType.TABLET_SERVER));
+        }
+        Cluster cluster = createCluster(aliveTabletServersById);
+
+        Set<ServerNode> selectedNodes = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            ServerNode serverNode = cluster.getRandomTabletServer();
+            assertThat(serverNode).isNotNull();
+            selectedNodes.add(serverNode);
+        }
+
+        assertThat(selectedNodes).hasSizeGreaterThan(1);
+    }
+
+    private Cluster createCluster(Map<Integer, ServerNode> aliveTabletServersById) {
         Map<PhysicalTablePath, List<BucketLocation>> tablePathToBucketLocations = new HashMap<>();
         tablePathToBucketLocations.put(
                 DATA1_PHYSICAL_TABLE_PATH,

Reply via email to