This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 65e4f372d [server] Support RackAwareGoal for rebalance (#2380)
65e4f372d is described below

commit 65e4f372def087c0bb813a34d5bf8049ed5123e4
Author: yunhong <[email protected]>
AuthorDate: Sun Feb 1 15:42:30 2026 +0800

    [server] Support RackAwareGoal for rebalance (#2380)
---
 .../fluss/client/admin/RackAwareClusterITCase.java | 110 ++++++++++++
 .../apache/fluss/cluster/rebalance/GoalType.java   |  10 +-
 .../coordinator/rebalance/goal/GoalUtils.java      |  18 ++
 .../rebalance/goal/RackAwareAbstractGoal.java      | 160 +++++++++++++++++
 .../coordinator/rebalance/goal/RackAwareGoal.java  | 142 +++++++++++++++
 .../rebalance/goal/ReplicaDistributionGoal.java    |   6 +-
 .../coordinator/rebalance/model/BucketModel.java   |  11 ++
 .../coordinator/rebalance/model/ClusterModel.java  |  37 ++++
 .../coordinator/rebalance/model/RackModel.java     |  12 ++
 .../coordinator/rebalance/model/ServerModel.java   |   2 +-
 .../rebalance/goal/RackAwareGoalTest.java          | 199 +++++++++++++++++++++
 .../rebalance/model/ClusterModelTest.java          |  15 ++
 .../rebalance/model/ServerModelTest.java           |   4 +-
 .../server/testutils/FlussClusterExtension.java    |  38 +++-
 14 files changed, 753 insertions(+), 11 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
new file mode 100644
index 000000000..edf57fb32
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.admin;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Optional;
+
+import static org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_SCHEMA;
+import static 
org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_TABLE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for table creation on a rack aware cluster. */
+public class RackAwareClusterITCase {
+
+    // Four tablet servers spread over only three racks: ts-0 and ts-3 share rack-0.
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(4)
+                    .setRacks(new String[] {"rack-0", "rack-1", "rack-2", "rack-0"})
+                    .build();
+
+    protected Connection conn;
+    protected Admin admin;
+    protected Configuration clientConf;
+
+    @BeforeEach
+    protected void setup() throws Exception {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    @Test
+    void testCreateTableWithInsufficientRack() throws Exception {
+        TablePath tablePath =
+                TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "t1-with-replica-factor-4");
+        // Use a replication factor (4) larger than the number of available racks (3):
+        // ts-0: rack-0, ts-1: rack-1, ts-2: rack-2, ts-3: rack-0.
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("test table")
+                        .customProperty("connector", "fluss")
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "4")
+                        .build();
+        admin.createDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), DatabaseDescriptor.EMPTY, false)
+                .get();
+
+        // Table creation still succeeds in this case, placing two replicas on the same rack.
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+        Optional<TableAssignment> assignmentOpt =
+                FLUSS_CLUSTER_EXTENSION
+                        .getZooKeeperClient()
+                        .getTableAssignment(tableInfo.getTableId());
+        assertThat(assignmentOpt).isPresent();
+        TableAssignment assignment = assignmentOpt.get();
+        BucketAssignment bucketAssignment = assignment.getBucketAssignment(0);
+        // Every tablet server hosts one replica of the single bucket.
+        assertThat(bucketAssignment.getReplicas()).containsExactlyInAnyOrder(0, 1, 2, 3);
+
+        admin.dropTable(tablePath, false).get();
+        admin.dropDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), false, false).get();
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
index 6fb27c3a7..7a01116c7 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
@@ -38,7 +38,13 @@ public enum GoalType {
      * Goal to generate leadership movement and leader replica movement tasks 
to ensure that the
      * number of leader replicas on each tabletServer is near balanced.
      */
-    LEADER_DISTRIBUTION(1);
+    LEADER_DISTRIBUTION(1),
+
+    /**
+     * Goal to generate replica movement tasks to ensure that the replicas of each bucket are
+     * distributed across racks, i.e. no more than one replica of a bucket resides in the same
+     * rack.
+     */
+    RACK_AWARE(2);
 
     public final int value;
 
@@ -51,6 +57,8 @@ public enum GoalType {
             return REPLICA_DISTRIBUTION;
         } else if (value == LEADER_DISTRIBUTION.value) {
             return LEADER_DISTRIBUTION;
+        } else if (value == RACK_AWARE.value) {
+            return RACK_AWARE;
         } else {
             throw new IllegalArgumentException(
                     String.format(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
index 3385964ce..652b2a231 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.coordinator.rebalance.goal;
 import org.apache.fluss.cluster.rebalance.GoalType;
 import org.apache.fluss.server.coordinator.rebalance.ActionType;
 import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
 import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
 import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
 
@@ -36,6 +37,8 @@ public class GoalUtils {
                 return new ReplicaDistributionGoal();
             case LEADER_DISTRIBUTION:
                 return new LeaderReplicaDistributionGoal();
+            case RACK_AWARE:
+                return new RackAwareGoal();
             default:
                 throw new IllegalArgumentException("Unsupported goal type " + 
goalType);
         }
@@ -76,4 +79,19 @@ public class GoalUtils {
                 .map(ServerModel::id)
                 .collect(Collectors.toCollection(HashSet::new));
     }
+
+    /**
+     * A {@link Goal.ClusterModelStatsComparator} for hard goals, which considers any two cluster
+     * model stats to be equally good.
+     */
+    public static class HardGoalStatsComparator implements Goal.ClusterModelStatsComparator {
+
+        @Override
+        public int compare(ClusterModelStats left, ClusterModelStats right) {
+            // A hard goal either meets its requirements or fails the optimization outright, so
+            // there is nothing to rank between two stats snapshots.
+            final int noPreference = 0;
+            return noPreference;
+        }
+
+        /** Always {@code null}: this comparator records no details about its comparisons. */
+        @Override
+        public String explainLastComparison() {
+            return null;
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
new file mode 100644
index 000000000..bbdbf1532
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.RebalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.SortedSet;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.SERVER_REJECT;
+
+/**
+ * An abstract class for rack-aware goals.
+ *
+ * <p>Subclasses decide which replicas must leave their current server ({@link
+ * #shouldKeepInTheCurrentServer}) and which servers may receive them ({@link
+ * #rackAwareEligibleServers}). Stats comparison is irrelevant for these hard goals.
+ */
+public abstract class RackAwareAbstractGoal extends AbstractGoal {
+    private static final Logger LOG = LoggerFactory.getLogger(RackAwareAbstractGoal.class);
+
+    /** Returns a comparator that treats all cluster model stats as equal, as for hard goals. */
+    @Override
+    public ClusterModelStatsComparator clusterModelStatsComparator() {
+        return new GoalUtils.HardGoalStatsComparator();
+    }
+
+    /** Always {@code true}: this goal applies no additional self-satisfaction check. */
+    @Override
+    protected boolean selfSatisfied(ClusterModel clusterModel, RebalancingAction action) {
+        return true;
+    }
+
+    /**
+     * Check whether the given action is acceptable by this goal. The following actions are
+     * acceptable:
+     *
+     * <ul>
+     *   <li>All leadership moves
+     *   <li>Replica moves that do not violate {@link
+     *       #doesReplicaMoveViolateActionAcceptance(ClusterModel, ReplicaModel, ServerModel)}
+     * </ul>
+     *
+     * @param action Action to be checked for acceptance.
+     * @param clusterModel The state of the cluster.
+     * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal, {@link
+     *     ActionAcceptance#SERVER_REJECT} if the action is rejected due to violating rack awareness
+     *     in the destination server after moving the source replica to the destination server.
+     * @throws IllegalArgumentException if the action type is not supported.
+     */
+    @Override
+    public ActionAcceptance actionAcceptance(RebalancingAction action, ClusterModel clusterModel) {
+        switch (action.getActionType()) {
+            case LEADERSHIP_MOVEMENT:
+                return ACCEPT;
+            case REPLICA_MOVEMENT:
+                if (doesReplicaMoveViolateActionAcceptance(
+                        clusterModel,
+                        clusterModel
+                                .server(action.getSourceServerId())
+                                .replica(action.getTableBucket()),
+                        clusterModel.server(action.getDestinationServerId()))) {
+                    return SERVER_REJECT;
+                }
+                return ACCEPT;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported rebalance action " + action.getActionType() + " is provided.");
+        }
+    }
+
+    /**
+     * Check whether the given replica move would violate the action acceptance for this rack aware
+     * goal.
+     *
+     * @param clusterModel The state of the cluster.
+     * @param sourceReplica Source replica
+     * @param destServer Destination server to receive the given source replica.
+     * @return {@code true} if the given replica move would violate action acceptance (i.e. the move
+     *     is not acceptable), {@code false} otherwise.
+     */
+    protected abstract boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer);
+
+    /**
+     * Rebalance the given serverModel without violating the constraints of this rack aware goal and
+     * optimized goals.
+     *
+     * <p>Every replica on an offline-tagged server is a move candidate. On other servers, only
+     * replicas for which {@link #shouldKeepInTheCurrentServer} returns {@code false} are moved.
+     * Replicas that cannot be moved are logged at debug level.
+     *
+     * @param serverModel Server to be balanced.
+     * @param clusterModel The state of the cluster.
+     * @param optimizedGoals Optimized goals.
+     */
+    protected void rebalanceForServer(
+            ServerModel serverModel, ClusterModel clusterModel, Set<Goal> optimizedGoals)
+            throws RebalanceFailureException {
+        // Iterate over a snapshot: a successful REPLICA_MOVEMENT may remove the replica from
+        // serverModel, which must not happen to the collection while it is being iterated.
+        // TODO maybe use a sorted replicas set
+        for (ReplicaModel replica : new ArrayList<>(serverModel.replicas())) {
+            if (!serverModel.isOfflineTagged()
+                    && shouldKeepInTheCurrentServer(replica, clusterModel)) {
+                continue;
+            }
+            // The relevant rack awareness condition is violated. Move replica to an eligible
+            // serverModel
+            SortedSet<ServerModel> eligibleServers =
+                    rackAwareEligibleServers(replica, clusterModel);
+            if (maybeApplyBalancingAction(
+                            clusterModel,
+                            replica,
+                            eligibleServers,
+                            ActionType.REPLICA_MOVEMENT,
+                            optimizedGoals)
+                    == null) {
+                LOG.debug(
+                        "Cannot move replica {} to any serverModel in {}",
+                        replica,
+                        eligibleServers);
+            }
+        }
+    }
+
+    /**
+     * Check whether the given alive replica should stay in the current server or be moved to
+     * another server to satisfy the specific requirements of the rack aware goal in the given
+     * cluster state.
+     *
+     * @param replica An alive replica to check whether it should stay in the current server.
+     * @param clusterModel The state of the cluster.
+     * @return {@code true} if the given alive replica should stay in the current server, {@code
+     *     false} otherwise.
+     */
+    protected abstract boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel);
+
+    /**
+     * Get a set of eligible servers for moving the given replica in the given cluster to satisfy
+     * the specific requirements of the rack aware goal.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers are requested.
+     * @param clusterModel The state of the cluster.
+     * @return A set of rack aware eligible servers for the given replica in the given cluster.
+     */
+    protected abstract SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel);
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
new file mode 100644
index 000000000..ba1d4bd3b
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to provide rack-aware replica distribution, which ensures
+ * that all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one
+ * replica of each bucket resides in the same rack.
+ *
+ * <p>This is a hard goal: if rack awareness cannot be achieved, the optimization fails with a
+ * {@link RebalanceFailureException}.
+ */
+public class RackAwareGoal extends RackAwareAbstractGoal {
+
+    @Override
+    protected boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) {
+        // Destination server cannot be in a rack that violates rack awareness.
+        BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found");
+        Set<ServerModel> bucketServers = bucket.bucketServers();
+        int sourceServerId = sourceReplica.serverId();
+
+        // If the destination server is in the rack of any *other* replica of the bucket, the move
+        // violates rack-awareness. The source server is filtered out rather than removed so the
+        // set returned by the model is never mutated.
+        return bucketServers.stream()
+                .filter(server -> server.id() != sourceServerId)
+                .map(ServerModel::rack)
+                .anyMatch(destServer.rack()::equals);
+    }
+
+    /**
+     * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of
+     * self-healing. Sanity check: there exist enough racks with a server that is not tagged as
+     * offline to achieve rack-awareness.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if the maximum replication factor exceeds the number of
+     *     such racks.
+     */
+    @Override
+    protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+        // Sanity Check: not enough racks to satisfy rack awareness.
+        // Assumes number of racks doesn't exceed Integer.MAX_VALUE.
+        int numAvailableRacks =
+                (int)
+                        clusterModel.racksContainServerWithoutOfflineTag().stream()
+                                .map(RackModel::rack)
+                                .distinct()
+                                .count();
+        int maxReplicationFactor = clusterModel.maxReplicationFactor();
+        if (maxReplicationFactor > numAvailableRacks) {
+            throw new RebalanceFailureException(
+                    String.format(
+                            "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).",
+                            name(), numAvailableRacks, maxReplicationFactor));
+        }
+    }
+
+    /**
+     * Update goal state. Sanity check: after completion of balancing, confirm that the replicas of
+     * each bucket reside in separate racks.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if any bucket still has two replicas in the same rack.
+     */
+    @Override
+    protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+        ensureRackAware(clusterModel);
+        finish();
+    }
+
+    /**
+     * Verify that, for every bucket that has a leader, the leader and all follower replicas reside
+     * in pairwise distinct racks. Replica moves may be rejected by previously optimized goals and
+     * are then only logged, so this hard goal must fail explicitly here.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if rack awareness is violated for some bucket.
+     */
+    private void ensureRackAware(ClusterModel clusterModel) throws RebalanceFailureException {
+        for (ReplicaModel leader : clusterModel.leaderReplicas()) {
+            BucketModel bucket = clusterModel.bucket(leader.tableBucket());
+            checkNotNull(bucket, "Bucket for replica " + leader + " is not found");
+            String leaderRack = leader.server().rack();
+            List<String> followerRacks =
+                    bucket.followerServers().stream()
+                            .map(ServerModel::rack)
+                            .collect(Collectors.toList());
+            boolean followersInDistinctRacks =
+                    followerRacks.stream().distinct().count() == followerRacks.size();
+            if (!followersInDistinctRacks || followerRacks.contains(leaderRack)) {
+                throw new RebalanceFailureException(
+                        String.format(
+                                "[%s] Rack awareness is violated for bucket %s (leader rack: %s, follower racks: %s).",
+                                name(), leader.tableBucket(), leaderRack, followerRacks));
+            }
+        }
+    }
+
+    /**
+     * Get a set of rack aware eligible servers for the given replica in the given cluster. A
+     * server is rack aware eligible for a given replica if the server resides in a rack where no
+     * other server in the same rack contains a replica from the same bucket of the given replica.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers are requested.
+     * @param clusterModel The state of the cluster.
+     * @return A set of rack aware eligible servers for the given replica, ordered by server id.
+     */
+    @Override
+    protected SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Populate bucket rackIds.
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not found");
+        List<String> bucketRackIds =
+                bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList());
+
+        // Remove one occurrence of the given replica's rackId; if any other replica of the bucket
+        // resides in the same rack, its rackId stays in the list.
+        bucketRackIds.remove(replica.server().rack());
+
+        SortedSet<ServerModel> rackAwareEligibleServers =
+                new TreeSet<>(Comparator.comparingInt(ServerModel::id));
+        for (ServerModel server : clusterModel.aliveServers()) {
+            if (!bucketRackIds.contains(server.rack())) {
+                rackAwareEligibleServers.add(server);
+            }
+        }
+        // Return eligible servers.
+        return rackAwareEligibleServers;
+    }
+
+    @Override
+    protected boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Rack awareness requires no more than one replica from a given bucket residing in any
+        // rack in the cluster
+        String myRackId = replica.server().rack();
+        int myServerId = replica.serverId();
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not found");
+        for (ServerModel bucketServer : bucket.bucketServers()) {
+            if (myRackId.equals(bucketServer.rack()) && myServerId != bucketServer.id()) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
index ae269474b..66e63b60e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
@@ -61,9 +61,9 @@ public class ReplicaDistributionGoal extends 
ReplicaDistributionAbstractGoal {
 
     // TODO configurable.
     /**
-     * The maximum allowed extent of unbalance for replica leader replica 
distribution. For example,
-     * 1.10 means the highest leader replica count of a server should not be 
1.10x of average leader
-     * replica count of all alive tabletServers.
+     * The maximum allowed extent of unbalance for replica distribution. For example, 1.10 means the
+     * highest replica count of a server should not exceed 1.10x of the average replica count of
+     * all alive tabletServers.
      */
     private static final Double REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d;
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
index d64c464eb..9cf189776 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
@@ -62,6 +62,17 @@ public class BucketModel {
         return bucketServers;
     }
 
+    /**
+     * Returns the servers hosting the non-leader replicas of this bucket.
+     *
+     * @return a new list of follower servers, in replica order.
+     */
+    public List<ServerModel> followerServers() {
+        List<ServerModel> servers = new ArrayList<>();
+        for (ReplicaModel candidate : replicas) {
+            if (candidate.isLeader()) {
+                continue;
+            }
+            servers.add(candidate.server());
+        }
+        return servers;
+    }
+
     public boolean canAssignReplicaToServer(ServerModel candidateServer) {
         return !ineligibleServers.contains(candidateServer);
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
index d55f6a3cb..8d481ecfd 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
@@ -53,9 +53,14 @@ public class ClusterModel {
     private final SortedSet<ServerModel> servers;
     private final Map<TableBucket, BucketModel> bucketsByTableBucket;
 
+    // Largest replica count observed for any bucket as replicas are added to this model
+    // (initially 1); exposed as the maximum replication factor.
+    private int maxReplicationFactor;
+
     public ClusterModel(SortedSet<ServerModel> servers) {
         this.servers = servers;
         this.bucketsByTableBucket = new HashMap<>();
+        this.maxReplicationFactor = 1;
 
         this.aliveServers = new HashSet<>();
         this.offlineServers = new TreeSet<>();
@@ -76,6 +81,13 @@ public class ClusterModel {
         }
     }
 
+    /**
+     * Returns the largest number of replicas seen for any bucket added to this cluster model.
+     *
+     * @return the maximum replication factor, never less than 1.
+     */
+    public int maxReplicationFactor() {
+        return this.maxReplicationFactor;
+    }
+
     public SortedSet<ServerModel> offlineServers() {
         return offlineServers;
     }
@@ -88,6 +100,15 @@ public class ClusterModel {
         return Collections.unmodifiableSet(aliveServers);
     }
 
+    /**
+     * Returns the racks that contain at least one server which is not tagged as offline.
+     *
+     * @return a new set holding those racks.
+     */
+    public Set<RackModel> racksContainServerWithoutOfflineTag() {
+        Set<RackModel> racksWithUntaggedServer = new HashSet<>();
+        for (RackModel rackModel : racksById.values()) {
+            if (rackModel.rackContainsServerWithoutOfflineTag()) {
+                racksWithUntaggedServer.add(rackModel);
+            }
+        }
+        return racksWithUntaggedServer;
+    }
+
     public @Nullable BucketModel bucket(TableBucket tableBucket) {
         return bucketsByTableBucket.get(tableBucket);
     }
@@ -110,6 +131,20 @@ public class ClusterModel {
         return bucketsByTableBucket.values().stream().mapToInt(p -> 
p.replicas().size()).sum();
     }
 
+    /**
+     * Returns the leader replica of every bucket in the cluster; buckets without a leader are
+     * skipped.
+     *
+     * @return a new set containing all leader replicas.
+     */
+    public Set<ReplicaModel> leaderReplicas() {
+        return bucketsByTableBucket.values().stream()
+                .map(BucketModel::leader)
+                .filter(leader -> leader != null)
+                .collect(Collectors.toCollection(HashSet::new));
+    }
+
     public int numLeaderReplicas() {
         int numLeaderReplicas = 0;
         for (BucketModel bucket : bucketsByTableBucket.values()) {
@@ -193,6 +228,8 @@ public class ClusterModel {
         } else {
             bucket.addFollower(replica, index);
         }
+
+        maxReplicationFactor = Math.max(maxReplicationFactor, 
bucket.replicas().size());
     }
 
     /**
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
index fdf9cbad8..d5a95bbef 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
@@ -59,6 +59,18 @@ public class RackModel {
         return rack;
     }
 
+    /**
+     * Tells whether any server in this rack is not tagged as offline.
+     *
+     * @return {@code true} if at least one server of the rack lacks the offline tag.
+     */
+    public boolean rackContainsServerWithoutOfflineTag() {
+        return servers.values().stream().anyMatch(server -> !server.isOfflineTagged());
+    }
+
     @Nullable
     ServerModel server(int serverId) {
         return servers.get(serverId);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
index 8b853ba32..116d4b4e1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
@@ -190,7 +190,7 @@ public class ServerModel implements Comparable<ServerModel> 
{
+    /** Returns a debug string with the server id, rack, offline tag and replica count. */
     @Override
     public String toString() {
         return String.format(
-                "ServerModel[id=%s,rack=%s,isAlive=%s,replicaCount=%s]",
+                
"ServerModel[id=%s,rack=%s,isOfflineTagged=%s,replicaCount=%s]",
                 serverId, rack, isOfflineTagged, replicas.size());
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
new file mode 100644
index 000000000..d73af6c1f
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RackAwareGoal}. */
+public class RackAwareGoalTest {
+
+    @Test
+    void testReplicaNumExceedsRackNum() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack1", false));
+        // server2 offline.
+        servers.add(new ServerModel(2, "rack2", true));
+        ClusterModel clusterModel = new ClusterModel(servers);
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2));
+
+        RackAwareGoal goal = new RackAwareGoal();
+        assertThatThrownBy(() -> goal.optimize(clusterModel, 
Collections.singleton(goal)))
+                .isInstanceOf(RebalanceFailureException.class)
+                .hasMessage(
+                        "[RackAwareGoal] Insufficient number of racks to 
distribute each replica (Current: 2, Needed: 3).");
+    }
+
+    @Test
+    void testReplicaMove() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack1", false));
+        servers.add(new ServerModel(2, "rack2", false));
+        servers.add(new ServerModel(3, "rack0", false));
+        ClusterModel clusterModel = new ClusterModel(servers);
+
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
+
+        // check the follower will be moved to server2.
+        RackAwareGoal goal = new RackAwareGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(clusterModel, 
Collections.singletonList(goal));
+        assertThat(rebalancePlanForBuckets).hasSize(1);
+        assertThat(rebalancePlanForBuckets.get(0))
+                .isEqualTo(
+                        new RebalancePlanForBucket(
+                                t1b0, 0, 2, Arrays.asList(0, 1, 3), 
Arrays.asList(2, 1, 3)));
+    }
+
+    @Test
+    void testReplicaDistributionNotBalanceAcrossRackAndServer() {
+        // RackAwareGoal only requires that replicas of the same bucket cannot 
be distributed on
+        // the same rack, but it does not care about the balance of replicas 
between racks, nor does
+        // it care about the balance of replicas between servers.
+        ClusterModel clusterModel = 
generateUnbalancedReplicaAcrossServerAndRack();
+        RackAwareGoal goal = new RackAwareGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(clusterModel, 
Collections.singletonList(goal));
+        assertThat(rebalancePlanForBuckets).hasSize(0);
+    }
+
+    @Test
+    void testReplicaDistributionBalanceAcrossServer() {
+        // the same input of 
`testReplicaDistributionNotBalanceAcrossRackAndServer`, if we combine
+        // using RackAwareGoal and ReplicaDistributionGoal, the replica 
distribution will be
+        // balanced across servers.
+        ClusterModel clusterModel = 
generateUnbalancedReplicaAcrossServerAndRack();
+        RackAwareGoal rackAwareGoal = new RackAwareGoal();
+        ReplicaDistributionGoal replicaDistributionGoal = new 
TestReplicaDistributionGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(
+                        clusterModel, Arrays.asList(rackAwareGoal, 
replicaDistributionGoal));
+        // Realance result(ReplicaNum) from server side are all 2.
+        assertThat(rebalancePlanForBuckets).hasSize(2);
+        assertThat(rebalancePlanForBuckets.get(0))
+                .isEqualTo(
+                        new RebalancePlanForBucket(
+                                new TableBucket(1, 3),
+                                0,
+                                1,
+                                Arrays.asList(0, 3, 5),
+                                Arrays.asList(1, 3, 5)));
+        assertThat(rebalancePlanForBuckets.get(1))
+                .isEqualTo(
+                        new RebalancePlanForBucket(
+                                new TableBucket(1, 1),
+                                0,
+                                1,
+                                Arrays.asList(0, 2, 5),
+                                Arrays.asList(1, 3, 5)));
+    }
+
+    @Test
+    void testMoveActionWithSameRackWillNotBeAccepted() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack1", false));
+        servers.add(new ServerModel(2, "rack2", false));
+        ServerModel server3 = new ServerModel(3, "rack0", false);
+        servers.add(server3);
+        ServerModel server4 = new ServerModel(4, "rack3", false);
+        servers.add(server4);
+        ClusterModel clusterModel = new ClusterModel(servers);
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2));
+
+        RackAwareGoal rackAwareGoal = new RackAwareGoal();
+        ReplicaModel sourceReplica = new ReplicaModel(t1b0, 
clusterModel.server(2), false);
+
+        // server3 is in the same rack with leader replica in t1b0 bucket 
model, so the move action
+        // will be rejected.
+        assertThat(
+                        rackAwareGoal.doesReplicaMoveViolateActionAcceptance(
+                                clusterModel, sourceReplica, server3))
+                .isTrue();
+
+        // server4 is in the different rack with all the replicas in t1b0 
bucket model, so the move
+        // action will be accepted.
+        assertThat(
+                        rackAwareGoal.doesReplicaMoveViolateActionAcceptance(
+                                clusterModel, sourceReplica, server4))
+                .isFalse();
+    }
+
+    private ClusterModel generateUnbalancedReplicaAcrossServerAndRack() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack0", false));
+        servers.add(new ServerModel(2, "rack1", false));
+        servers.add(new ServerModel(3, "rack1", false));
+        servers.add(new ServerModel(4, "rack2", false));
+        servers.add(new ServerModel(5, "rack3", false));
+        ClusterModel clusterModel = new ClusterModel(servers);
+
+        // For the following case, RackAwareGoal will not remove any replicas 
but the replica
+        // distribution is not balanced not only in racks but also in servers.
+        // t1b0 -> 0, 2, 4
+        // t1b1 -> 0, 2, 5
+        // t1b2 -> 0, 2, 4
+        // t1b3 -> 0, 3, 5
+
+        // Replica num from server side: server0: 4, server1: 0, server2: 3, 
server3: 1, server4: 1,
+        // server5: 1
+        // Replica num from rack side: rack0: 4, rack1: 4, rack2: 2, rack3: 2
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 2, 4));
+        TableBucket t1b1 = new TableBucket(1, 1);
+        addBucket(clusterModel, t1b1, Arrays.asList(0, 2, 5));
+        TableBucket t1b2 = new TableBucket(1, 2);
+        addBucket(clusterModel, t1b2, Arrays.asList(0, 2, 4));
+        TableBucket t1b3 = new TableBucket(1, 3);
+        addBucket(clusterModel, t1b3, Arrays.asList(0, 3, 5));
+        return clusterModel;
+    }
+
+    private static final class TestReplicaDistributionGoal extends 
ReplicaDistributionGoal {
+
+        @Override
+        protected double balancePercentage() {
+            return 1.0d;
+        }
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
index d375923c9..1908c94b7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
@@ -145,4 +145,19 @@ public class ClusterModelTest {
                 .hasMessageContaining(
                         "Requested replica 1 is not a replica of bucket 
TableBucket{tableId=1, bucket=0}");
     }
+
+    @Test
+    void testMaxReplicaFactor() {
+        ClusterModel clusterModel = new ClusterModel(servers);
+        assertThat(clusterModel.maxReplicationFactor()).isEqualTo(1);
+
+        clusterModel.createReplica(0, new TableBucket(1, 0), 0, true);
+        assertThat(clusterModel.maxReplicationFactor()).isEqualTo(1);
+        clusterModel.createReplica(1, new TableBucket(1, 0), 1, false);
+        assertThat(clusterModel.maxReplicationFactor()).isEqualTo(2);
+        clusterModel.createReplica(2, new TableBucket(1, 0), 2, false);
+        assertThat(clusterModel.maxReplicationFactor()).isEqualTo(3);
+        clusterModel.createReplica(0, new TableBucket(2, 0L, 0), 0, true);
+        assertThat(clusterModel.maxReplicationFactor()).isEqualTo(3);
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
index edaff4b76..3b895fd19 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
@@ -81,13 +81,13 @@ public class ServerModelTest {
     void testToString() {
         ServerModel serverModel = new ServerModel(0, "rack0", true);
         assertThat(serverModel.toString())
-                
.isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=0]");
+                
.isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=0]");
 
         serverModel.putReplica(
                 new TableBucket(1L, 0),
                 new ReplicaModel(new TableBucket(1L, 0), serverModel, false));
         assertThat(serverModel.toString())
-                
.isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=1]");
+                
.isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=1]");
     }
 
     @Test
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 43b12943d..2f2ee3735 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -103,6 +103,7 @@ import static 
org.apache.fluss.server.zk.ZooKeeperTestUtils.createZooKeeperClien
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.function.FunctionUtils.uncheckedFunction;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -134,6 +135,7 @@ public final class FlussClusterExtension
     private final Map<Integer, ServerInfo> tabletServerInfos;
     private final Configuration clusterConf;
     private final Clock clock;
+    private final String[] racks;
 
     /** Creates a new {@link Builder} for {@link FlussClusterExtension}. */
     public static Builder builder() {
@@ -145,7 +147,8 @@ public final class FlussClusterExtension
             String coordinatorServerListeners,
             String tabletServerListeners,
             Configuration clusterConf,
-            Clock clock) {
+            Clock clock,
+            String[] racks) {
         this.initialNumOfTabletServers = numOfTabletServers;
         this.tabletServers = new HashMap<>(numOfTabletServers);
         this.coordinatorServerListeners = coordinatorServerListeners;
@@ -153,6 +156,10 @@ public final class FlussClusterExtension
         this.tabletServerInfos = new HashMap<>();
         this.clusterConf = clusterConf;
         this.clock = clock;
+        checkArgument(
+                racks != null && racks.length == numOfTabletServers,
+                "racks must be not null and have the same length as 
numOfTabletServers");
+        this.racks = racks;
     }
 
     @Override
@@ -310,10 +317,17 @@ public final class FlussClusterExtension
 
     private void startTabletServer(int serverId, @Nullable Configuration 
overwriteConfig)
             throws Exception {
+        String rackName;
+        if (racks.length <= serverId) {
+            rackName = "rack-" + serverId;
+        } else {
+            rackName = racks[serverId];
+        }
+
         String dataDir = getDataDir(serverId);
         Configuration tabletServerConf = new Configuration(clusterConf);
         tabletServerConf.set(ConfigOptions.TABLET_SERVER_ID, serverId);
-        tabletServerConf.set(ConfigOptions.TABLET_SERVER_RACK, "rack" + 
serverId);
+        tabletServerConf.set(ConfigOptions.TABLET_SERVER_RACK, rackName);
         tabletServerConf.set(ConfigOptions.DATA_DIR, dataDir);
         tabletServerConf.setString(
                 ConfigOptions.ZOOKEEPER_ADDRESS, 
zooKeeperServer.getConnectString());
@@ -329,7 +343,7 @@ public final class FlussClusterExtension
         ServerInfo serverInfo =
                 new ServerInfo(
                         serverId,
-                        "rack" + serverId,
+                        rackName,
                         tabletServer.getRpcServer().getBindEndpoints(),
                         ServerType.TABLET_SERVER);
 
@@ -932,6 +946,7 @@ public final class FlussClusterExtension
         private String tabletServerListeners = DEFAULT_LISTENERS;
         private String coordinatorServerListeners = DEFAULT_LISTENERS;
         private Clock clock = SystemClock.getInstance();
+        private String[] racks = new String[] {"rack-0"};
 
         private final Configuration clusterConf = new Configuration();
 
@@ -971,13 +986,28 @@ public final class FlussClusterExtension
             return this;
         }
 
+        /** Sets the racks of tablet servers. */
+        public Builder setRacks(String[] racks) {
+            this.racks = racks;
+            return this;
+        }
+
         public FlussClusterExtension build() {
+            if (numOfTabletServers > 1 && racks.length == 1) {
+                String[] racks = new String[numOfTabletServers];
+                for (int i = 0; i < numOfTabletServers; i++) {
+                    racks[i] = "rack-" + i;
+                }
+                this.racks = racks;
+            }
+
             return new FlussClusterExtension(
                     numOfTabletServers,
                     coordinatorServerListeners,
                     tabletServerListeners,
                     clusterConf,
-                    clock);
+                    clock,
+                    racks);
         }
     }
 }


Reply via email to