wuchong commented on code in PR #2380:
URL: https://github.com/apache/fluss/pull/2380#discussion_r2740118340


##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static 
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RackAwareGoal}. */
+public class RackAwareGoalTest {
+
+    @Test
+    void testReplicaNumExceedsRackNum() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack1", false));
+        // server2 offline.
+        servers.add(new ServerModel(2, "rack2", true));
+        ClusterModel clusterModel = new ClusterModel(servers);
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2));
+
+        RackAwareGoal goal = new RackAwareGoal();
+        assertThatThrownBy(() -> goal.optimize(clusterModel, 
Collections.singleton(goal)))
+                .isInstanceOf(RebalanceFailureException.class)
+                .hasMessage(
+                        "[RackAwareGoal] Insufficient number of racks to 
distribute each replica (Current: 2, Needed: 3).");
+    }
+
+    @Test
+    void testReplicaMove() {
+        SortedSet<ServerModel> servers = new TreeSet<>();
+        servers.add(new ServerModel(0, "rack0", false));
+        servers.add(new ServerModel(1, "rack1", false));
+        servers.add(new ServerModel(2, "rack2", false));
+        servers.add(new ServerModel(3, "rack0", false));
+        ClusterModel clusterModel = new ClusterModel(servers);
+
+        TableBucket t1b0 = new TableBucket(1, 0);
+        addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
+
+        // check the follower will be moved to server2.
+        RackAwareGoal goal = new RackAwareGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(clusterModel, 
Collections.singletonList(goal));
+        assertThat(rebalancePlanForBuckets).hasSize(1);
+        assertThat(rebalancePlanForBuckets.get(0))
+                .isEqualTo(
+                        new RebalancePlanForBucket(
+                                t1b0, 0, 2, Arrays.asList(0, 1, 3), 
Arrays.asList(2, 1, 3)));
+    }
+
+    @Test
+    void testReplicaDistributionNotBalanceAcrossRackAndServer() {
+        // RackAwareGoal only requires that replicas of the same bucket cannot 
be distributed on
+        // the same rack, but it does not care about the balance of replicas 
between racks, nor does
+        // it care about the balance of replicas between servers.
+        ClusterModel clusterModel = 
generateUnbalancedReplicaAcrossServerAndRack();
+        RackAwareGoal goal = new RackAwareGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(clusterModel, 
Collections.singletonList(goal));
+        assertThat(rebalancePlanForBuckets).hasSize(0);
+    }
+
+    @Test
+    void testReplicaDistributionBalanceAcrossServer() {
+        // the same input of 
`testReplicaDistributionNotBalanceAcrossRackAndServer`, if we combine
+        // using RackAwareGoal and ReplicaDistributionGoal, the replica 
distribution will be
+        // balanced across servers.
+        ClusterModel clusterModel = 
generateUnbalancedReplicaAcrossServerAndRack();
+        RackAwareGoal rackAwareGoal = new RackAwareGoal();
+        ReplicaDistributionGoal replicaDistributionGoal = new 
ReplicaDistributionGoal();
+        GoalOptimizer goalOptimizer = new GoalOptimizer();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets =
+                goalOptimizer.doOptimizeOnce(
+                        clusterModel, Arrays.asList(rackAwareGoal, 
replicaDistributionGoal));
+        // Realance result(ReplicaNum) from server side: server0: 3, server1: 
1, server2: 3,
+        // server3: 1, server4: 1, server5: 1

Review Comment:
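   The rebalance result should be both evenly distributed across servers and rack-aware, but the
current result is not: server0 and server2 each end up with 3 replicas while every other server has only 1.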
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to provide rack-aware replica 
distribution, which ensure that
+ * all replicas of each bucket are assigned in a rack aware manner -- i.e. no 
more than one replica
+ * of each bucket resides in the same rack.
+ */
+public class RackAwareGoal extends RackAwareAbstractGoal {
+
+    @Override
+    protected boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel 
destServer) {
+        // Destination server cannot be in a rack that violates rack awareness.
+        BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not 
found");
+        Set<ServerModel> bucketServers = bucket.bucketServers();
+        bucketServers.remove(sourceReplica.server());
+
+        // If destination server exists on any of the rack of other replicas, 
it violates the
+        // rack-awareness
+        return 
bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals);
+    }
+
+    /**
+     * This is a hard goal; hence, the proposals are not limited to dead 
server replicas in case of
+     * self-healing. Sanity Check: There exists sufficient number of racks for 
achieving
+     * rack-awareness.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void initGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // Sanity Check: not enough racks to satisfy rack awareness.
+        // Assumes number of racks doesn't exceed Integer.MAX_VALUE.
+        int numAvailableRacks =
+                (int)
+                        
clusterModel.racksContainServerWithoutOfflineTag().stream()
+                                .map(RackModel::rack)
+                                .distinct()
+                                .count();
+        if (clusterModel.maxReplicationFactor() > numAvailableRacks) {
+            throw new RebalanceFailureException(
+                    String.format(
+                            "[%s] Insufficient number of racks to distribute 
each replica (Current: %d, Needed: %d).",
+                            name(), numAvailableRacks, 
clusterModel.maxReplicationFactor()));
+        }
+    }
+
+    /**
+     * Update goal state. Sanity check: After completion of balancing, confirm 
that replicas of each
+     * bucket reside at a separate rack.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void updateGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // One pass is sufficient to satisfy or alert impossibility of this 
goal.
+        // Sanity check to confirm that the final distribution is rack aware.
+        ensureRackAware(clusterModel);
+        finish();
+    }
+
+    /**
+     * Rack-awareness violations can be resolved with replica movements.
+     *
+     * @param server Server to be rebalanced.
+     * @param clusterModel The state of the cluster.
+     * @param optimizedGoals Optimized goals.
+     */
+    @Override
+    protected void rebalanceForServer(
+            ServerModel server, ClusterModel clusterModel, Set<Goal> 
optimizedGoals)
+            throws RebalanceFailureException {
+        rebalanceForServer(server, clusterModel, optimizedGoals, true);
+    }
+
+    /**
+     * Get a list of rack aware eligible servers for the given replica in the 
given cluster. A
+     * server is rack aware eligible for a given replica if the server resides 
in a rack where no
+     * other server in the same rack contains a replica from the same bucket 
of the given replica.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers 
are requested.
+     * @param clusterModel The state of the cluster.
+     * @return A list of rack aware eligible servers for the given replica in 
the given cluster.
+     */
+    @Override
+    protected SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Populate bucket rackIds.
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not 
found");
+        List<String> bucketRackIds =
+                
bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList());
+
+        // Remove rackId of the given replica, but if there is any other 
replica from the bucket
+        // residing in the  same cluster, keep its rackId in the list.
+        bucketRackIds.remove(replica.server().rack());
+
+        SortedSet<ServerModel> rackAwareEligibleServers =
+                new TreeSet<>(Comparator.comparingInt(ServerModel::id));
+        for (ServerModel server : clusterModel.aliveServers()) {
+            if (!bucketRackIds.contains(server.rack())) {
+                rackAwareEligibleServers.add(server);
+            }
+        }
+        // Return eligible servers.
+        return rackAwareEligibleServers;
+    }
+
+    @Override
+    protected boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Rack awareness requires no more than one replica from a given 
bucket residing in any
+        // rack in the cluster
+        String myRackId = replica.server().rack();
+        int myServerId = replica.serverId();
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not 
found");
+        for (ServerModel bucketServer : bucket.bucketServers()) {
+            if (myRackId.equals(bucketServer.rack()) && myServerId != 
bucketServer.id()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void ensureRackAware(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        for (ReplicaModel leader : clusterModel.leaderReplicas()) {
+            Set<String> replicaServersRackIds = new HashSet<>();
+            BucketModel bucket = clusterModel.bucket(leader.tableBucket());
+            checkNotNull(bucket, "Bucket for replica " + leader + " is not 
found");
+            Set<ServerModel> followerServers = new 
HashSet<>(bucket.followerServers());
+
+            // Add rackId of replicas.
+            for (ServerModel followerServer : followerServers) {
+                replicaServersRackIds.add(followerServer.rack());
+            }
+            replicaServersRackIds.add(leader.server().rack());
+            if (replicaServersRackIds.size() != (followerServers.size() + 1)) {
+                throw new RebalanceFailureException(
+                        String.format(
+                                "[%s] Bucket %s is not rack-aware. Leader (%s) 
and follower server (%s).",

Review Comment:
   Could we improve the error message? Since this is a rack-awareness violation, it should at least
print the server id and rack of each replica.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to provide rack-aware replica 
distribution, which ensure that
+ * all replicas of each bucket are assigned in a rack aware manner -- i.e. no 
more than one replica
+ * of each bucket resides in the same rack.
+ */
+public class RackAwareGoal extends RackAwareAbstractGoal {
+
+    @Override
+    protected boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel 
destServer) {
+        // Destination server cannot be in a rack that violates rack awareness.
+        BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not 
found");
+        Set<ServerModel> bucketServers = bucket.bucketServers();
+        bucketServers.remove(sourceReplica.server());
+
+        // If destination server exists on any of the rack of other replicas, 
it violates the
+        // rack-awareness
+        return 
bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals);
+    }
+
+    /**
+     * This is a hard goal; hence, the proposals are not limited to dead 
server replicas in case of
+     * self-healing. Sanity Check: There exists sufficient number of racks for 
achieving
+     * rack-awareness.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void initGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // Sanity Check: not enough racks to satisfy rack awareness.
+        // Assumes number of racks doesn't exceed Integer.MAX_VALUE.
+        int numAvailableRacks =
+                (int)
+                        
clusterModel.racksContainServerWithoutOfflineTag().stream()
+                                .map(RackModel::rack)
+                                .distinct()
+                                .count();
+        if (clusterModel.maxReplicationFactor() > numAvailableRacks) {
+            throw new RebalanceFailureException(
+                    String.format(
+                            "[%s] Insufficient number of racks to distribute 
each replica (Current: %d, Needed: %d).",
+                            name(), numAvailableRacks, 
clusterModel.maxReplicationFactor()));
+        }
+    }
+
+    /**
+     * Update goal state. Sanity check: After completion of balancing, confirm 
that replicas of each
+     * bucket reside at a separate rack.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void updateGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // One pass is sufficient to satisfy or alert impossibility of this 
goal.
+        // Sanity check to confirm that the final distribution is rack aware.
+        ensureRackAware(clusterModel);
+        finish();
+    }
+
+    /**
+     * Rack-awareness violations can be resolved with replica movements.
+     *
+     * @param server Server to be rebalanced.
+     * @param clusterModel The state of the cluster.
+     * @param optimizedGoals Optimized goals.
+     */
+    @Override
+    protected void rebalanceForServer(
+            ServerModel server, ClusterModel clusterModel, Set<Goal> 
optimizedGoals)
+            throws RebalanceFailureException {
+        rebalanceForServer(server, clusterModel, optimizedGoals, true);
+    }
+
+    /**
+     * Get a list of rack aware eligible servers for the given replica in the 
given cluster. A
+     * server is rack aware eligible for a given replica if the server resides 
in a rack where no
+     * other server in the same rack contains a replica from the same bucket 
of the given replica.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers 
are requested.
+     * @param clusterModel The state of the cluster.
+     * @return A list of rack aware eligible servers for the given replica in 
the given cluster.
+     */
+    @Override
+    protected SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Populate bucket rackIds.
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not 
found");
+        List<String> bucketRackIds =
+                
bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList());
+
+        // Remove rackId of the given replica, but if there is any other 
replica from the bucket
+        // residing in the  same cluster, keep its rackId in the list.
+        bucketRackIds.remove(replica.server().rack());
+
+        SortedSet<ServerModel> rackAwareEligibleServers =
+                new TreeSet<>(Comparator.comparingInt(ServerModel::id));
+        for (ServerModel server : clusterModel.aliveServers()) {
+            if (!bucketRackIds.contains(server.rack())) {
+                rackAwareEligibleServers.add(server);
+            }
+        }
+        // Return eligible servers.
+        return rackAwareEligibleServers;
+    }
+
+    @Override
+    protected boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Rack awareness requires no more than one replica from a given 
bucket residing in any
+        // rack in the cluster
+        String myRackId = replica.server().rack();
+        int myServerId = replica.serverId();
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not 
found");
+        for (ServerModel bucketServer : bucket.bucketServers()) {
+            if (myRackId.equals(bucketServer.rack()) && myServerId != 
bucketServer.id()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void ensureRackAware(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        for (ReplicaModel leader : clusterModel.leaderReplicas()) {
+            Set<String> replicaServersRackIds = new HashSet<>();
+            BucketModel bucket = clusterModel.bucket(leader.tableBucket());
+            checkNotNull(bucket, "Bucket for replica " + leader + " is not 
found");
+            Set<ServerModel> followerServers = new 
HashSet<>(bucket.followerServers());

Review Comment:
   Do we need to convert this into a `Set`? If multiple replicas are assigned to the same server,
`followerServers` will be smaller than expected, so the `followerServers.size() + 1` check can miss the violation.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoal.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Stream;
+
+import static java.util.Collections.max;
+import static java.util.Collections.min;
+import static 
org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServers;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to evenly distribute replicas over 
alive racks not excluded
+ * for replica moves.
+ *
+ * <p>This is a relaxed version of {@link RackAwareGoal}. Contrary to {@link 
RackAwareGoal}, as long
+ * as replicas of each bucket can achieve a perfectly even distribution across 
the racks, this goal
+ * lets placement of multiple replicas of a bucket into a single rack.
+ *
+ * <p>For example, suppose a table with 1 bucket has 4 replicas in a cluster 
with 2 racks. Then the
+ * following distribution will be acceptable by this goal (but would be 
unacceptable by {@link
+ * RackAwareGoal}):
+ *
+ * <pre>
+ * Rack A                    | Rack B
+ * -----------------------------------------------------
+ * Server1-rack A  replica-0 | Server2-rack B
+ * Server3-rack A            | Server4-rack B replica-1
+ * Server5-rack A  replica-2 | Server6-rack B
+ * Server7-rack A            | Server8-rack B replica-3
+ * </pre>
+ *
+ * <p>However, this goal will yield an {@link RebalanceFailureException} for 
the same bucket in the
+ * following cluster due to the lack of a second server to place a replica of 
this bucket in {@code
+ * Rack B}:
+ *
+ * <pre>
+ * Rack A                    | Rack B
+ * -----------------------------------------------------
+ * Server1-rack A  replica-0 | Server2-rack B replica-1
+ * Server3-rack A  replica-3 |
+ * Server5-rack A  replica-2 |
+ * </pre>
+ */
+public class RackAwareDistributionGoal extends RackAwareAbstractGoal {

Review Comment:
   We don't need this for now.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to provide rack-aware replica 
distribution, which ensure that
+ * all replicas of each bucket are assigned in a rack aware manner -- i.e. no 
more than one replica
+ * of each bucket resides in the same rack.
+ */
+public class RackAwareGoal extends RackAwareAbstractGoal {
+
+    @Override
+    protected boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel 
destServer) {
+        // Destination server cannot be in a rack that violates rack awareness.
+        BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not 
found");
+        Set<ServerModel> bucketServers = bucket.bucketServers();
+        bucketServers.remove(sourceReplica.server());
+
+        // If destination server exists on any of the rack of other replicas, 
it violates the
+        // rack-awareness
+        return 
bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals);
+    }
+
+    /**
+     * This is a hard goal; hence, the proposals are not limited to dead 
server replicas in case of
+     * self-healing. Sanity Check: There exists sufficient number of racks for 
achieving
+     * rack-awareness.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void initGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // Sanity Check: not enough racks to satisfy rack awareness.
+        // Assumes number of racks doesn't exceed Integer.MAX_VALUE.
+        int numAvailableRacks =
+                (int)
+                        
clusterModel.racksContainServerWithoutOfflineTag().stream()
+                                .map(RackModel::rack)
+                                .distinct()
+                                .count();
+        if (clusterModel.maxReplicationFactor() > numAvailableRacks) {
+            throw new RebalanceFailureException(
+                    String.format(
+                            "[%s] Insufficient number of racks to distribute 
each replica (Current: %d, Needed: %d).",
+                            name(), numAvailableRacks, 
clusterModel.maxReplicationFactor()));
+        }
+    }
+
+    /**
+     * Update goal state. Sanity check: After completion of balancing, confirm 
that replicas of each
+     * bucket reside at a separate rack.
+     *
+     * @param clusterModel The state of the cluster.
+     */
+    @Override
+    protected void updateGoalState(ClusterModel clusterModel) throws 
RebalanceFailureException {
+        // One pass is sufficient to satisfy or alert impossibility of this 
goal.
+        // Sanity check to confirm that the final distribution is rack aware.
+        ensureRackAware(clusterModel);
+        finish();
+    }
+
+    /**
+     * Rack-awareness violations can be resolved with replica movements.
+     *
+     * @param server Server to be rebalanced.
+     * @param clusterModel The state of the cluster.
+     * @param optimizedGoals Optimized goals.
+     */
+    @Override
+    protected void rebalanceForServer(
+            ServerModel server, ClusterModel clusterModel, Set<Goal> 
optimizedGoals)
+            throws RebalanceFailureException {
+        rebalanceForServer(server, clusterModel, optimizedGoals, true);

Review Comment:
   We should only move a replica when it is not rack-aware.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to