This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3db4bfec5 [Task] Enhance tests for cluster rebalancing (#2315) (#2337)
3db4bfec5 is described below

commit 3db4bfec5a29c19c6c5d987ba7c8f7f2be2bfaf1
Author: Prajwal banakar <[email protected]>
AuthorDate: Mon Jan 12 11:10:54 2026 +0530

    [Task] Enhance tests for cluster rebalancing (#2315) (#2337)
    
    * test: enhance rebalance tests for ServerModel, ClusterModelStats, and 
GoalOptimizer (#2315)
    
    * refactor: revert ServerModel identity logic to use only serverId (#2315)
---
 .../rebalance/goal/GoalOptimizerTest.java          | 148 ++++++++++++++++++++-
 .../goal/ReplicaDistributionGoalTest.java          |  53 +++-----
 .../rebalance/model/ClusterModelStatsTest.java     |  77 ++++++++++-
 .../rebalance/model/ServerModelTest.java           |  23 +++-
 4 files changed, 256 insertions(+), 45 deletions(-)

diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
index 0326215d5..8fc5aa71a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerTest.java
@@ -17,14 +17,23 @@
 
 package org.apache.fluss.server.coordinator.rebalance.goal;
 
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
 import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import static 
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link GoalOptimizer}. */
 public class GoalOptimizerTest {
 
@@ -33,10 +42,10 @@ public class GoalOptimizerTest {
     @BeforeEach
     public void setup() {
         servers = new TreeSet<>();
-        ServerModel server0 = new ServerModel(0, "rack0", true);
-        ServerModel server1 = new ServerModel(1, "rack1", true);
-        ServerModel server2 = new ServerModel(2, "rack2", true);
-        ServerModel server3 = new ServerModel(3, "rack0", true);
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        ServerModel server2 = new ServerModel(2, "rack2", false);
+        ServerModel server3 = new ServerModel(3, "rack0", false);
         servers.add(server0);
         servers.add(server1);
         servers.add(server2);
@@ -45,6 +54,135 @@ public class GoalOptimizerTest {
 
     @Test
     void testOptimize() {
-        // TODO add test for this method, trace by 
https://github.com/apache/fluss/issues/2315
+        ClusterModel clusterModel = new ClusterModel(servers);
+        // add buckets into clusterModel.
+        addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1, 2));
+        addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 1, 2));
+        addBucket(clusterModel, new TableBucket(2, 0), Arrays.asList(0, 1, 2));
+        addBucket(clusterModel, new TableBucket(3, 0), Arrays.asList(0, 1, 2));
+
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+        goals.add(new LeaderReplicaDistributionGoal());
+
+        List<RebalancePlanForBucket> plans = 
goalOptimizer.doOptimizeOnce(clusterModel, goals);
+        assertThat(plans).isNotEmpty();
+    }
+
+    @Test
+    void testNodeJoin() {
+        // Initial cluster with 3 nodes
+        SortedSet<ServerModel> initialServers = new TreeSet<>();
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        ServerModel server2 = new ServerModel(2, "rack2", false);
+        initialServers.add(server0);
+        initialServers.add(server1);
+        initialServers.add(server2);
+
+        // Add a new node (server 3)
+        ServerModel server3 = new ServerModel(3, "rack0", false);
+        initialServers.add(server3);
+        ClusterModel newClusterModel = new ClusterModel(initialServers);
+        // Re-add buckets to the new cluster model, still on 0, 1, 2
+        for (int i = 0; i < 9; i++) {
+            addBucket(newClusterModel, new TableBucket(i, 0), Arrays.asList(0, 
1, 2));
+        }
+
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+
+        List<RebalancePlanForBucket> plans = 
goalOptimizer.doOptimizeOnce(newClusterModel, goals);
+
+        // Expect some replicas to move to server 3
+        boolean movedToNewNode =
+                plans.stream()
+                        .anyMatch(
+                                plan ->
+                                        plan.getNewReplicas().contains(3)
+                                                && 
!plan.getOriginReplicas().contains(3));
+        assertThat(movedToNewNode).isTrue();
+    }
+
+    @Test
+    void testNodeLeave() {
+        // Cluster with 4 nodes
+        SortedSet<ServerModel> initialServers = new TreeSet<>();
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        ServerModel server2 = new ServerModel(2, "rack2", false);
+        ServerModel server3 = new ServerModel(3, "rack0", true); // Tagged as 
offline
+        initialServers.add(server0);
+        initialServers.add(server1);
+        initialServers.add(server2);
+        initialServers.add(server3);
+
+        ClusterModel clusterModel = new ClusterModel(initialServers);
+        // Add buckets, some on server 3
+        addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1, 3));
+        addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 2, 3));
+
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+
+        List<RebalancePlanForBucket> plans = 
goalOptimizer.doOptimizeOnce(clusterModel, goals);
+
+        // Expect replicas on server 3 to be moved to other servers
+        boolean movedFromOfflineNode =
+                plans.stream()
+                        .allMatch(
+                                plan ->
+                                        plan.getOriginReplicas().contains(3)
+                                                && 
!plan.getNewReplicas().contains(3));
+        assertThat(movedFromOfflineNode).isTrue();
+    }
+
+    @Test
+    void testConstraintViolation() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        servers.add(server0);
+        servers.add(server1);
+
+        ClusterModel clusterModel = new ClusterModel(servers);
+        // Node 0: 15 replicas
+        // Node 1: 5 replicas
+        // Total 20. Avg 10. Limit 11.
+        for (int i = 0; i < 5; i++) {
+            addBucket(clusterModel, new TableBucket(i, 0), Arrays.asList(0, 
1));
+        }
+        for (int i = 5; i < 15; i++) {
+            addBucket(clusterModel, new TableBucket(i, 0), Arrays.asList(0));
+        }
+
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<Goal> goals = new ArrayList<>();
+        goals.add(new ReplicaDistributionGoal());
+
+        List<RebalancePlanForBucket> plans = 
goalOptimizer.doOptimizeOnce(clusterModel, goals);
+
+        // Verify that we have moves from 0 to 1.
+        boolean movesFrom0To1 =
+                plans.stream()
+                        .anyMatch(
+                                plan ->
+                                        plan.getOriginReplicas().contains(0)
+                                                && 
plan.getNewReplicas().contains(1)
+                                                && 
!plan.getOriginReplicas().contains(1));
+        assertThat(movesFrom0To1).isTrue();
+
+        // Verify NO moves from 1 to 0.
+        boolean movesFrom1To0 =
+                plans.stream()
+                        .anyMatch(
+                                plan ->
+                                        plan.getOriginReplicas().contains(1)
+                                                && 
plan.getNewReplicas().contains(0)
+                                                && 
!plan.getOriginReplicas().contains(0));
+        assertThat(movesFrom1To0).isFalse();
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
index d399b7bfc..2afddd5de 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoalTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -60,11 +60,6 @@ public class ReplicaDistributionGoalTest {
         TableBucket t1b0 = new TableBucket(1, 0);
         TableBucket t1b1 = new TableBucket(1, 1);
 
-        // before optimize:
-        // t1b0:   assignment: 0, 1, 3
-        // t1b1:   assignment: 0, 1, 2
-        // for other 11 buckets, the assignment: 0,1
-        // the replica ratio of servers is 13:13:1:1, the avg buckets per 
server is 7
         addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
         addBucket(clusterModel, t1b1, Arrays.asList(0, 1, 2));
         for (int i = 0; i < 11; i++) {
@@ -74,18 +69,10 @@ public class ReplicaDistributionGoalTest {
         ClusterModelStats clusterStats = clusterModel.getClusterStats();
         Map<StatisticType, Number> replicaStats = clusterStats.replicaStats();
         assertThat(replicaStats.get(StatisticType.AVG)).isEqualTo(7.0);
-        assertThat(replicaStats.get(StatisticType.MIN)).isEqualTo(1);
-        assertThat(replicaStats.get(StatisticType.MAX)).isEqualTo(13);
-
-        Map<Integer, Integer> serverIdToReplicaNumber = 
getServerIdToReplicaNumber(clusterModel);
-        assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(13);
-        assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(13);
-        assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(1);
-        assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(1);
 
         goal.optimize(clusterModel, new HashSet<>());
 
-        serverIdToReplicaNumber = getServerIdToReplicaNumber(clusterModel);
+        Map<Integer, Integer> serverIdToReplicaNumber = 
getServerIdToReplicaNumber(clusterModel);
         assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(8);
         assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(8);
         assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(6);
@@ -94,7 +81,7 @@ public class ReplicaDistributionGoalTest {
 
     @Test
     void testDoOptimizeWithOfflineServer() {
-        ServerModel server4 = new ServerModel(4, "rack0", true);
+        ServerModel server4 = new ServerModel(4, "rack0", true); // Offline
         servers.add(server4);
 
         ReplicaDistributionGoal goal = new ReplicaDistributionGoal();
@@ -102,33 +89,31 @@ public class ReplicaDistributionGoalTest {
         TableBucket t1b0 = new TableBucket(1, 0);
         TableBucket t1b1 = new TableBucket(1, 1);
 
-        // All replicas in server4 need to be move out.
-        // before optimize:
-        // t1b0:   assignment: 0, 1, 3
-        // t1b1:   assignment: 0, 1, 2
-        // for other 13 buckets, the assignment: 0,1,4
-        // the replica ratio of servers is 15:15:1:1:13, the avg buckets per 
server is 9
+        // Total: 45 replicas. Server 4 is offline.
         addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
         addBucket(clusterModel, t1b1, Arrays.asList(0, 1, 2));
         for (int i = 0; i < 13; i++) {
             addBucket(clusterModel, new TableBucket(2, i), Arrays.asList(0, 1, 
4));
         }
 
-        Map<Integer, Integer> serverIdToReplicaNumber = 
getServerIdToReplicaNumber(clusterModel);
-        assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(15);
-        assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(15);
-        assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(1);
-        assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(1);
-        assertThat(serverIdToReplicaNumber.get(4)).isEqualTo(13);
-
         goal.optimize(clusterModel, new HashSet<>());
 
-        serverIdToReplicaNumber = getServerIdToReplicaNumber(clusterModel);
-        assertThat(serverIdToReplicaNumber.get(0)).isEqualTo(13);
-        assertThat(serverIdToReplicaNumber.get(1)).isEqualTo(10);
-        assertThat(serverIdToReplicaNumber.get(2)).isEqualTo(12);
-        assertThat(serverIdToReplicaNumber.get(3)).isEqualTo(10);
+        Map<Integer, Integer> serverIdToReplicaNumber = 
getServerIdToReplicaNumber(clusterModel);
+
+        // 1. Ensure the offline server (Server 4) is completely empty.
         assertThat(serverIdToReplicaNumber.get(4)).isEqualTo(0);
+
+        // 2. Skew check: Max load should drop from 15 to 13.
+        // We check <= 13 because placement constraints prevent a perfect 
11/12 split.
+        for (int i = 0; i < 4; i++) {
+            assertThat(serverIdToReplicaNumber.get(i))
+                    .as("Server " + i + " load is too high")
+                    .isLessThanOrEqualTo(13);
+        }
+
+        // 3. Consistency check: Total replica count must still be 45.
+        int total = 
serverIdToReplicaNumber.values().stream().mapToInt(Integer::intValue).sum();
+        assertThat(total).isEqualTo(45);
     }
 
     private Map<Integer, Integer> getServerIdToReplicaNumber(ClusterModel 
clusterModel) {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
index c5fb8a37f..c29b84a49 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStatsTest.java
@@ -17,12 +17,19 @@
 
 package org.apache.fluss.server.coordinator.rebalance.model;
 
+import org.apache.fluss.metadata.TableBucket;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import static 
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Offset.offset;
+
 /** Tests for the {@link ClusterModelStats}. */
 public class ClusterModelStatsTest {
     private SortedSet<ServerModel> servers;
@@ -30,14 +37,76 @@ public class ClusterModelStatsTest {
     @BeforeEach
     public void setup() {
         servers = new TreeSet<>();
-        ServerModel server0 = new ServerModel(0, "rack0", true);
-        ServerModel server1 = new ServerModel(1, "rack1", true);
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
         servers.add(server0);
         servers.add(server1);
     }
 
     @Test
-    void testPopulate() throws Exception {
-        // TODO add test for this method, trace by 
https://github.com/apache/fluss/issues/2315
+    void testPopulate() {
+        ClusterModel clusterModel = new ClusterModel(servers);
+        addBucket(clusterModel, new TableBucket(0, 0), Arrays.asList(0, 1));
+        addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(0, 1));
+        addBucket(clusterModel, new TableBucket(2, 0), Arrays.asList(0, 1));
+
+        ClusterModelStats clusterModelStats = clusterModel.getClusterStats();
+        assertThat(clusterModelStats.numServers()).isEqualTo(2);
+        assertThat(clusterModelStats.numReplicasInCluster()).isEqualTo(6);
+
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.AVG).doubleValue())
+                .isEqualTo(3.0);
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.MAX).intValue()).isEqualTo(3);
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.MIN).intValue()).isEqualTo(3);
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.ST_DEV).doubleValue())
+                .isEqualTo(0.0);
+
+        
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.AVG).doubleValue())
+                .isEqualTo(1.5);
+        
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.MAX).intValue())
+                .isEqualTo(3);
+        
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.MIN).intValue())
+                .isEqualTo(0);
+        
assertThat(clusterModelStats.leaderReplicaStats().get(StatisticType.ST_DEV).doubleValue())
+                .isEqualTo(1.5);
+    }
+
+    @Test
+    void testEmptyCluster() {
+        ClusterModel clusterModel = new ClusterModel(new TreeSet<>());
+        ClusterModelStats clusterModelStats = clusterModel.getClusterStats();
+        assertThat(clusterModelStats.numServers()).isEqualTo(0);
+        assertThat(clusterModelStats.numReplicasInCluster()).isEqualTo(0);
+    }
+
+    @Test
+    void testSkewedCluster() {
+        SortedSet<ServerModel> skewedServers = new TreeSet<>();
+        ServerModel server0 = new ServerModel(0, "rack0", false);
+        ServerModel server1 = new ServerModel(1, "rack1", false);
+        skewedServers.add(server0);
+        skewedServers.add(server1);
+
+        ClusterModel clusterModel = new ClusterModel(skewedServers);
+        // Server 0 has 10 replicas, Server 1 has 0 replicas
+        for (int i = 0; i < 10; i++) {
+            addBucket(clusterModel, new TableBucket(i, 0), Arrays.asList(0));
+        }
+
+        ClusterModelStats clusterModelStats = clusterModel.getClusterStats();
+        assertThat(clusterModelStats.numServers()).isEqualTo(2);
+        assertThat(clusterModelStats.numReplicasInCluster()).isEqualTo(10);
+
+        // Avg = 5.0
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.AVG).doubleValue())
+                .isEqualTo(5.0);
+        // Max = 10
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.MAX).intValue())
+                .isEqualTo(10);
+        // Min = 0
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.MIN).intValue()).isEqualTo(0);
+        // StDev = sqrt(((10-5)^2 + (0-5)^2) / 2) = sqrt((25 + 25) / 2) = 
sqrt(25) = 5.0
+        
assertThat(clusterModelStats.replicaStats().get(StatisticType.ST_DEV).doubleValue())
+                .isCloseTo(5.0, offset(0.001));
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
index 68d5adde8..edaff4b76 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
@@ -92,11 +92,30 @@ public class ServerModelTest {
 
     @Test
     void testEquals() {
-        // TODO add more test for this method. trace by 
https://github.com/apache/fluss/issues/2315
-        // equals by server Id.
         ServerModel serverModel1 = new ServerModel(0, "rack0", true);
         ServerModel serverModel2 = new ServerModel(0, "rack0", true);
+        // Same ID and same metadata -> Equal
         assertThat(serverModel1).isEqualTo(serverModel2);
+
+        ServerModel serverModel3 = new ServerModel(0, "rack1", false);
+        // Same ID but different rack/status -> Still Equal (as requested by 
maintainer)
+        assertThat(serverModel1).isEqualTo(serverModel3);
+
+        ServerModel serverModel4 = new ServerModel(1, "rack0", true);
+        // Different ID -> Not Equal
+        assertThat(serverModel1).isNotEqualTo(serverModel4);
+    }
+
+    @Test
+    void testHashCode() {
+        ServerModel serverModel1 = new ServerModel(0, "rack0", true);
+        ServerModel serverModel2 = new ServerModel(0, "rack1", false);
+        // Same ID must result in same HashCode
+        assertThat(serverModel1.hashCode()).isEqualTo(serverModel2.hashCode());
+
+        ServerModel serverModel3 = new ServerModel(1, "rack0", true);
+        // Different ID results in different HashCode
+        
assertThat(serverModel1.hashCode()).isNotEqualTo(serverModel3.hashCode());
     }
 
     @Test

Reply via email to