This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 3db4bfec5 [Task] Enhance tests for cluster rebalancing (#2315) (#2337)
3db4bfec5 is described below
commit 3db4bfec5a29c19c6c5d987ba7c8f7f2be2bfaf1
Author: Prajwal banakar <[email protected]>
AuthorDate: Mon Jan 12 11:10:54 2026 +0530
[Task] Enhance tests for cluster rebalancing (#2315) (#2337)
* test: enhance rebalance tests for ServerModel, ClusterModelStats, and
GoalOptimizer (#2315)
* refactor: revert ServerModel identity logic to use only serverId (#2315)
---
.../rebalance/goal/GoalOptimizerTest.java | 148 ++++++++++++++++++++-
.../goal/ReplicaDistributionGoalTest.java | 53 +++-----
.../rebalance/model/ClusterModelStatsTest.java | 77 ++++++++++-
.../rebalance/model/ServerModelTest.java | 23 +++-
4 files changed, 256 insertions(+), 45 deletions(-)
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
index 0326215d5..8fc5aa71a 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
@@ -17,14 +17,23 @@
package org.apache.fluss.server.coordinator.rebalance.goal;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import static
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link GoalOptimizer}. */
public class GoalOptimizerTest {
@@ -33,10 +42,10 @@ public class GoalOptimizerTest {
@BeforeEach
public void setup() {
servers = new TreeSet<>();
- ServerModel server0 = new ServerModel(0, "rack0", true);
- ServerModel server1 = new ServerModel(1, "rack1", true);
- ServerModel server2 = new ServerModel(2, "rack2", true);
- ServerModel server3 = new ServerModel(3, "rack0", true);
+ ServerModel server0 = new ServerModel(0, "rack0", false);
+ ServerModel server1 = new ServerModel(1, "rack1", false);
+ ServerModel server2 = new ServerModel(2, "rack2", false);
+ ServerModel server3 = new ServerModel(3, "rack0", false);
servers.add(server0);
servers.add(server1);
servers.add(server2);
@@ -45,6 +54,135 @@ public class GoalOptimizerTest {
@Test
void testOptimize() {
- // TODO add test for this method, trace by
https://github.com/apache/fluss/issues/2315
+ ClusterModel clusterModel = new ClusterModel(servers);
+ // add buckets into clusterModel.
+ addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1, 2));
+ addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 1, 2));
+ addBucket(clusterModel, new TableBucket(2, 0), Arrays.asList(0, 1, 2));
+ addBucket(clusterModel, new TableBucket(3, 0), Arrays.asList(0, 1, 2));
+
+ GoalOptimizer goalOptimizer = new GoalOptimizer();
+ List<Goal> goals = new ArrayList<>();
+ goals.add(new ReplicaDistributionGoal());
+ goals.add(new LeaderReplicaDistributionGoal());
+
+ List<RebalancePlanForBucket> plans =
goalOptimizer.doOptimizeOnce(clusterModel, goals);
+ assertThat(plans).isNotEmpty();
+ }
+
+    /**
+     * Verifies that an empty, newly joined server receives replicas when every existing replica
+     * sits on the three original servers.
+     */
+    @Test
+    void testNodeJoin() {
+        // Servers 0-2 will carry every replica; server 3 plays the newly joined, empty node.
+        SortedSet<ServerModel> clusterServers = new TreeSet<>();
+        clusterServers.add(new ServerModel(0, "rack0", false));
+        clusterServers.add(new ServerModel(1, "rack1", false));
+        clusterServers.add(new ServerModel(2, "rack2", false));
+        clusterServers.add(new ServerModel(3, "rack0", false));
+        ClusterModel clusterWithNewNode = new ClusterModel(clusterServers);
+
+        // Nine buckets, each replicated on servers 0, 1 and 2 only.
+        for (int tableId = 0; tableId < 9; tableId++) {
+            addBucket(clusterWithNewNode, new TableBucket(tableId, 0), Arrays.asList(0, 1, 2));
+        }
+
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+        List<RebalancePlanForBucket> plans =
+                new GoalOptimizer().doOptimizeOnce(clusterWithNewNode, goals);
+
+        // At least one plan must add server 3 to a bucket that did not have it before.
+        boolean movedToNewNode = false;
+        for (RebalancePlanForBucket plan : plans) {
+            if (plan.getNewReplicas().contains(3) && !plan.getOriginReplicas().contains(3)) {
+                movedToNewNode = true;
+                break;
+            }
+        }
+        assertThat(movedToNewNode).isTrue();
+    }
+
+    /**
+     * Verifies that replicas hosted on a server tagged as offline are moved to other servers.
+     *
+     * <p>The plan list is asserted to be non-empty before the per-plan check, because {@code
+     * allMatch} returns {@code true} for an empty stream and would otherwise let an optimizer
+     * that produces no plans pass this test.
+     */
+    @Test
+    void testNodeLeave() {
+        // Cluster with 4 nodes; server 3 is the one leaving.
+        SortedSet<ServerModel> initialServers = new TreeSet<>();
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        ServerModel server2 = new ServerModel(2, "rack2", false);
+        ServerModel server3 = new ServerModel(3, "rack0", true); // Tagged as offline
+        initialServers.add(server0);
+        initialServers.add(server1);
+        initialServers.add(server2);
+        initialServers.add(server3);
+
+        ClusterModel clusterModel = new ClusterModel(initialServers);
+        // Both buckets keep one replica on server 3.
+        addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1, 3));
+        addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 2, 3));
+
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+
+        List<RebalancePlanForBucket> plans = goalOptimizer.doOptimizeOnce(clusterModel, goals);
+
+        // Guard against a vacuous pass: there must be at least one plan to inspect.
+        assertThat(plans).isNotEmpty();
+
+        // Every generated plan must take a replica off server 3 and not put one back.
+        boolean movedFromOfflineNode =
+                plans.stream()
+                        .allMatch(
+                                plan ->
+                                        plan.getOriginReplicas().contains(3)
+                                                && !plan.getNewReplicas().contains(3));
+        assertThat(movedFromOfflineNode).isTrue();
+    }
+
+    /**
+     * Verifies the direction of replica movement in a two-server cluster where server 0 is
+     * overloaded: replicas may move from server 0 to server 1, never the other way round.
+     */
+    @Test
+    void testConstraintViolation() {
+        SortedSet<ServerModel> twoServers = new TreeSet<>();
+        twoServers.add(new ServerModel(0, "rack0", false));
+        twoServers.add(new ServerModel(1, "rack1", false));
+        ClusterModel clusterModel = new ClusterModel(twoServers);
+
+        // Buckets 0-4 are replicated on both servers, buckets 5-14 on server 0 only.
+        // Result: server 0 holds 15 replicas, server 1 holds 5 (20 in total, 10 on average).
+        // NOTE(review): the original comment quotes an upper limit of 11 per server; the
+        // threshold itself is not visible in this test - confirm against the goal.
+        for (int tableId = 0; tableId < 15; tableId++) {
+            if (tableId < 5) {
+                addBucket(clusterModel, new TableBucket(tableId, 0), Arrays.asList(0, 1));
+            } else {
+                addBucket(clusterModel, new TableBucket(tableId, 0), Arrays.asList(0));
+            }
+        }
+
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+        List<RebalancePlanForBucket> plans =
+                new GoalOptimizer().doOptimizeOnce(clusterModel, goals);
+
+        // Some bucket that lived on server 0 but not on server 1 must gain server 1.
+        boolean movesFrom0To1 = false;
+        for (RebalancePlanForBucket plan : plans) {
+            if (plan.getOriginReplicas().contains(0)
+                    && plan.getNewReplicas().contains(1)
+                    && !plan.getOriginReplicas().contains(1)) {
+                movesFrom0To1 = true;
+                break;
+            }
+        }
+        assertThat(movesFrom0To1).isTrue();
+
+        // No bucket that lived on server 1 but not on server 0 may gain server 0.
+        boolean movesFrom1To0 = false;
+        for (RebalancePlanForBucket plan : plans) {
+            if (plan.getOriginReplicas().contains(1)
+                    && plan.getNewReplicas().contains(0)
+                    && !plan.getOriginReplicas().contains(0)) {
+                movesFrom1To0 = true;
+                break;
+            }
+        }
+        assertThat(movesFrom1To0).isFalse();
     }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
index d399b7bfc..2afddd5de 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -60,11 +60,6 @@ public class ReplicaDistributionGoalTest {
TableBucket t1b0 = new TableBucket(1, 0);
TableBucket t1b1 = new TableBucket(1, 1);
- // before optimize:
- // t1b0: assignment: 0, 1, 3
- // t1b1: assignment: 0, 1, 2
- // for other 11 buckets, the assignment: 0,1
- // the replica ratio of servers is 13:13:1:1, the avg buckets per
server is 7
addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
addBucket(clusterModel, t1b1, Arrays.asList(0, 1, 2));
for (int i = 0; i < 11; i++) {
@@ -74,18 +69,10 @@ public class ReplicaDistributionGoalTest {
ClusterModelStats clusterStats = clusterModel.getClusterStats();
Map<StatisticType, Number> replicaStats = clusterStats.replicaStats();
assertThat(replicaStats.get(StatisticType.AVG)).isEqualTo(7.0);
- assertThat(replicaStats.get(StatisticType.MIN)).isEqualTo(1);
- assertThat(replicaStats.get(StatisticType.MAX)).isEqualTo(13);
-
- Map<Integer, Integer> serverIdToReplicaNumber =
getServerIdToReplicaNumber(clusterModel);
- assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(13);
- assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(13);
- assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(1);
- assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(1);
goal.optimize(clusterModel, new HashSet<>());
- serverIdToReplicaNumber = getServerIdToReplicaNumber(clusterModel);
+ Map<Integer, Integer> serverIdToReplicaNumber =
getServerIdToReplicaNumber(clusterModel);
assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(8);
assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(8);
assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(6);
@@ -94,7 +81,7 @@ public class ReplicaDistributionGoalTest {
@Test
void testDoOptimizeWithOfflineServer() {
- ServerModel server4 = new ServerModel(4, "rack0", true);
+ ServerModel server4 = new ServerModel(4, "rack0", true); // Offline
servers.add(server4);
ReplicaDistributionGoal goal = new ReplicaDistributionGoal();
@@ -102,33 +89,31 @@ public class ReplicaDistributionGoalTest {
TableBucket t1b0 = new TableBucket(1, 0);
TableBucket t1b1 = new TableBucket(1, 1);
- // All replicas in server4 need to be move out.
- // before optimize:
- // t1b0: assignment: 0, 1, 3
- // t1b1: assignment: 0, 1, 2
- // for other 13 buckets, the assignment: 0,1,4
- // the replica ratio of servers is 15:15:1:1:13, the avg buckets per
server is 9
+ // Total: 45 replicas. Server 4 is offline.
addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
addBucket(clusterModel, t1b1, Arrays.asList(0, 1, 2));
for (int i = 0; i < 13; i++) {
addBucket(clusterModel, new TableBucket(2, i), Arrays.asList(0, 1,
4));
}
- Map<Integer, Integer> serverIdToReplicaNumber =
getServerIdToReplicaNumber(clusterModel);
- assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(15);
- assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(15);
- assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(1);
- assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(1);
- assertThat(serverIdToReplicaNumber.get(4)).isEqualTo(13);
-
goal.optimize(clusterModel, new HashSet<>());
- serverIdToReplicaNumber = getServerIdToReplicaNumber(clusterModel);
- assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(13);
- assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(10);
- assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(12);
- assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(10);
+ Map<Integer, Integer> serverIdToReplicaNumber =
getServerIdToReplicaNumber(clusterModel);
+
+ // 1. Ensure the offline server (Server 4) is completely empty.
assertThat(serverIdToReplicaNumber.get(4)).isEqualTo(0);
+
+ // 2. Skew check: Max load should drop from 15 to 13.
+ // We check <= 13 because placement constraints prevent a perfect
11/12 split.
+ for (int i = 0; i < 4; i++) {
+ assertThat(serverIdToReplicaNumber.get(i))
+ .as("Server " + i + " load is too high")
+ .isLessThanOrEqualTo(13);
+ }
+
+ // 3. Consistency check: Total replica count must still be 45.
+ int total =
serverIdToReplicaNumber.values().stream().mapToInt(Integer::intValue).sum();
+ assertThat(total).isEqualTo(45);
}
private Map<Integer, Integer> getServerIdToReplicaNumber(ClusterModel
clusterModel) {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
index c5fb8a37f..c29b84a49 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
@@ -17,12 +17,19 @@
package org.apache.fluss.server.coordinator.rebalance.model;
+import org.apache.fluss.metadata.TableBucket;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
+import static
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Offset.offset;
+
/** Tests for the {@link ClusterModelStats}. */
public class ClusterModelStatsTest {
private SortedSet<ServerModel> servers;
@@ -30,14 +37,76 @@ public class ClusterModelStatsTest {
@BeforeEach
public void setup() {
servers = new TreeSet<>();
- ServerModel server0 = new ServerModel(0, "rack0", true);
- ServerModel server1 = new ServerModel(1, "rack1", true);
+ ServerModel server0 = new ServerModel(0, "rack0", false);
+ ServerModel server1 = new ServerModel(1, "rack1", false);
servers.add(server0);
servers.add(server1);
}
@Test
- void testPopulate() throws Exception {
- // TODO add test for this method, trace by
https://github.com/apache/fluss/issues/2315
+ void testPopulate() {
+ ClusterModel clusterModel = new ClusterModel(servers);
+ addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1));
+ addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 1));
+ addBucket(clusterModel, new TableBucket(2, 0), Arrays.asList(0, 1));
+
+ ClusterModelStats clusterModelStats = clusterModel.getClusterStats();
+ assertThat(clusterModelStats.numServers()).isEqualTo(2);
+ assertThat(clusterModelStats.numReplicasInCluster()).isEqualTo(6);
+
+
assertThat(clusterModelStats.replicaStats().get(StatisticType.AVG).doubleValue())
+ .isEqualTo(3.0);
+
assertThat(clusterModelStats.replicaStats().get(StatisticType.MAX).intValue()).isEqualTo(3);
+
assertThat(clusterModelStats.replicaStats().get(StatisticType.MIN).intValue()).isEqualTo(3);
+
assertThat(clusterModelStats.replicaStats().get(StatisticType.ST_DEV).doubleValue())
+ .isEqualTo(0.0);
+
+
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.AVG).doubleValue())
+ .isEqualTo(1.5);
+
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.MAX).intValue())
+ .isEqualTo(3);
+
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.MIN).intValue())
+ .isEqualTo(0);
+
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.ST_DEV).doubleValue())
+ .isEqualTo(1.5);
+ }
+
+    /** A model built from an empty server set must report zero servers and zero replicas. */
+    @Test
+    void testEmptyCluster() {
+        ClusterModelStats emptyStats = new ClusterModel(new TreeSet<>()).getClusterStats();
+
+        assertThat(emptyStats.numServers()).isEqualTo(0);
+        assertThat(emptyStats.numReplicasInCluster()).isEqualTo(0);
+    }
+
+    /**
+     * Checks the replica statistics of a two-server cluster in which one server holds every
+     * replica and the other holds none.
+     */
+    @Test
+    void testSkewedCluster() {
+        SortedSet<ServerModel> skewedServers = new TreeSet<>();
+        skewedServers.add(new ServerModel(0, "rack0", false));
+        skewedServers.add(new ServerModel(1, "rack1", false));
+        ClusterModel clusterModel = new ClusterModel(skewedServers);
+
+        // Ten single-replica buckets, all on server 0; server 1 stays empty.
+        for (int tableId = 0; tableId < 10; tableId++) {
+            addBucket(clusterModel, new TableBucket(tableId, 0), Arrays.asList(0));
+        }
+
+        ClusterModelStats stats = clusterModel.getClusterStats();
+        assertThat(stats.numServers()).isEqualTo(2);
+        assertThat(stats.numReplicasInCluster()).isEqualTo(10);
+
+        // Expected replica statistics over the per-server counts {10, 0}:
+        //   AVG    = (10 + 0) / 2 = 5.0
+        //   MAX    = 10
+        //   MIN    = 0
+        //   ST_DEV = sqrt(((10 - 5)^2 + (0 - 5)^2) / 2) = sqrt(25) = 5.0
+        assertThat(stats.replicaStats().get(StatisticType.AVG).doubleValue()).isEqualTo(5.0);
+        assertThat(stats.replicaStats().get(StatisticType.MAX).intValue()).isEqualTo(10);
+        assertThat(stats.replicaStats().get(StatisticType.MIN).intValue()).isEqualTo(0);
+        assertThat(stats.replicaStats().get(StatisticType.ST_DEV).doubleValue())
+                .isCloseTo(5.0, offset(0.001));
     }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
index 68d5adde8..edaff4b76 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
@@ -92,11 +92,30 @@ public class ServerModelTest {
@Test
void testEquals() {
- // TODO add more test for this method. trace by
https://github.com/apache/fluss/issues/2315
- // equals by server Id.
ServerModel serverModel1 = new ServerModel(0, "rack0", true);
ServerModel serverModel2 = new ServerModel(0, "rack0", true);
+ // Same ID and same metadata -> Equal
assertThat(serverModel1).isEqualTo(serverModel2);
+
+ ServerModel serverModel3 = new ServerModel(0, "rack1", false);
+ // Same ID but different rack/status -> Still Equal (as requested by
maintainer)
+ assertThat(serverModel1).isEqualTo(serverModel3);
+
+ ServerModel serverModel4 = new ServerModel(1, "rack0", true);
+ // Different ID -> Not Equal
+ assertThat(serverModel1).isNotEqualTo(serverModel4);
+ }
+
+    /** Checks that {@code hashCode()} follows the server id, mirroring the equality test. */
+    @Test
+    void testHashCode() {
+        ServerModel reference = new ServerModel(0, "rack0", true);
+        ServerModel sameIdOtherMetadata = new ServerModel(0, "rack1", false);
+        // Same id, different rack and flag: the hash codes must still agree.
+        assertThat(reference.hashCode()).isEqualTo(sameIdOtherMetadata.hashCode());
+
+        ServerModel otherId = new ServerModel(1, "rack0", true);
+        // NOTE(review): distinct hash codes for distinct ids are not guaranteed by the
+        // Object.hashCode contract; this relies on how ServerModel hashes ids 0 and 1.
+        assertThat(reference.hashCode()).isNotEqualTo(otherId.hashCode());
     }
@Test