wuchong commented on code in PR #2380: URL: https://github.com/apache/fluss/pull/2380#discussion_r2740118340
########## fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link RackAwareGoal}. 
*/ +public class RackAwareGoalTest { + + @Test + void testReplicaNumExceedsRackNum() { + SortedSet<ServerModel> servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack1", false)); + // server2 offline. + servers.add(new ServerModel(2, "rack2", true)); + ClusterModel clusterModel = new ClusterModel(servers); + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2)); + + RackAwareGoal goal = new RackAwareGoal(); + assertThatThrownBy(() -> goal.optimize(clusterModel, Collections.singleton(goal))) + .isInstanceOf(RebalanceFailureException.class) + .hasMessage( + "[RackAwareGoal] Insufficient number of racks to distribute each replica (Current: 2, Needed: 3)."); + } + + @Test + void testReplicaMove() { + SortedSet<ServerModel> servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack1", false)); + servers.add(new ServerModel(2, "rack2", false)); + servers.add(new ServerModel(3, "rack0", false)); + ClusterModel clusterModel = new ClusterModel(servers); + + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3)); + + // check the follower will be moved to server2. 
+ RackAwareGoal goal = new RackAwareGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List<RebalancePlanForBucket> rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal)); + assertThat(rebalancePlanForBuckets).hasSize(1); + assertThat(rebalancePlanForBuckets.get(0)) + .isEqualTo( + new RebalancePlanForBucket( + t1b0, 0, 2, Arrays.asList(0, 1, 3), Arrays.asList(2, 1, 3))); + } + + @Test + void testReplicaDistributionNotBalanceAcrossRackAndServer() { + // RackAwareGoal only requires that replicas of the same bucket cannot be distributed on + // the same rack, but it does not care about the balance of replicas between racks, nor does + // it care about the balance of replicas between servers. + ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + RackAwareGoal goal = new RackAwareGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List<RebalancePlanForBucket> rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal)); + assertThat(rebalancePlanForBuckets).hasSize(0); + } + + @Test + void testReplicaDistributionBalanceAcrossServer() { + // the same input of `testReplicaDistributionNotBalanceAcrossRackAndServer`, if we combine + // using RackAwareGoal and ReplicaDistributionGoal, the replica distribution will be + // balanced across servers. 
+ ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + RackAwareGoal rackAwareGoal = new RackAwareGoal(); + ReplicaDistributionGoal replicaDistributionGoal = new ReplicaDistributionGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List<RebalancePlanForBucket> rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce( + clusterModel, Arrays.asList(rackAwareGoal, replicaDistributionGoal)); + // Realance result(ReplicaNum) from server side: server0: 3, server1: 1, server2: 3, + // server3: 1, server4: 1, server5: 1 Review Comment: The rebalance result should be evenly distributed and rack-aware, but the current result is not. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to provide rack-aware replica distribution, which ensure that + * all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one replica + * of each bucket resides in the same rack. + */ +public class RackAwareGoal extends RackAwareAbstractGoal { + + @Override + protected boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) { + // Destination server cannot be in a rack that violates rack awareness. + BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found"); + Set<ServerModel> bucketServers = bucket.bucketServers(); + bucketServers.remove(sourceReplica.server()); + + // If destination server exists on any of the rack of other replicas, it violates the + // rack-awareness + return bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals); + } + + /** + * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of + * self-healing. Sanity Check: There exists sufficient number of racks for achieving + * rack-awareness. 
+ * + * @param clusterModel The state of the cluster. + */ + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // Sanity Check: not enough racks to satisfy rack awareness. + // Assumes number of racks doesn't exceed Integer.MAX_VALUE. + int numAvailableRacks = + (int) + clusterModel.racksContainServerWithoutOfflineTag().stream() + .map(RackModel::rack) + .distinct() + .count(); + if (clusterModel.maxReplicationFactor() > numAvailableRacks) { + throw new RebalanceFailureException( + String.format( + "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", + name(), numAvailableRacks, clusterModel.maxReplicationFactor())); + } + } + + /** + * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each + * bucket reside at a separate rack. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // One pass is sufficient to satisfy or alert impossibility of this goal. + // Sanity check to confirm that the final distribution is rack aware. + ensureRackAware(clusterModel); + finish(); + } + + /** + * Rack-awareness violations can be resolved with replica movements. + * + * @param server Server to be rebalanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. + */ + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set<Goal> optimizedGoals) + throws RebalanceFailureException { + rebalanceForServer(server, clusterModel, optimizedGoals, true); + } + + /** + * Get a list of rack aware eligible servers for the given replica in the given cluster. A + * server is rack aware eligible for a given replica if the server resides in a rack where no + * other server in the same rack contains a replica from the same bucket of the given replica. 
+ * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + @Override + protected SortedSet<ServerModel> rackAwareEligibleServers( + ReplicaModel replica, ClusterModel clusterModel) { + // Populate bucket rackIds. + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + List<String> bucketRackIds = + bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList()); + + // Remove rackId of the given replica, but if there is any other replica from the bucket + // residing in the same cluster, keep its rackId in the list. + bucketRackIds.remove(replica.server().rack()); + + SortedSet<ServerModel> rackAwareEligibleServers = + new TreeSet<>(Comparator.comparingInt(ServerModel::id)); + for (ServerModel server : clusterModel.aliveServers()) { + if (!bucketRackIds.contains(server.rack())) { + rackAwareEligibleServers.add(server); + } + } + // Return eligible servers. 
+ return rackAwareEligibleServers; + } + + @Override + protected boolean shouldKeepInTheCurrentServer( + ReplicaModel replica, ClusterModel clusterModel) { + // Rack awareness requires no more than one replica from a given bucket residing in any + // rack in the cluster + String myRackId = replica.server().rack(); + int myServerId = replica.serverId(); + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + for (ServerModel bucketServer : bucket.bucketServers()) { + if (myRackId.equals(bucketServer.rack()) && myServerId != bucketServer.id()) { + return false; + } + } + return true; + } + + private void ensureRackAware(ClusterModel clusterModel) throws RebalanceFailureException { + for (ReplicaModel leader : clusterModel.leaderReplicas()) { + Set<String> replicaServersRackIds = new HashSet<>(); + BucketModel bucket = clusterModel.bucket(leader.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + leader + " is not found"); + Set<ServerModel> followerServers = new HashSet<>(bucket.followerServers()); + + // Add rackId of replicas. + for (ServerModel followerServer : followerServers) { + replicaServersRackIds.add(followerServer.rack()); + } + replicaServersRackIds.add(leader.server().rack()); + if (replicaServersRackIds.size() != (followerServers.size() + 1)) { + throw new RebalanceFailureException( + String.format( + "[%s] Bucket %s is not rack-aware. Leader (%s) and follower server (%s).", Review Comment: Improve the error message? We should at least print the server id and rack, because this is a rack unaware error. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to provide rack-aware replica distribution, which ensure that + * all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one replica + * of each bucket resides in the same rack. + */ +public class RackAwareGoal extends RackAwareAbstractGoal { + + @Override + protected boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) { + // Destination server cannot be in a rack that violates rack awareness. 
+ BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found"); + Set<ServerModel> bucketServers = bucket.bucketServers(); + bucketServers.remove(sourceReplica.server()); + + // If destination server exists on any of the rack of other replicas, it violates the + // rack-awareness + return bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals); + } + + /** + * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of + * self-healing. Sanity Check: There exists sufficient number of racks for achieving + * rack-awareness. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // Sanity Check: not enough racks to satisfy rack awareness. + // Assumes number of racks doesn't exceed Integer.MAX_VALUE. + int numAvailableRacks = + (int) + clusterModel.racksContainServerWithoutOfflineTag().stream() + .map(RackModel::rack) + .distinct() + .count(); + if (clusterModel.maxReplicationFactor() > numAvailableRacks) { + throw new RebalanceFailureException( + String.format( + "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", + name(), numAvailableRacks, clusterModel.maxReplicationFactor())); + } + } + + /** + * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each + * bucket reside at a separate rack. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // One pass is sufficient to satisfy or alert impossibility of this goal. + // Sanity check to confirm that the final distribution is rack aware. + ensureRackAware(clusterModel); + finish(); + } + + /** + * Rack-awareness violations can be resolved with replica movements. 
+ * + * @param server Server to be rebalanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. + */ + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set<Goal> optimizedGoals) + throws RebalanceFailureException { + rebalanceForServer(server, clusterModel, optimizedGoals, true); + } + + /** + * Get a list of rack aware eligible servers for the given replica in the given cluster. A + * server is rack aware eligible for a given replica if the server resides in a rack where no + * other server in the same rack contains a replica from the same bucket of the given replica. + * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + @Override + protected SortedSet<ServerModel> rackAwareEligibleServers( + ReplicaModel replica, ClusterModel clusterModel) { + // Populate bucket rackIds. + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + List<String> bucketRackIds = + bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList()); + + // Remove rackId of the given replica, but if there is any other replica from the bucket + // residing in the same cluster, keep its rackId in the list. + bucketRackIds.remove(replica.server().rack()); + + SortedSet<ServerModel> rackAwareEligibleServers = + new TreeSet<>(Comparator.comparingInt(ServerModel::id)); + for (ServerModel server : clusterModel.aliveServers()) { + if (!bucketRackIds.contains(server.rack())) { + rackAwareEligibleServers.add(server); + } + } + // Return eligible servers. 
+ return rackAwareEligibleServers; + } + + @Override + protected boolean shouldKeepInTheCurrentServer( + ReplicaModel replica, ClusterModel clusterModel) { + // Rack awareness requires no more than one replica from a given bucket residing in any + // rack in the cluster + String myRackId = replica.server().rack(); + int myServerId = replica.serverId(); + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + for (ServerModel bucketServer : bucket.bucketServers()) { + if (myRackId.equals(bucketServer.rack()) && myServerId != bucketServer.id()) { + return false; + } + } + return true; + } + + private void ensureRackAware(ClusterModel clusterModel) throws RebalanceFailureException { + for (ReplicaModel leader : clusterModel.leaderReplicas()) { + Set<String> replicaServersRackIds = new HashSet<>(); + BucketModel bucket = clusterModel.bucket(leader.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + leader + " is not found"); + Set<ServerModel> followerServers = new HashSet<>(bucket.followerServers()); Review Comment: There is no need to convert into a `Set`? If a replica is assigned to the same server, the `followerServers` will be smaller than expected. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoal.java: ########## @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Stream; + +import static java.util.Collections.max; +import static java.util.Collections.min; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServers; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to evenly distribute replicas over alive racks not excluded + * for replica moves. + * + * <p>This is a relaxed version of {@link RackAwareGoal}. Contrary to {@link RackAwareGoal}, as long + * as replicas of each bucket can achieve a perfectly even distribution across the racks, this goal + * lets placement of multiple replicas of a bucket into a single rack. + * + * <p>For example, suppose a table with 1 bucket has 4 replicas in a cluster with 2 racks. 
Then the + * following distribution will be acceptable by this goal (but would be unacceptable by {@link + * RackAwareGoal}): + * + * <pre> + * Rack A | Rack B + * ----------------------------------------------------- + * Server1-rack A replica-0 | Server2-rack B + * Server3-rack A | Server4-rack B replica-1 + * Server5-rack A replica-2 | Server6-rack B + * Server7-rack A | Server8-rack B replica-3 + * </pre> + * + * <p>However, this goal will yield an {@link RebalanceFailureException} for the same bucket in the + * following cluster due to the lack of a second server to place a replica of this bucket in {@code + * Rack B}: + * + * <pre> + * Rack A | Rack B + * ----------------------------------------------------- + * Server1-rack A replica-0 | Server2-rack B replica-1 + * Server3-rack A replica-3 | + * Server5-rack A replica-2 | + * </pre> + */ +public class RackAwareDistributionGoal extends RackAwareAbstractGoal { Review Comment: We don't need this for now. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to provide rack-aware replica distribution, which ensure that + * all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one replica + * of each bucket resides in the same rack. + */ +public class RackAwareGoal extends RackAwareAbstractGoal { + + @Override + protected boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) { + // Destination server cannot be in a rack that violates rack awareness. + BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found"); + Set<ServerModel> bucketServers = bucket.bucketServers(); + bucketServers.remove(sourceReplica.server()); + + // If destination server exists on any of the rack of other replicas, it violates the + // rack-awareness + return bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals); + } + + /** + * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of + * self-healing. Sanity Check: There exists sufficient number of racks for achieving + * rack-awareness. 
+ * + * @param clusterModel The state of the cluster. + */ + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // Sanity Check: not enough racks to satisfy rack awareness. + // Assumes number of racks doesn't exceed Integer.MAX_VALUE. + int numAvailableRacks = + (int) + clusterModel.racksContainServerWithoutOfflineTag().stream() + .map(RackModel::rack) + .distinct() + .count(); + if (clusterModel.maxReplicationFactor() > numAvailableRacks) { + throw new RebalanceFailureException( + String.format( + "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", + name(), numAvailableRacks, clusterModel.maxReplicationFactor())); + } + } + + /** + * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each + * bucket reside at a separate rack. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // One pass is sufficient to satisfy or alert impossibility of this goal. + // Sanity check to confirm that the final distribution is rack aware. + ensureRackAware(clusterModel); + finish(); + } + + /** + * Rack-awareness violations can be resolved with replica movements. + * + * @param server Server to be rebalanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. + */ + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set<Goal> optimizedGoals) + throws RebalanceFailureException { + rebalanceForServer(server, clusterModel, optimizedGoals, true); Review Comment: We should just move replicas when the replica is not rack aware. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
