This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 65e4f372d [server] Support RackAwareGoal for rebalance (#2380)
65e4f372d is described below
commit 65e4f372def087c0bb813a34d5bf8049ed5123e4
Author: yunhong <[email protected]>
AuthorDate: Sun Feb 1 15:42:30 2026 +0800
[server] Support RackAwareGoal for rebalance (#2380)
---
.../fluss/client/admin/RackAwareClusterITCase.java | 110 ++++++++++++
.../apache/fluss/cluster/rebalance/GoalType.java | 10 +-
.../coordinator/rebalance/goal/GoalUtils.java | 18 ++
.../rebalance/goal/RackAwareAbstractGoal.java | 160 +++++++++++++++++
.../coordinator/rebalance/goal/RackAwareGoal.java | 142 +++++++++++++++
.../rebalance/goal/ReplicaDistributionGoal.java | 6 +-
.../coordinator/rebalance/model/BucketModel.java | 11 ++
.../coordinator/rebalance/model/ClusterModel.java | 37 ++++
.../coordinator/rebalance/model/RackModel.java | 12 ++
.../coordinator/rebalance/model/ServerModel.java | 2 +-
.../rebalance/goal/RackAwareGoalTest.java | 199 +++++++++++++++++++++
.../rebalance/model/ClusterModelTest.java | 15 ++
.../rebalance/model/ServerModelTest.java | 4 +-
.../server/testutils/FlussClusterExtension.java | 38 +++-
14 files changed, 753 insertions(+), 11 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
new file mode 100644
index 000000000..edf57fb32
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/RackAwareClusterITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.admin;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.data.BucketAssignment;
+import org.apache.fluss.server.zk.data.TableAssignment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Optional;
+
+import static org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_SCHEMA;
+import static
org.apache.fluss.client.admin.FlussAdminITCase.DEFAULT_TABLE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for rack aware cluster. */
+public class RackAwareClusterITCase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(4)
+                    // ts-0: rack-0, ts-1: rack-1, ts-2: rack-2, ts-3: rack-0 (3 distinct racks).
+                    .setRacks(new String[] {"rack-0", "rack-1", "rack-2", "rack-0"})
+                    .build();
+
+    protected Connection conn;
+    protected Admin admin;
+    protected Configuration clientConf;
+
+    @BeforeEach
+    protected void setup() throws Exception {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    @Test
+    void testCreateTableWithInsufficientRack() throws Exception {
+        TablePath tablePath =
+                TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "t1-with-replica-factor-4");
+        // The replication factor (4) is larger than the number of distinct racks (3): ts-0 and
+        // ts-3 share rack-0, ts-1 is on rack-1 and ts-2 is on rack-2.
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .comment("test table")
+                        .customProperty("connector", "fluss")
+                        .distributedBy(1, "id")
+                        .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "4")
+                        .build();
+        admin.createDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), DatabaseDescriptor.EMPTY, false)
+                .get();
+
+        try {
+            // Table creation still succeeds; two of the four replicas end up on rack-0.
+            admin.createTable(tablePath, tableDescriptor, false).get();
+
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            Optional<TableAssignment> assignmentOpt =
+                    FLUSS_CLUSTER_EXTENSION
+                            .getZooKeeperClient()
+                            .getTableAssignment(tableInfo.getTableId());
+            assertThat(assignmentOpt.isPresent()).isTrue();
+            TableAssignment assignment = assignmentOpt.get();
+            BucketAssignment bucketAssignment = assignment.getBucketAssignment(0);
+            assertThat(bucketAssignment.getReplicas()).containsExactlyInAnyOrder(0, 1, 2, 3);
+        } finally {
+            // Always clean up: the cluster extension is static and shared, so leftovers from a
+            // failed assertion would leak into other tests.
+            admin.dropTable(tablePath, true).get();
+            admin.dropDatabase(DEFAULT_TABLE_PATH.getDatabaseName(), true, true).get();
+        }
+    }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
index 6fb27c3a7..7a01116c7 100644
---
a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
+++
b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java
@@ -38,7 +38,13 @@ public enum GoalType {
* Goal to generate leadership movement and leader replica movement tasks
to ensure that the
* number of leader replicas on each tabletServer is near balanced.
*/
- LEADER_DISTRIBUTION(1);
+ LEADER_DISTRIBUTION(1),
+
+    /**
+     * Goal to generate replica movement tasks to ensure that the replicas of each bucket are
+     * distributed across racks, i.e. no two replicas of the same bucket reside in the same rack.
+     */
+    RACK_AWARE(2);
public final int value;
@@ -51,6 +57,8 @@ public enum GoalType {
return REPLICA_DISTRIBUTION;
} else if (value == LEADER_DISTRIBUTION.value) {
return LEADER_DISTRIBUTION;
+ } else if (value == RACK_AWARE.value) {
+ return RACK_AWARE;
} else {
throw new IllegalArgumentException(
String.format(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
index 3385964ce..652b2a231 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.coordinator.rebalance.goal;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.server.coordinator.rebalance.ActionType;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
@@ -36,6 +37,8 @@ public class GoalUtils {
return new ReplicaDistributionGoal();
case LEADER_DISTRIBUTION:
return new LeaderReplicaDistributionGoal();
+ case RACK_AWARE:
+ return new RackAwareGoal();
default:
throw new IllegalArgumentException("Unsupported goal type " +
goalType);
}
@@ -76,4 +79,19 @@ public class GoalUtils {
.map(ServerModel::id)
.collect(Collectors.toCollection(HashSet::new));
}
+
+    /**
+     * A {@link Goal.ClusterModelStatsComparator} for hard goals. Every comparison is a tie, because
+     * a hard goal is judged only by whether its requirements hold, not by cluster statistics.
+     */
+    public static class HardGoalStatsComparator implements Goal.ClusterModelStatsComparator {
+
+        @Override
+        public int compare(ClusterModelStats first, ClusterModelStats second) {
+            // Nothing to rank: if a hard goal's requirements are not met, the optimization has
+            // already failed, so any two models reaching this point are considered equal.
+            return 0;
+        }
+
+        @Override
+        public String explainLastComparison() {
+            // Comparisons always tie, so there is no explanation to report.
+            return null;
+        }
+    }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
new file mode 100644
index 000000000..bbdbf1532
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.RebalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.SortedSet;
+
+import static
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.SERVER_REJECT;
+
+/** An abstract class for rack-aware goals. */
+public abstract class RackAwareAbstractGoal extends AbstractGoal {
+    private static final Logger LOG = LoggerFactory.getLogger(RackAwareAbstractGoal.class);
+
+    @Override
+    public ClusterModelStatsComparator clusterModelStatsComparator() {
+        // Rack awareness is a hard requirement, so cluster stats never rank one model above
+        // another.
+        return new GoalUtils.HardGoalStatsComparator();
+    }
+
+    @Override
+    protected boolean selfSatisfied(ClusterModel clusterModel, RebalancingAction action) {
+        return true;
+    }
+
+    /**
+     * Check whether the given action is acceptable by this goal. The following actions are
+     * acceptable:
+     *
+     * <ul>
+     *   <li>All leadership moves
+     *   <li>Replica moves that do not violate {@link
+     *       #doesReplicaMoveViolateActionAcceptance(ClusterModel, ReplicaModel, ServerModel)}
+     * </ul>
+     *
+     * @param action Action to be checked for acceptance.
+     * @param clusterModel The state of the cluster.
+     * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal, {@link
+     *     ActionAcceptance#SERVER_REJECT} if moving the source replica to the destination server
+     *     would violate rack awareness.
+     * @throws IllegalArgumentException if the action type is neither a leadership nor a replica
+     *     movement.
+     */
+    @Override
+    public ActionAcceptance actionAcceptance(RebalancingAction action, ClusterModel clusterModel) {
+        switch (action.getActionType()) {
+            case LEADERSHIP_MOVEMENT:
+                return ACCEPT;
+            case REPLICA_MOVEMENT:
+                if (doesReplicaMoveViolateActionAcceptance(
+                        clusterModel,
+                        clusterModel
+                                .server(action.getSourceServerId())
+                                .replica(action.getTableBucket()),
+                        clusterModel.server(action.getDestinationServerId()))) {
+                    return SERVER_REJECT;
+                }
+                return ACCEPT;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported rebalance action " + action.getActionType() + " is provided.");
+        }
+    }
+
+    /**
+     * Check whether the given replica move would violate the action acceptance for this rack aware
+     * goal.
+     *
+     * @param clusterModel The state of the cluster.
+     * @param sourceReplica Source replica.
+     * @param destServer Destination server to receive the given source replica.
+     * @return {@code true} if the given replica move would violate action acceptance (i.e. the move
+     *     is not acceptable), {@code false} otherwise.
+     */
+    protected abstract boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer);
+
+    /**
+     * Rebalance the given serverModel without violating the constraints of this rack aware goal and
+     * optimized goals.
+     *
+     * <p>Replicas that already satisfy the goal stay where they are, unless the server is tagged
+     * offline. All other replicas are offered to the rack aware eligible servers. A replica that
+     * cannot be moved is only logged at debug level.
+     *
+     * @param serverModel Server to be balanced.
+     * @param clusterModel The state of the cluster.
+     * @param optimizedGoals Optimized goals.
+     */
+    protected void rebalanceForServer(
+            ServerModel serverModel, ClusterModel clusterModel, Set<Goal> optimizedGoals)
+            throws RebalanceFailureException {
+        // Iterate over a snapshot. A successful REPLICA_MOVEMENT is expected to take the replica
+        // off serverModel, and mutating the live replica collection while iterating it could
+        // throw ConcurrentModificationException or skip replicas.
+        // TODO maybe iterate the replicas in a sorted order
+        ReplicaModel[] replicasSnapshot = serverModel.replicas().toArray(new ReplicaModel[0]);
+        for (ReplicaModel replica : replicasSnapshot) {
+            // Replicas on an offline-tagged server are always candidates to move; otherwise only
+            // replicas that currently violate this goal are moved.
+            if (!serverModel.isOfflineTagged()
+                    && shouldKeepInTheCurrentServer(replica, clusterModel)) {
+                continue;
+            }
+            // The relevant rack awareness condition is violated. Move the replica to an eligible
+            // server.
+            SortedSet<ServerModel> eligibleServers = rackAwareEligibleServers(replica, clusterModel);
+            if (maybeApplyBalancingAction(
+                            clusterModel,
+                            replica,
+                            eligibleServers,
+                            ActionType.REPLICA_MOVEMENT,
+                            optimizedGoals)
+                    == null) {
+                LOG.debug(
+                        "Cannot move replica {} to any serverModel in {}",
+                        replica,
+                        eligibleServers);
+            }
+        }
+    }
+
+    /**
+     * Check whether the given alive replica should stay in the current server or be moved to
+     * another server to satisfy the specific requirements of the rack aware goal in the given
+     * cluster state.
+     *
+     * @param replica An alive replica to check whether it should stay in the current server.
+     * @param clusterModel The state of the cluster.
+     * @return {@code true} if the given alive replica should stay in the current server, {@code
+     *     false} otherwise.
+     */
+    protected abstract boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel);
+
+    /**
+     * Get the set of eligible servers for moving the given replica in the given cluster to satisfy
+     * the specific requirements of the rack aware goal.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers is requested.
+     * @param clusterModel The state of the cluster.
+     * @return The rack aware eligible servers for the given replica in the given cluster.
+     */
+    protected abstract SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel);
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
new file mode 100644
index 000000000..ba1d4bd3b
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Generate replica movement proposals to provide rack-aware replica distribution, which ensures
+ * that all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one
+ * replica of each bucket resides in the same rack.
+ */
+public class RackAwareGoal extends RackAwareAbstractGoal {
+
+    /**
+     * A replica move violates rack awareness if any other replica of the same bucket already lives
+     * in the destination server's rack.
+     */
+    @Override
+    protected boolean doesReplicaMoveViolateActionAcceptance(
+            ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) {
+        BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found");
+        int sourceServerId = sourceReplica.serverId();
+        // Exclude the source server by filtering rather than calling remove() on the collection
+        // returned by bucketServers(), so this check never mutates that collection.
+        return bucket.bucketServers().stream()
+                .filter(server -> server.id() != sourceServerId)
+                .map(ServerModel::rack)
+                .anyMatch(destServer.rack()::equals);
+    }
+
+    /**
+     * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of
+     * self-healing. Sanity Check: There exists sufficient number of racks for achieving
+     * rack-awareness.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if the largest replication factor exceeds the number of
+     *     racks that still contain a server without offline tag.
+     */
+    @Override
+    protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+        // Sanity Check: not enough racks to satisfy rack awareness.
+        // Assumes number of racks doesn't exceed Integer.MAX_VALUE.
+        int numAvailableRacks =
+                (int)
+                        clusterModel.racksContainServerWithoutOfflineTag().stream()
+                                .map(RackModel::rack)
+                                .distinct()
+                                .count();
+        if (clusterModel.maxReplicationFactor() > numAvailableRacks) {
+            throw new RebalanceFailureException(
+                    String.format(
+                            "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).",
+                            name(), numAvailableRacks, clusterModel.maxReplicationFactor()));
+        }
+    }
+
+    /**
+     * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each
+     * bucket reside at a separate rack.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if some bucket still has two replicas in the same rack.
+     */
+    @Override
+    protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+        // Balancing only logs replicas it could not move, so verify the final distribution here;
+        // otherwise an unsatisfied hard goal would finish silently.
+        ensureRackAware(clusterModel);
+        finish();
+    }
+
+    /**
+     * Confirm that, for every bucket with a leader, the leader and all followers reside in pairwise
+     * distinct racks. Buckets without a leader are not inspected.
+     *
+     * @param clusterModel The state of the cluster.
+     * @throws RebalanceFailureException if two replicas of a bucket share a rack.
+     */
+    private void ensureRackAware(ClusterModel clusterModel) throws RebalanceFailureException {
+        for (ReplicaModel leader : clusterModel.leaderReplicas()) {
+            BucketModel bucket = clusterModel.bucket(leader.tableBucket());
+            checkNotNull(bucket, "Bucket for replica " + leader + " is not found");
+            List<ServerModel> followerServers = bucket.followerServers();
+            Set<String> replicaRacks =
+                    followerServers.stream()
+                            .map(ServerModel::rack)
+                            .collect(Collectors.toCollection(TreeSet::new));
+            replicaRacks.add(leader.server().rack());
+            // Every replica (followers + leader) must contribute its own rack.
+            if (replicaRacks.size() != followerServers.size() + 1) {
+                throw new RebalanceFailureException(
+                        String.format(
+                                "[%s] Bucket %s is not rack-aware. Leader server: %s, follower servers: %s.",
+                                name(), leader.tableBucket(), leader.server(), followerServers));
+            }
+        }
+    }
+
+    /**
+     * Get the set of rack aware eligible servers for the given replica in the given cluster. A
+     * server is rack aware eligible for a given replica if the server resides in a rack where no
+     * other server in the same rack contains a replica from the same bucket of the given replica.
+     *
+     * @param replica Replica for which a set of rack aware eligible servers is requested.
+     * @param clusterModel The state of the cluster.
+     * @return The rack aware eligible servers for the given replica, ordered by server id.
+     */
+    @Override
+    protected SortedSet<ServerModel> rackAwareEligibleServers(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Populate bucket rackIds.
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not found");
+        List<String> bucketRackIds =
+                bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList());
+
+        // Remove one occurrence of the given replica's rackId; if another replica of the bucket
+        // resides in the same rack, its rackId stays in the list.
+        bucketRackIds.remove(replica.server().rack());
+
+        SortedSet<ServerModel> rackAwareEligibleServers =
+                new TreeSet<>(Comparator.comparingInt(ServerModel::id));
+        for (ServerModel server : clusterModel.aliveServers()) {
+            if (!bucketRackIds.contains(server.rack())) {
+                rackAwareEligibleServers.add(server);
+            }
+        }
+        // Return eligible servers.
+        return rackAwareEligibleServers;
+    }
+
+    @Override
+    protected boolean shouldKeepInTheCurrentServer(
+            ReplicaModel replica, ClusterModel clusterModel) {
+        // Rack awareness requires no more than one replica from a given bucket residing in any
+        // rack in the cluster.
+        String myRackId = replica.server().rack();
+        int myServerId = replica.serverId();
+        BucketModel bucket = clusterModel.bucket(replica.tableBucket());
+        checkNotNull(bucket, "Bucket for replica " + replica + " is not found");
+        for (ServerModel bucketServer : bucket.bucketServers()) {
+            if (myRackId.equals(bucketServer.rack()) && myServerId != bucketServer.id()) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
index ae269474b..66e63b60e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
@@ -61,9 +61,9 @@ public class ReplicaDistributionGoal extends
ReplicaDistributionAbstractGoal {
// TODO configurable.
/**
- * The maximum allowed extent of unbalance for replica leader replica
distribution. For example,
- * 1.10 means the highest leader replica count of a server should not be
1.10x of average leader
- * replica count of all alive tabletServers.
+ * The maximum allowed extent of unbalance for replica distribution. For example, 1.10 means the
+ * highest replica count of a server should not exceed 1.10x of the average replica count of all
+ * alive tabletServers.
*/
private static final Double REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
index d64c464eb..9cf189776 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
@@ -62,6 +62,17 @@ public class BucketModel {
return bucketServers;
}
+    /**
+     * Returns the servers that host the non-leader replicas of this bucket, in the iteration order
+     * of the bucket's replicas.
+     *
+     * @return a newly created list; modifying it does not affect this bucket
+     */
+    public List<ServerModel> followerServers() {
+        List<ServerModel> servers = new ArrayList<>();
+        for (ReplicaModel replica : replicas) {
+            if (replica.isLeader()) {
+                continue;
+            }
+            servers.add(replica.server());
+        }
+        return servers;
+    }
+
public boolean canAssignReplicaToServer(ServerModel candidateServer) {
return !ineligibleServers.contains(candidateServer);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
index d55f6a3cb..8d481ecfd 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
@@ -53,9 +53,14 @@ public class ClusterModel {
private final SortedSet<ServerModel> servers;
private final Map<TableBucket, BucketModel> bucketsByTableBucket;
+ // Largest replica count of any single bucket added to this model, i.e. the maximum replication
+ // factor a bucket was ever created with. Starts at 1 and is raised with Math.max whenever a
+ // replica is added, so it acts as a high-water mark rather than the current maximum.
+ private int maxReplicationFactor;
+
public ClusterModel(SortedSet<ServerModel> servers) {
this.servers = servers;
this.bucketsByTableBucket = new HashMap<>();
+ this.maxReplicationFactor = 1;
this.aliveServers = new HashSet<>();
this.offlineServers = new TreeSet<>();
@@ -76,6 +81,13 @@ public class ClusterModel {
}
}
+    /**
+     * Returns the maximum replication factor among all buckets that have been added to this
+     * cluster model.
+     *
+     * @return the largest replica count observed for a single bucket; at least 1
+     */
+    public int maxReplicationFactor() {
+        return this.maxReplicationFactor;
+    }
+
public SortedSet<ServerModel> offlineServers() {
return offlineServers;
}
@@ -88,6 +100,15 @@ public class ClusterModel {
return Collections.unmodifiableSet(aliveServers);
}
+    /**
+     * Collects the racks that still host at least one server which is not tagged offline.
+     *
+     * @return a new set of such racks; empty if no rack qualifies
+     */
+    public Set<RackModel> racksContainServerWithoutOfflineTag() {
+        Set<RackModel> usableRacks = new HashSet<>();
+        for (RackModel rack : racksById.values()) {
+            if (rack.rackContainsServerWithoutOfflineTag()) {
+                usableRacks.add(rack);
+            }
+        }
+        return usableRacks;
+    }
+
public @Nullable BucketModel bucket(TableBucket tableBucket) {
return bucketsByTableBucket.get(tableBucket);
}
@@ -110,6 +131,20 @@ public class ClusterModel {
return bucketsByTableBucket.values().stream().mapToInt(p ->
p.replicas().size()).sum();
}
+    /**
+     * Gathers the leader replica of every bucket in this cluster model. Buckets that currently have
+     * no leader contribute nothing.
+     *
+     * @return a new set with one leader replica per bucket that has a leader
+     */
+    public Set<ReplicaModel> leaderReplicas() {
+        return bucketsByTableBucket.values().stream()
+                .map(BucketModel::leader)
+                .filter(leader -> leader != null)
+                .collect(Collectors.toCollection(HashSet::new));
+    }
+
public int numLeaderReplicas() {
int numLeaderReplicas = 0;
for (BucketModel bucket : bucketsByTableBucket.values()) {
@@ -193,6 +228,8 @@ public class ClusterModel {
} else {
bucket.addFollower(replica, index);
}
+
+ maxReplicationFactor = Math.max(maxReplicationFactor,
bucket.replicas().size());
}
/**
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
index fdf9cbad8..d5a95bbef 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
@@ -59,6 +59,18 @@ public class RackModel {
return rack;
}
+    /**
+     * Checks whether this rack still has a server that is not tagged offline.
+     *
+     * @return {@code true} if at least one server in this rack is not offline-tagged, {@code
+     *     false} otherwise (including when the rack has no servers)
+     */
+    public boolean rackContainsServerWithoutOfflineTag() {
+        return servers.values().stream().anyMatch(server -> !server.isOfflineTagged());
+    }
+
@Nullable
ServerModel server(int serverId) {
return servers.get(serverId);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
index 8b853ba32..116d4b4e1 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
@@ -190,7 +190,7 @@ public class ServerModel implements Comparable<ServerModel>
{
+ /** Debug string with the server id, rack, offline tag and number of hosted replicas. */
@Override
public String toString() {
return String.format(
- "ServerModel[id=%s,rack=%s,isAlive=%s,replicaCount=%s]",
+
"ServerModel[id=%s,rack=%s,isOfflineTagged=%s,replicaCount=%s]",
serverId, rack, isOfflineTagged, replicas.size());
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
new file mode 100644
index 000000000..d73af6c1f
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static
org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RackAwareGoal}. */
+public class RackAwareGoalTest {
+
+ @Test
+ void testReplicaNumExceedsRackNum() {
+ SortedSet<ServerModel> servers = new TreeSet<>();
+ servers.add(new ServerModel(0, "rack0", false));
+ servers.add(new ServerModel(1, "rack1", false));
+ // server2 is offline-tagged, so only rack0 and rack1 remain usable for replicas.
+ servers.add(new ServerModel(2, "rack2", true));
+ ClusterModel clusterModel = new ClusterModel(servers);
+ TableBucket t1b0 = new TableBucket(1, 0);
+ addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2));
+
+ RackAwareGoal goal = new RackAwareGoal();
+ assertThatThrownBy(() -> goal.optimize(clusterModel, Collections.singleton(goal)))
+ .isInstanceOf(RebalanceFailureException.class)
+ .hasMessage(
+ "[RackAwareGoal] Insufficient number of racks to distribute each replica (Current: 2, Needed: 3).");
+ }
+
+ @Test
+ void testReplicaMove() {
+ SortedSet<ServerModel> servers = new TreeSet<>();
+ servers.add(new ServerModel(0, "rack0", false));
+ servers.add(new ServerModel(1, "rack1", false));
+ servers.add(new ServerModel(2, "rack2", false));
+ servers.add(new ServerModel(3, "rack0", false));
+ ClusterModel clusterModel = new ClusterModel(servers);
+
+ TableBucket t1b0 = new TableBucket(1, 0);
+ addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3));
+
+ // server0 and server3 share rack0: the leader replica on server0 must move to server2.
+ RackAwareGoal goal = new RackAwareGoal();
+ GoalOptimizer goalOptimizer = new GoalOptimizer();
+ List<RebalancePlanForBucket> rebalancePlanForBuckets =
+ goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal));
+ assertThat(rebalancePlanForBuckets).hasSize(1);
+ assertThat(rebalancePlanForBuckets.get(0))
+ .isEqualTo(
+ new RebalancePlanForBucket(
+ t1b0, 0, 2, Arrays.asList(0, 1, 3), Arrays.asList(2, 1, 3)));
+ }
+
+ @Test
+ void testReplicaDistributionNotBalanceAcrossRackAndServer() {
+ // RackAwareGoal only requires that replicas of the same bucket are not placed on the
+ // same rack; it does not care about the balance of replicas between racks, nor does
+ // it care about the balance of replicas between servers.
+ ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack();
+ RackAwareGoal goal = new RackAwareGoal();
+ GoalOptimizer goalOptimizer = new GoalOptimizer();
+ List<RebalancePlanForBucket> rebalancePlanForBuckets =
+ goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal));
+ assertThat(rebalancePlanForBuckets).isEmpty();
+ }
+
+ @Test
+ void testReplicaDistributionBalanceAcrossServer() {
+ // Same input as `testReplicaDistributionNotBalanceAcrossRackAndServer`, but combining
+ // RackAwareGoal with ReplicaDistributionGoal additionally balances the replica distribution
+ // across servers.
+ ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack();
+ RackAwareGoal rackAwareGoal = new RackAwareGoal();
+ ReplicaDistributionGoal replicaDistributionGoal = new TestReplicaDistributionGoal();
+ GoalOptimizer goalOptimizer = new GoalOptimizer();
+ List<RebalancePlanForBucket> rebalancePlanForBuckets =
+ goalOptimizer.doOptimizeOnce(
+ clusterModel, Arrays.asList(rackAwareGoal, replicaDistributionGoal));
+ // After rebalancing, every server hosts exactly 2 replicas.
+ assertThat(rebalancePlanForBuckets).hasSize(2);
+ assertThat(rebalancePlanForBuckets.get(0))
+ .isEqualTo(
+ new RebalancePlanForBucket(
+ new TableBucket(1, 3),
+ 0,
+ 1,
+ Arrays.asList(0, 3, 5),
+ Arrays.asList(1, 3, 5)));
+ assertThat(rebalancePlanForBuckets.get(1))
+ .isEqualTo(
+ new RebalancePlanForBucket(
+ new TableBucket(1, 1),
+ 0,
+ 1,
+ Arrays.asList(0, 2, 5),
+ Arrays.asList(1, 3, 5)));
+ }
+
+ @Test
+ void testMoveActionWithSameRackWillNotBeAccepted() {
+ SortedSet<ServerModel> servers = new TreeSet<>();
+ servers.add(new ServerModel(0, "rack0", false));
+ servers.add(new ServerModel(1, "rack1", false));
+ servers.add(new ServerModel(2, "rack2", false));
+ ServerModel server3 = new ServerModel(3, "rack0", false);
+ servers.add(server3);
+ ServerModel server4 = new ServerModel(4, "rack3", false);
+ servers.add(server4);
+ ClusterModel clusterModel = new ClusterModel(servers);
+ TableBucket t1b0 = new TableBucket(1, 0);
+ addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2));
+
+ RackAwareGoal rackAwareGoal = new RackAwareGoal();
+ ReplicaModel sourceReplica = new ReplicaModel(t1b0, clusterModel.server(2), false);
+
+ // server3 is in the same rack (rack0) as the leader replica of t1b0 on server0, so the move
+ // action will be rejected.
+ assertThat(
+ rackAwareGoal.doesReplicaMoveViolateActionAcceptance(
+ clusterModel, sourceReplica, server3))
+ .isTrue();
+
+ // server4 is in rack3, which hosts no replica of t1b0, so the move action will be
+ // accepted.
+ assertThat(
+ rackAwareGoal.doesReplicaMoveViolateActionAcceptance(
+ clusterModel, sourceReplica, server4))
+ .isFalse();
+ }
+
+ private ClusterModel generateUnbalancedReplicaAcrossServerAndRack() {
+ SortedSet<ServerModel> servers = new TreeSet<>();
+ servers.add(new ServerModel(0, "rack0", false));
+ servers.add(new ServerModel(1, "rack0", false));
+ servers.add(new ServerModel(2, "rack1", false));
+ servers.add(new ServerModel(3, "rack1", false));
+ servers.add(new ServerModel(4, "rack2", false));
+ servers.add(new ServerModel(5, "rack3", false));
+ ClusterModel clusterModel = new ClusterModel(servers);
+
+ // In the following case RackAwareGoal will not move any replica, although the replica
+ // distribution is balanced neither across racks nor across servers.
+ // t1b0 -> 0, 2, 4
+ // t1b1 -> 0, 2, 5
+ // t1b2 -> 0, 2, 4
+ // t1b3 -> 0, 3, 5
+
+ // Replica num from server side: server0: 4, server1: 0, server2: 3, server3: 1, server4: 2,
+ // server5: 2
+ // Replica num from rack side: rack0: 4, rack1: 4, rack2: 2, rack3: 2
+ TableBucket t1b0 = new TableBucket(1, 0);
+ addBucket(clusterModel, t1b0, Arrays.asList(0, 2, 4));
+ TableBucket t1b1 = new TableBucket(1, 1);
+ addBucket(clusterModel, t1b1, Arrays.asList(0, 2, 5));
+ TableBucket t1b2 = new TableBucket(1, 2);
+ addBucket(clusterModel, t1b2, Arrays.asList(0, 2, 4));
+ TableBucket t1b3 = new TableBucket(1, 3);
+ addBucket(clusterModel, t1b3, Arrays.asList(0, 3, 5));
+ return clusterModel;
+ }
+
+ private static final class TestReplicaDistributionGoal extends ReplicaDistributionGoal {
+
+ @Override
+ protected double balancePercentage() {
+ return 1.0d;
+ }
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
index d375923c9..1908c94b7 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java
@@ -145,4 +145,19 @@ public class ClusterModelTest {
.hasMessageContaining(
"Requested replica 1 is not a replica of bucket
TableBucket{tableId=1, bucket=0}");
}
+
+ @Test
+ void testMaxReplicaFactor() {
+ ClusterModel model = new ClusterModel(servers);
+ TableBucket t1b0 = new TableBucket(1, 0);
+ // With no buckets at all the reported factor is 1.
+ assertThat(model.maxReplicationFactor()).isEqualTo(1);
+ // Growing t1b0 one replica at a time raises the max factor up to 3.
+ model.createReplica(0, t1b0, 0, true);
+ assertThat(model.maxReplicationFactor()).isEqualTo(1);
+ model.createReplica(1, t1b0, 1, false);
+ assertThat(model.maxReplicationFactor()).isEqualTo(2);
+ model.createReplica(2, t1b0, 2, false);
+ assertThat(model.maxReplicationFactor()).isEqualTo(3);
+ // A single-replica bucket of table 2 does not lower the maximum.
+ model.createReplica(0, new TableBucket(2, 0L, 0), 0, true);
+ assertThat(model.maxReplicationFactor()).isEqualTo(3);
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
index edaff4b76..3b895fd19 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java
@@ -81,13 +81,13 @@ public class ServerModelTest {
void testToString() {
+ // Offline-tagged server in rack0 that does not host any replica yet.
ServerModel serverModel = new ServerModel(0, "rack0", true);
assertThat(serverModel.toString())
-
.isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=0]");
+
.isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=0]");
+ // Putting one replica on the server must be reflected in replicaCount.
serverModel.putReplica(
new TableBucket(1L, 0),
new ReplicaModel(new TableBucket(1L, 0), serverModel, false));
assertThat(serverModel.toString())
-
.isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=1]");
+
.isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=1]");
}
@Test
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 43b12943d..2f2ee3735 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -103,6 +103,7 @@ import static
org.apache.fluss.server.zk.ZooKeeperTestUtils.createZooKeeperClien
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.function.FunctionUtils.uncheckedFunction;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -134,6 +135,7 @@ public final class FlussClusterExtension
private final Map<Integer, ServerInfo> tabletServerInfos;
private final Configuration clusterConf;
private final Clock clock;
+ /** Rack name of each tablet server, indexed by server id; larger ids fall back to "rack-" + id. */
+ private final String[] racks;
/** Creates a new {@link Builder} for {@link FlussClusterExtension}. */
public static Builder builder() {
@@ -145,7 +147,8 @@ public final class FlussClusterExtension
String coordinatorServerListeners,
String tabletServerListeners,
Configuration clusterConf,
- Clock clock) {
+ Clock clock,
+ String[] racks) {
this.initialNumOfTabletServers = numOfTabletServers;
this.tabletServers = new HashMap<>(numOfTabletServers);
this.coordinatorServerListeners = coordinatorServerListeners;
@@ -153,6 +156,10 @@ public final class FlussClusterExtension
this.tabletServerInfos = new HashMap<>();
this.clusterConf = clusterConf;
this.clock = clock;
+ checkArgument(
+ racks != null && racks.length == numOfTabletServers,
+ "racks must be not null and have the same length as
numOfTabletServers");
+ this.racks = racks;
}
@Override
@@ -310,10 +317,17 @@ public final class FlussClusterExtension
private void startTabletServer(int serverId, @Nullable Configuration
overwriteConfig)
throws Exception {
+ String rackName;
+ if (racks.length <= serverId) {
+ rackName = "rack-" + serverId;
+ } else {
+ rackName = racks[serverId];
+ }
+
String dataDir = getDataDir(serverId);
Configuration tabletServerConf = new Configuration(clusterConf);
tabletServerConf.set(ConfigOptions.TABLET_SERVER_ID, serverId);
- tabletServerConf.set(ConfigOptions.TABLET_SERVER_RACK, "rack" +
serverId);
+ tabletServerConf.set(ConfigOptions.TABLET_SERVER_RACK, rackName);
tabletServerConf.set(ConfigOptions.DATA_DIR, dataDir);
tabletServerConf.setString(
ConfigOptions.ZOOKEEPER_ADDRESS,
zooKeeperServer.getConnectString());
@@ -329,7 +343,7 @@ public final class FlussClusterExtension
ServerInfo serverInfo =
new ServerInfo(
serverId,
- "rack" + serverId,
+ rackName,
tabletServer.getRpcServer().getBindEndpoints(),
ServerType.TABLET_SERVER);
@@ -932,6 +946,7 @@ public final class FlussClusterExtension
private String tabletServerListeners = DEFAULT_LISTENERS;
private String coordinatorServerListeners = DEFAULT_LISTENERS;
private Clock clock = SystemClock.getInstance();
+ private String[] racks = new String[] {"rack-0"};
private final Configuration clusterConf = new Configuration();
@@ -971,13 +986,28 @@ public final class FlussClusterExtension
return this;
}
+ /** Whether {@link #setRacks(String[])} was called; if not, one rack per server is generated. */
+ private boolean racksExplicitlySet = false;
+
+ /**
+ * Sets the rack of each tablet server: {@code racks[i]} is the rack of server {@code i}. The
+ * array must contain exactly one entry per tablet server. When this is never called, server
+ * {@code i} is placed on its own rack {@code "rack-" + i}.
+ */
+ public Builder setRacks(String[] racks) {
+ this.racks = racks;
+ this.racksExplicitlySet = true;
+ return this;
+ }
+
public FlussClusterExtension build() {
+ // Only generate default racks when the caller did not choose racks. An explicitly set
+ // array (even a single-element or null one) is passed through unchanged, so a wrong
+ // length is reported by the constructor's argument check instead of being replaced.
+ if (!racksExplicitlySet) {
+ String[] defaultRacks = new String[numOfTabletServers];
+ for (int i = 0; i < numOfTabletServers; i++) {
+ defaultRacks[i] = "rack-" + i;
+ }
+ this.racks = defaultRacks;
+ }
+
return new FlussClusterExtension(
numOfTabletServers,
coordinatorServerListeners,
tabletServerListeners,
clusterConf,
- clock);
+ clock,
+ racks);
}
}
}