This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 05458a39a [common] Change Cluster#getRandomTabletServer() to really
random retrieve tabletServer (#1719)
05458a39a is described below
commit 05458a39afd37b01b5110c4f6d6d87a28aa047fb
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 21 21:38:57 2025 +0800
[common] Change Cluster#getRandomTabletServer() to really random retrieve
tabletServer (#1719)
---
.../java/org/apache/fluss/cluster/Cluster.java | 8 +++++-
.../java/org/apache/fluss/cluster/ClusterTest.java | 29 +++++++++++++++++++---
2 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index c5528c8f2..9aef86261 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
/**
* An immutable representation of a subset of the server nodes, tables, and
buckets and schemas in
@@ -209,7 +210,12 @@ public final class Cluster {
public ServerNode getRandomTabletServer() {
// TODO this method needs to get one tablet server according to the
load.
List<ServerNode> serverNodes = new
ArrayList<>(aliveTabletServersById.values());
- return !serverNodes.isEmpty() ? serverNodes.get(0) : null;
+ if (serverNodes.isEmpty()) {
+ return null;
+ }
+
+ int index = ThreadLocalRandom.current().nextInt(serverNodes.size());
+ return serverNodes.get(index);
}
/** Get the list of available buckets for this table/partition. */
diff --git
a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
index ff810c3dc..943163e44 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
@@ -28,8 +28,10 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -68,7 +70,7 @@ class ClusterTest {
@Test
void testReturnModifiableCollections() {
- Cluster cluster = createCluster();
+ Cluster cluster = createCluster(aliveTabletServersById);
assertThatThrownBy(() -> cluster.getAliveTabletServers().put(1,
NODES[3]))
.isInstanceOf(UnsupportedOperationException.class);
assertThatThrownBy(
@@ -87,7 +89,7 @@ class ClusterTest {
@Test
void testGetTable() {
- Cluster cluster = createCluster();
+ Cluster cluster = createCluster(aliveTabletServersById);
assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO);
assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO);
assertThat(cluster.getSchema(DATA1_TABLE_PATH).get())
@@ -98,7 +100,7 @@ class ClusterTest {
@Test
void testInvalidMetaAndUpdate() {
- Cluster cluster = createCluster();
+ Cluster cluster = createCluster(aliveTabletServersById);
for (int i = 0; i < 10000; i++) {
// mock invalid meta
cluster =
@@ -130,7 +132,26 @@ class ClusterTest {
NODES_IDS)));
}
- private Cluster createCluster() {
+ @Test
+ void testGetRandomTabletServer() {
+ Map<Integer, ServerNode> aliveTabletServersById = new HashMap<>();
+ for (int i = 0; i < 10; i++) {
+ aliveTabletServersById.put(
+ i, new ServerNode(i, "localhost", 99 + i,
ServerType.TABLET_SERVER));
+ }
+ Cluster cluster = createCluster(aliveTabletServersById);
+
+ Set<ServerNode> selectedNodes = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ ServerNode serverNode = cluster.getRandomTabletServer();
+ assertThat(serverNode).isNotNull();
+ selectedNodes.add(serverNode);
+ }
+
+ assertThat(selectedNodes).hasSizeGreaterThan(1);
+ }
+
+ private Cluster createCluster(Map<Integer, ServerNode>
aliveTabletServersById) {
Map<PhysicalTablePath, List<BucketLocation>>
tablePathToBucketLocations = new HashMap<>();
tablePathToBucketLocations.put(
DATA1_PHYSICAL_TABLE_PATH,