wuchong commented on code in PR #1452: URL: https://github.com/apache/fluss/pull/1452#discussion_r2667587121
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RebalanceProcedure.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlan; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to trigger rebalance. + * + * <p>This procedure allows triggering rebalance with different goals. See {@link + * Admin#rebalance(List, boolean)} for more details. 
+ * + * <p>Usage examples: + * + * <pre> + * -- Trigger rebalance with REPLICA_DISTRIBUTION goal + * CALL sys.rebalance('REPLICA_DISTRIBUTION'); + * -- Trigger rebalance with REPLICA_DISTRIBUTION and LEADER_DISTRIBUTION goals + * CALL sys.rebalance('REPLICA_DISTRIBUTION;LEADER_DISTRIBUTION'); + * + * -- Trigger rebalance without dry run + * CALL sys.rebalance('REPLICA_DISTRIBUTION', false); + * + * -- Trigger rebalance with dry run + * CALL sys.rebalance('REPLICA_DISTRIBUTION', true); + * </pre> + */ +public class RebalanceProcedure extends ProcedureBase { + + /** + * As flink call don't support input a nested type like 'ARRAY'. So priorityGoals is defined as + * a String type, and different goals are split by ';'. + */ + @ProcedureHint( + argument = { + @ArgumentHint(name = "priorityGoals", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "dryRun", type = @DataTypeHint("BOOLEAN"), isOptional = true) + }) + public String[] call(ProcedureContext context, String priorityGoals, @Nullable Boolean dryRun) + throws Exception { + List<GoalType> goalTypes = validateAndGetPriorityGoals(priorityGoals); + RebalancePlan rebalancePlan = admin.rebalance(goalTypes, dryRun != null && dryRun).get(); + return planToString(rebalancePlan); + } + + private static String[] planToString(RebalancePlan plan) { + List<String> result = new ArrayList<>(); + result.add("Rebalance id: " + plan.getRebalanceId()); + result.add("Detail rebalance plan:"); + plan.getPlanForBucketMap().values().stream() + .map(RebalancePlanForBucket::toString) + .forEach(result::add); + return result.toArray(new String[0]); + } + + private static List<GoalType> validateAndGetPriorityGoals(String priorityGoals) { + if (priorityGoals == null || priorityGoals.trim().isEmpty()) { + throw new IllegalArgumentException( + "priority goals cannot be null or empty. 
You can specify one goal as 'REPLICA_DISTRIBUTION' or " + + "specify multi goals as 'REPLICA_DISTRIBUTION;LEADER_DISTRIBUTION' (split by ';')"); + } + + priorityGoals = priorityGoals.trim(); + String[] splitGoals = priorityGoals.split(";"); Review Comment: Use commas (`,`) uniformly as list separators. We should also update `org.apache.fluss.flink.procedure.RemoveServerTagProcedure#validateAndGetTabletServers` to use commas. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.metadata.TableBucket; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import javax.annotation.Nullable; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Procedure to list rebalance progress. + * + * <p>This procedure allows querying rebalance progress. See {@link + * Admin#listRebalanceProgress(String)} for more details. + * + * <p>Usage examples: + * + * <pre> + * -- List the rebalance progress without rebalance id + * CALL sys.list_rebalance_process(); + * + * -- List the rebalance progress with rebalance id + * CALL sys.list_rebalance_process('xxx_xxx_xxx'); + * </pre> + */ +public class ListRebalanceProcessProcedure extends ProcedureBase { Review Comment: This should be `sys.list_rebalance_progress`, but I suggest simplifying it to `sys.list_rebalance(..)`. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java: ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.model; + +import org.apache.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * A class that holds the information of the cluster for rebalance.The information including live + * tabletServers, bucket distribution, tabletServer tag etc. + * + * <p>Currently, the clusterModel can only be created by a rebalance request. It's used as the input + * of the GoalOptimizer to generate the rebalance plan for load rebalance. + */ +public class ClusterModel { + // TODO ClusterModel can be implemented in incremental mode, dynamically modified when there are + // events such as table create, table delete, server offline, etc. 
Currently designed to read + // coordinatorContext and generate it directly + + private final Map<String, RackModel> racksById; + private final Map<Integer, RackModel> serverIdToRack; + private final Set<ServerModel> aliveServers; + private final SortedSet<ServerModel> offlineServers; + private final SortedSet<ServerModel> servers; + private final Map<TableBucket, BucketModel> bucketsByTableBucket; + + public ClusterModel(SortedSet<ServerModel> servers) { + this.servers = servers; + this.bucketsByTableBucket = new HashMap<>(); + + this.aliveServers = new HashSet<>(); + this.offlineServers = new TreeSet<>(); + for (ServerModel serverModel : servers) { + if (serverModel.isAlive()) { + aliveServers.add(serverModel); + } else { + offlineServers.add(serverModel); + } + } + + this.racksById = new HashMap<>(); + this.serverIdToRack = new HashMap<>(); + for (ServerModel serverModel : servers) { + RackModel rackModel = racksById.computeIfAbsent(serverModel.rack(), RackModel::new); + rackModel.addServer(serverModel); + serverIdToRack.put(serverModel.id(), rackModel); + } + } + + public SortedSet<ServerModel> offlineServers() { + return offlineServers; + } + + public SortedSet<ServerModel> servers() { + return servers; + } + + public Set<ServerModel> aliveServers() { + return Collections.unmodifiableSet(aliveServers); + } + + public @Nullable BucketModel bucket(TableBucket tableBucket) { + return bucketsByTableBucket.get(tableBucket); + } + + public RackModel rack(String rack) { + return racksById.get(rack); + } + + public @Nullable ServerModel server(int serverId) { + RackModel rack = serverIdToRack.get(serverId); + return rack == null ? null : rack.server(serverId); + } + + /** Populate the analysis stats with this cluster. 
*/ + public ClusterModelStats getClusterStats() { + return (new ClusterModelStats()).populate(this); + } + + public int numReplicas() { + return bucketsByTableBucket.values().stream().mapToInt(p -> p.replicas().size()).sum(); + } + + public int numLeaderReplicas() { + int numLeaderReplicas = 0; + for (BucketModel bucket : bucketsByTableBucket.values()) { + numLeaderReplicas += bucket.leader() != null ? 1 : 0; + } + return numLeaderReplicas; + } + + public SortedMap<Long, List<BucketModel>> getBucketsByTable() { + SortedMap<Long, List<BucketModel>> bucketsByTable = new TreeMap<>(); + for (Long tableId : tables()) { + bucketsByTable.put(tableId, new ArrayList<>()); + } + for (Map.Entry<TableBucket, BucketModel> entry : bucketsByTableBucket.entrySet()) { + bucketsByTable.get(entry.getKey().getTableId()).add(entry.getValue()); + } + return bucketsByTable; + } + + public Set<Long> tables() { + Set<Long> tables = new HashSet<>(); + + for (RackModel rack : racksById.values()) { + tables.addAll(rack.tables()); + } + return tables; + } + + /** + * Get the distribution of replicas in the cluster at the point of call. + * + * @return A map from tableBucket to the list of replicas. the first element is the leader, the + * rest are followers. 
+ */ + public Map<TableBucket, List<Integer>> getReplicaDistribution() { + Map<TableBucket, List<Integer>> replicaDistribution = new HashMap<>(); + for (Map.Entry<TableBucket, BucketModel> entry : bucketsByTableBucket.entrySet()) { + TableBucket tableBucket = entry.getKey(); + BucketModel bucket = entry.getValue(); + List<Integer> replicaIds = + bucket.replicas().stream() + .map(r -> r.server().id()) + .collect(Collectors.toList()); + replicaDistribution.put(tableBucket, replicaIds); + } + return replicaDistribution; + } + + public Map<TableBucket, Integer> getLeaderDistribution() { + Map<TableBucket, Integer> leaderDistribution = new HashMap<>(); + for (Map.Entry<TableBucket, BucketModel> entry : bucketsByTableBucket.entrySet()) { + TableBucket tableBucket = entry.getKey(); + BucketModel bucket = entry.getValue(); + + ReplicaModel replicaModel = bucket.leader(); + if (replicaModel == null) { + continue; + } + + leaderDistribution.put(tableBucket, replicaModel.server().id()); + } + return leaderDistribution; + } + + public void createReplica(int serverId, TableBucket tableBucket, int index, boolean isLeader) { + ServerModel server = server(serverId); + if (server == null) { + throw new IllegalArgumentException("Server is not in the cluster."); + } + + ReplicaModel replica = new ReplicaModel(tableBucket, server, isLeader); + server.putReplica(tableBucket, replica); + + if (!bucketsByTableBucket.containsKey(tableBucket)) { + bucketsByTableBucket.put(tableBucket, new BucketModel(tableBucket, offlineServers())); + } + + BucketModel bucket = bucketsByTableBucket.get(tableBucket); + if (isLeader) { + bucket.addLeader(replica, index); + } else { + bucket.addFollower(replica, index); + } + } + + /** + * Relocate leadership from source server to destination server. + * + * <ul> + * <li>1. Removes leadership from source replica. + * <li>2. Adds this leadership to the destination replica. + * <li>3. Updates the leader and list of followers of the bucket. 
+ * </ul> + */ + public boolean relocateLeadership( + TableBucket tableBucket, int sourceServerId, int desServerId) { + // Sanity check to see if the source replica is the leader. + BucketModel bucket = bucketsByTableBucket.get(tableBucket); + ReplicaModel sourceReplica = bucket.replica(sourceServerId); + if (!sourceReplica.isLeader()) { + return false; + } + + // Sanity check to see if the destination replica is a follower. + ReplicaModel desReplica = bucket.replica(desServerId); + if (desReplica.isLeader()) { + throw new IllegalArgumentException( + "Cannot relocate leadership of bucket " + + tableBucket + + " from server " + + sourceServerId + + " to server " + + desServerId + + " because the destination replica is a leader."); + } + + ServerModel sourceServer = server(sourceServerId); + if (sourceServer == null) { + throw new IllegalArgumentException("Source server is not in the cluster."); + } + sourceServer.makeFollower(tableBucket); + + ServerModel destServer = server(desServerId); + if (destServer == null) { + throw new IllegalArgumentException("Destination server is not in the cluster."); + } + destServer.makeLeader(tableBucket); + + // Update the leader and list of followers of the bucket. + bucket.relocateLeadership(desReplica); + return true; + } + + /** + * Relocate replica from source server to destination server. + * + * <ul> + * <li>1. Removes the replica from source server. + * <li>2. Set the server of the removed replica as the dest server + * <li>3. Add this replica to the dest server. + * </ul> + */ + public void relocateReplica(TableBucket tableBucket, int sourceServerId, int destServerId) { + // Removes the replica from the source server. + ReplicaModel replica = removeReplica(sourceServerId, tableBucket); + if (replica == null) { + throw new IllegalArgumentException("Replica is not in the cluster."); + } + + // Updates the tabletServer of the removed replicas with dest server. 
+ replica.setServer(server(destServerId)); + + // Add this replica back to destination rack and server. + String rack = replica.server().rack(); + rack(rack).addReplica(replica); + } + + private @Nullable ReplicaModel removeReplica(int serverId, TableBucket tableBucket) { + for (RackModel rack : racksById.values()) { + ReplicaModel removedReplica = rack.removeReplica(serverId, tableBucket); + if (removedReplica != null) { + return removedReplica; + } + } + return null; Review Comment: Why not call `server(serverId).removeReplica(tableBucket)` directly? That would be `O(1)` instead of `O(n)`. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java: ########## @@ -1112,6 +1161,354 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even return removeServerTagResponse; } + private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) { + boolean isDryRun = rebalanceEvent.isDryRun(); + RebalancePlan rebalancePlan; + long startTime = System.currentTimeMillis(); + try { + rebalancePlan = + rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority()); + } catch (Exception e) { + throw new RebalanceFailureException("Failed to generate rebalance plan.", e); Review Comment: Add `e.getMessage()` to the `RebalanceFailureException` message; otherwise, the root-cause message will be truncated after network transfer. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import 
java.util.ArrayDeque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** + * A rebalance manager to generate rebalance plan, and execution rebalance plan. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model. + */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ZooKeeperClient zkClient; + private final CoordinatorEventProcessor eventProcessor; + + /** A queue of in progress table bucket to rebalance. */ + @GuardedBy("lock") + private final Queue<TableBucket> inProgressRebalanceTasksQueue = new ArrayDeque<>(); + + /** A mapping from table bucket to rebalance status of pending and running tasks. */ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> inProgressRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + /** A mapping from table bucket to rebalance status of failed or completed tasks. 
*/ + @GuardedBy("lock") + private final Map<TableBucket, RebalanceResultForBucket> finishedRebalanceTasks = + MapUtils.newConcurrentHashMap(); + + private final GoalOptimizer goalOptimizer; + private volatile long registerTime; + private volatile RebalanceStatus rebalanceStatus = NO_TASK; + private volatile @Nullable String currentRebalanceId; + private volatile boolean isClosed = false; + + public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) { + this.eventProcessor = eventProcessor; + this.zkClient = zkClient; + this.goalOptimizer = new GoalOptimizer(); + } + + public void startup() { + LOG.info("Start up rebalance manager."); + initialize(); + } + + private void initialize() { Review Comment: 1. I think we should split the `registerRebalance` into 2 methods: - `registerRebalance(plan)` which just register to zk node - `startRebalance(plan)` which add the plan to `RebalanceManager` and triggers the task and change the status into `REBALANCING`. 2. During `initialize`, if we have plan node in zk, we should just call `startRebalance(plan)` and skip zk register again. During client rebalance, we should call both. 3. during `initialize`, if the plan is not finished (completed, failed, cancelled), we should start and continue the plan, just add the plan into the `RebalanceManager` and triggers task. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.model; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** A class that holds the information of the tabletServer for rebalance. */ +public class ServerModel implements Comparable<ServerModel> { + + private final int serverId; + private final boolean isAlive; + private final String rack; + private final Set<ReplicaModel> replicas; + /** A map for tracking (tableId) -> (BucketId -> replica) for none-partitioned table. */ + private final Map<Long, Map<Integer, ReplicaModel>> tableReplicas; + + /** A map for tracking (tableId, partitionId) -> (BucketId -> replica) for partitioned table. */ + private final Map<TablePartition, Map<Integer, ReplicaModel>> tablePartitionReplicas; + + public ServerModel(int serverId, String rack, boolean isAlive) { + this.serverId = serverId; + this.rack = rack; + this.isAlive = isAlive; + this.replicas = new HashSet<>(); + this.tableReplicas = new HashMap<>(); + this.tablePartitionReplicas = new HashMap<>(); + } + + public int id() { + return serverId; + } + + public String rack() { + return rack; + } + + public boolean isAlive() { Review Comment: It would be better to name this `isOfflineTagged`, because `isAlive` makes it look like `CoordinatorContext#liveTabletServerSet`; however, they are completely different things.
########## fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlan; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.replica.ReplicaManager; +import 
org.apache.fluss.server.testutils.FlussClusterExtension; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.fluss.record.TestData.DATA1_SCHEMA; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for rebalance. */ +public class RebalanceITCase { + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setNumOfTabletServers(4) + .setClusterConf(initConfig()) + .build(); + + private Connection conn; + private Admin admin; + + @BeforeEach + protected void setup() throws Exception { + conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + admin = conn.getAdmin(); + } + + @AfterEach + protected void teardown() throws Exception { + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().deleteRebalancePlan(); + + if (admin != null) { + admin.close(); + admin = null; + } + + if (conn != null) { + conn.close(); + conn = null; + } + } + + // TODO add test for primary key table, trace by https://github.com/apache/fluss/issues/2315 + + @Test + void testRebalanceForLogTable() throws Exception { + String dbName = "db-balance"; + admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get(); + + // add server tag PERMANENT_OFFLINE for server 3, this will avoid to generate bucket + // assignment on server 3 when create table. + admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // create some none partitioned log table. 
+ for (int i = 0; i < 2; i++) { + long tableId = + createTable( + new TablePath(dbName, "test-rebalance_table-" + i), + DATA1_TABLE_DESCRIPTOR, + false); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + } + + // create one partitioned table with two partitions. + TableDescriptor partitionedDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA) + .distributedBy(3) + .partitionedBy("b") + .build(); + TablePath tablePath = new TablePath(dbName, "test-rebalance_partitioned_table1"); + long tableId = createTable(tablePath, partitionedDescriptor, false); + for (int j = 0; j < 2; j++) { + PartitionSpec partitionSpec = + new PartitionSpec(Collections.singletonMap("b", String.valueOf(j))); + admin.createPartition(tablePath, partitionSpec, false).get(); + long partitionId = + admin.listPartitionInfos(tablePath, partitionSpec) + .get() + .get(0) + .getPartitionId(); + FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(tableId, partitionId); + } + + // verify before rebalance. As we use unbalance assignment, all replicas will not be + // location on servers 3. + for (int i = 0; i < 3; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + assertThat(replicaManager.onlineReplicas().count()).isGreaterThan(0); + assertThat(replicaManager.leaderCount()).isGreaterThan(0); + } + ReplicaManager replicaManager3 = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(3).getReplicaManager(); + assertThat(replicaManager3.onlineReplicas().count()).isEqualTo(0); + assertThat(replicaManager3.leaderCount()).isEqualTo(0); + + // remove tag after crated table. 
+ admin.removeServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal] + RebalancePlan rebalancePlan = + admin.rebalance( + Arrays.asList( + GoalType.REPLICA_DISTRIBUTION, + GoalType.LEADER_DISTRIBUTION), + false) + .get(); + + retry( + Duration.ofMinutes(2), + () -> { + RebalanceProgress progress = admin.listRebalanceProgress(null).get(); + assertPlan(rebalancePlan, progress); + }); + for (int i = 0; i < 4; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + // average will be 9 + assertThat(replicaManager.onlineReplicas().count()).isBetween(6L, 12L); + long leaderCount = replicaManager.leaderCount(); + // average will be 3 + assertThat(leaderCount).isBetween(1L, 6L); + } + + // add server tag PERMANENT_OFFLINE for server 0, trigger all leader and replica removed + // from server 3. + admin.addServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); + RebalancePlan rebalancePlan2 = + admin.rebalance( + Arrays.asList( + GoalType.REPLICA_DISTRIBUTION, + GoalType.LEADER_DISTRIBUTION), + false) + .get(); + retry( + Duration.ofMinutes(2), + () -> { + RebalanceProgress progress = admin.listRebalanceProgress(null).get(); + assertPlan(rebalancePlan2, progress); + }); + ReplicaManager replicaManager0 = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(0).getReplicaManager(); + assertThat(replicaManager0.onlineReplicas().count()).isEqualTo(0); + assertThat(replicaManager0.leaderCount()).isEqualTo(0); + for (int i = 1; i < 4; i++) { + ReplicaManager replicaManager = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(i).getReplicaManager(); + // average will be 12 + assertThat(replicaManager.onlineReplicas().count()).isBetween(10L, 14L); + long leaderCount = replicaManager.leaderCount(); + // average will be 4 + assertThat(leaderCount).isBetween(2L, 7L); + } + + // clean the server tag. 
+ admin.removeServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); + } + + @Test + void testListRebalanceProcess() throws Exception { + RebalanceProgress rebalanceProgress = admin.listRebalanceProgress(null).get(); + assertThat(rebalanceProgress.progress()).isEqualTo(-1d); + assertThat(rebalanceProgress.status()).isEqualTo(RebalanceStatus.NO_TASK); + assertThat(rebalanceProgress.progressForBucketMap()).isEmpty(); + + String dbName = "db-rebalance-list"; + admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get(); + + // add server tag PERMANENT_OFFLINE for server 3, this will avoid to generate bucket + // assignment on server 3 when create table. + admin.addServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // create some none partitioned log table. + for (int i = 0; i < 6; i++) { + long tableId = + createTable( + new TablePath(dbName, "test-rebalance_table-" + i), + DATA1_TABLE_DESCRIPTOR, + false); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + } + + // remove tag after crated table. 
+ admin.removeServerTag(Collections.singletonList(3), ServerTag.PERMANENT_OFFLINE).get(); + + // trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal] + RebalancePlan rebalancePlan = + admin.rebalance( + Arrays.asList( + GoalType.REPLICA_DISTRIBUTION, + GoalType.LEADER_DISTRIBUTION), + false) + .get(); + retry( + Duration.ofMinutes(2), + () -> { + RebalanceProgress progress = + admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get(); + assertThat(progress.progress()).isEqualTo(1d); + assertThat(progress.status()).isEqualTo(RebalanceStatus.COMPLETED); + Map<TableBucket, RebalanceResultForBucket> processForBuckets = + progress.progressForBucketMap(); + Map<TableBucket, RebalancePlanForBucket> planForBuckets = + rebalancePlan.getPlanForBucketMap(); + assertThat(planForBuckets.size()).isEqualTo(processForBuckets.size()); + for (TableBucket tableBucket : planForBuckets.keySet()) { + RebalanceResultForBucket processForBucket = + processForBuckets.get(tableBucket); + assertThat(processForBucket.status()).isEqualTo(RebalanceStatus.COMPLETED); + assertThat(processForBucket.plan()) + .isEqualTo(planForBuckets.get(tableBucket)); + } + }); + + // cancel rebalance. + admin.cancelRebalance(rebalancePlan.getRebalanceId()).get(); + + RebalanceProgress progress = + admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get(); + assertThat(progress.progress()).isEqualTo(1d); + assertThat(progress.status()).isEqualTo(RebalanceStatus.CANCELED); Review Comment: Why can a completed rebalance be canceled? Actually, once a rebalance reaches a finished status (cancelled, completed, or failed), that status can not be changed anymore.
########## fluss-rpc/src/main/proto/FlussApi.proto: ########## @@ -598,17 +598,22 @@ message RebalanceRequest { } message RebalanceResponse { - repeated PbRebalancePlanForTable table_plan = 1; + required string rebalance_id = 1; + repeated PbRebalancePlanForTable table_plan = 2; } message ListRebalanceProgressRequest { + optional string rebalance_id = 1; } message ListRebalanceProgressResponse { - repeated PbRebalanceProgressForTable table_progress = 1; + required int32 rebalance_status = 1; + optional string rebalance_id = 2; Review Comment: nit: move `rebalance_id` to the first field to keep align with other rebalance messages. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReBalancingAction.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.metadata.TableBucket; + +/** Represents the load rebalancing operation over a replica for Fluss Load GoalOptimizer. 
*/ +public class ReBalancingAction { Review Comment: ```suggestion public class RebalancingAction { ``` ########## fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java: ########## @@ -19,17 +19,26 @@ import org.apache.fluss.annotation.PublicEvolving; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + /** * Rebalance status. * * @since 0.9 */ @PublicEvolving public enum RebalanceStatus { + NO_TASK(0), Review Comment: Does `NO_TASK` mean there is no rebalance in the cluster? Using `NO_TASK` to indicate this looks hacky to me, because `RebalanceStatus` is the status of a rebalance; if there is no rebalance, then there is no rebalance status. Can we use `null` for no rebalance in `RebalanceManager`? And maybe we can change `Admin`: ```java CompletableFuture<RebalanceProgress> listRebalanceProgress(@Nullable String rebalanceId); ``` to ``` CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(@Nullable String rebalanceId); ``` to represent that there is no rebalance progress, instead of using a nullable rebalance id or status. ########## fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java: ########## @@ -900,6 +905,78 @@ void testSchemaChange() throws Exception { 3, new TableMetadata(tableInfo2, Collections.emptyList()))); } + @Test + void testDoBucketReassignment() throws Exception { + zookeeperClient.registerTabletServer( + 3, + new TabletServerRegistration( + "rack3", + Collections.singletonList( + new Endpoint("host3", 1001, DEFAULT_LISTENER_NAME)), + System.currentTimeMillis())); + + initCoordinatorChannel(); + TablePath t1 = TablePath.of(defaultDatabase, "test_bucket_reassignment_table"); + // Mock un-balanced table assignment.
+ Map<Integer, BucketAssignment> bucketAssignments = new HashMap<>(); + bucketAssignments.put(0, BucketAssignment.of(0, 1, 3)); + TableAssignment tableAssignment = new TableAssignment(bucketAssignments); + long t1Id = + metadataManager.createTable( + t1, CoordinatorEventProcessorTest.TEST_TABLE, tableAssignment, false); + TableBucket tb0 = new TableBucket(t1Id, 0); + verifyIsr(tb0, 0, Arrays.asList(0, 1, 3)); + + // trigger bucket reassignment for tb0: + // bucket0 -> (0, 1, 2) + Map<TableBucket, RebalancePlanForBucket> rebalancePlan = new HashMap<>(); + RebalancePlanForBucket planForBucket0 = + new RebalancePlanForBucket( + tb0, 0, 0, Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 2)); + + rebalancePlan.put(tb0, planForBucket0); + // try to execute. + eventProcessor + .getRebalanceManager() + .registerRebalance("rebalance-task-jdsds1", rebalancePlan); + + // Mock to finish rebalance tasks, in production case, this need to be trigged by receiving + // AdjustIsrRequest. + Map<TableBucket, LeaderAndIsr> leaderAndIsrMap = new HashMap<>(); + CompletableFuture<AdjustIsrResponse> respCallback = new CompletableFuture<>(); + + // This isr list equals originReplicas + addingReplicas. the bucket epoch is 1. 
+ leaderAndIsrMap.put(tb0, new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2, 3), 0, 1)); + eventProcessor + .getCoordinatorEventManager() + .put(new AdjustIsrReceivedEvent(leaderAndIsrMap, respCallback)); + respCallback.get(); + verifyIsr(tb0, 0, Arrays.asList(0, 1, 2)); + + // clean up the tablet server 3 + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupPath(ZkData.ServerIdZNode.path(3)); + } + + private void verifyIsr(TableBucket tb, int expectedLeader, List<Integer> expectedIsr) + throws Exception { + LeaderAndIsr leaderAndIsr = + waitValue( + () -> fromCtx((ctx) -> ctx.getBucketLeaderAndIsr(tb)), + Duration.ofMinutes(1), + "leader not elected"); + LeaderAndIsr newLeaderAndIsrOfZk = zookeeperClient.getLeaderAndIsr(tb).get(); + retry( + Duration.ofMinutes(1), + () -> { + assertThat(leaderAndIsr.leader()) + .isEqualTo(newLeaderAndIsrOfZk.leader()) + .isEqualTo(expectedLeader); + assertThat(leaderAndIsr.isr()) + .isEqualTo(newLeaderAndIsrOfZk.isr()) + .hasSameElementsAs(expectedIsr); + }); + } Review Comment: There is no need to `retry`: both `leaderAndIsr` and `newLeaderAndIsrOfZk` have already been determined before the `retry`, so no matter how many times it retries, the result is the same.
########## fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java: ########## @@ -1790,6 +1800,109 @@ public static List<PbDescribeConfig> toPbConfigEntries(List<ConfigEntry> describ .collect(Collectors.toList()); } + public static RebalanceResponse makeRebalanceRespose(RebalancePlan rebalancePlan) { Review Comment: ```suggestion public static RebalanceResponse makeRebalanceResponse(RebalancePlan rebalancePlan) { ``` ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java: ########## @@ -1112,6 +1161,354 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even return removeServerTagResponse; } + private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) { + boolean isDryRun = rebalanceEvent.isDryRun(); + RebalancePlan rebalancePlan; + long startTime = System.currentTimeMillis(); + try { + rebalancePlan = + rebalanceManager.generateRebalancePlan(rebalanceEvent.getGoalsByPriority()); + } catch (Exception e) { + throw new RebalanceFailureException("Failed to generate rebalance plan.", e); + } + + if (!isDryRun) { + if (rebalanceManager.hasInProgressRebalance()) { + throw new RebalanceFailureException( + "Rebalance task already exists. Please wait for it to finish or cancel it first."); + } + + // 2. execute rebalance plan. + Map<TableBucket, RebalancePlanForBucket> executePlan = rebalancePlan.getExecutePlan(); + rebalanceManager.registerRebalance(rebalancePlan.getRebalanceId(), executePlan); + } + + LOG.info( + "Generate Rebalance plan rebalanace id {} with {} ms.", Review Comment: ```suggestion "Generate Rebalance plan rebalance id {} with {} ms.", ``` ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/LeaderReplicaDistributionGoal.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ActionType; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.coordinator.rebalance.model.StatisticType; +import org.apache.fluss.utils.MathUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static 
org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.REPLICA_REJECT; +import static org.apache.fluss.utils.MathUtils.EPSILON; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Soft goal to generate leadership movement and leader replica movement proposals to ensure that + * the number of leader replicas on each server is. + * + * <ul> + * <li>Under: (the average number of leader replicas per server) * (1 + leader replica count + * balance percentage) + * <li>Above: (the average number of leader replicas per server) * Math.max(0, 1 - leader replica + * count balance percentage) + * </ul> + */ +public class LeaderReplicaDistributionGoal extends ReplicaDistributionAbstractGoal { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderReplicaDistributionGoal.class); + + /** + * The maximum allowed extent of unbalance for leader replica distribution. For example, 1.10 + * means the highest leader replica count of a server should not be 1.10x of average leader + * replica count of all alive tabletServers. 
+ */ + private static final Double LEADER_REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d; + + @Override + public ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel) { + ServerModel sourceServer = clusterModel.server(action.getSourceServerId()); + checkNotNull( + sourceServer, "Source server " + action.getSourceServerId() + " is not found."); + ReplicaModel sourceReplica = sourceServer.replica(action.getTableBucket()); + checkNotNull(sourceReplica, "Source replica " + action.getTableBucket() + " is not found."); + ServerModel destServer = clusterModel.server(action.getDestinationServerId()); + switch (action.getActionType()) { + case LEADERSHIP_MOVEMENT: + return isLeaderMovementSatisfiable(sourceServer, destServer); + case REPLICA_MOVEMENT: + if (sourceReplica.isLeader()) { + return isLeaderMovementSatisfiable(sourceServer, destServer); + } + return ACCEPT; + default: + throw new IllegalArgumentException( + "Unsupported action type " + action.getActionType()); + } + } + + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set<Goal> optimizedGoals) + throws RebalanceFailureException { + LOG.debug( + "Rebalancing server {} [limits] lower: {} upper: {}.", + server.id(), + rebalanceLowerLimit, + rebalanceUpperLimit); + int numLeaderReplicas = server.leaderReplicas().size(); + boolean isExcludedForReplicaMove = isExcludedForReplicaMove(server); + boolean requireLessLeaderReplicas = + numLeaderReplicas > (isExcludedForReplicaMove ? 0 : rebalanceUpperLimit) + || !server.isAlive(); + boolean requireMoreLeaderReplicas = + !isExcludedForReplicaMove + && server.isAlive() + && numLeaderReplicas < rebalanceLowerLimit; + // Update server ids over the balance limit for logging purposes. 
+ if (((requireLessLeaderReplicas + && rebalanceByMovingLeadershipOut(server, clusterModel, optimizedGoals))) + && rebalanceByMovingReplicasOut(server, clusterModel, optimizedGoals)) { + serverIdsAboveRebalanceUpperLimit.add(server.id()); + LOG.debug( + "Failed to sufficiently decrease leader replica count in server {}. Leader replicas: {}.", + server.id(), + server.leaderReplicas().size()); + } else if (requireMoreLeaderReplicas + && rebalanceByMovingLeadershipIn(server, clusterModel, optimizedGoals) + && rebalanceByMovingLeaderReplicasIn(server, clusterModel, optimizedGoals)) { + serverIdsBelowRebalanceLowerLimit.add(server.id()); + LOG.debug( + "Failed to sufficiently increase leader replica count in server {}. Leader replicas: {}.", + server.id(), + server.leaderReplicas().size()); + } + } + + @Override + public ClusterModelStatsComparator clusterModelStatsComparator() { + return new LeaderReplicaDistributionGoalStatsComparator(); + } + + @Override + int numInterestedReplicas(ClusterModel clusterModel) { + return clusterModel.numLeaderReplicas(); + } + + @Override + double balancePercentage() { + return LEADER_REPLICA_COUNT_REBALANCE_THRESHOLD; + } + + private ActionAcceptance isLeaderMovementSatisfiable( + ServerModel sourceServer, ServerModel destServer) { + return (isReplicaCountUnderBalanceUpperLimitAfterChange( + destServer, destServer.leaderReplicas().size()) + && (isExcludedForReplicaMove(sourceServer) + || isReplicaCountAboveBalanceLowerLimitAfterChange( + sourceServer, sourceServer.leaderReplicas().size()))) + ? ACCEPT + : REPLICA_REJECT; + } + + private boolean rebalanceByMovingLeadershipOut( + ServerModel server, ClusterModel cluster, Set<Goal> optimizedGoals) { + // If the source server is excluded for replica move, set its upper limit to 0. + int balanceUpperLimitForSourceServer = + isExcludedForReplicaMove(server) ? 
0 : rebalanceUpperLimit; + int numLeaderReplicas = server.leaderReplicas().size(); + for (ReplicaModel leader : new HashSet<>(server.leaderReplicas())) { + BucketModel bucketModel = cluster.bucket(leader.tableBucket()); + checkNotNull(bucketModel, "Bucket " + leader.tableBucket() + " is not found."); + Set<ServerModel> candidateServers = + bucketModel.bucketServers().stream() + .filter(b -> b != server) + .collect(Collectors.toSet()); + ServerModel b = + maybeApplyBalancingAction( + cluster, + leader, + candidateServers, + ActionType.LEADERSHIP_MOVEMENT, + optimizedGoals); + // Only check if we successfully moved something. + if (b != null) { + if (--numLeaderReplicas <= balanceUpperLimitForSourceServer) { + return false; + } + } + } + return true; + } + + private boolean rebalanceByMovingLeadershipIn( + ServerModel server, ClusterModel cluster, Set<Goal> optimizedGoals) { + int numLeaderReplicas = server.leaderReplicas().size(); + Set<ServerModel> candidateServers = Collections.singleton(server); + for (ReplicaModel replica : server.replicas()) { + if (replica.isLeader()) { + continue; + } + + BucketModel bucket = cluster.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket " + replica.tableBucket() + " is not found."); + ServerModel b = + maybeApplyBalancingAction( + cluster, + Objects.requireNonNull(bucket.leader()), + candidateServers, + ActionType.LEADERSHIP_MOVEMENT, + optimizedGoals); + // Only check if we successfully moved something. + if (b != null) { + if (++numLeaderReplicas >= rebalanceLowerLimit) { + return false; + } + } + } + return true; + } + + private boolean rebalanceByMovingReplicasOut( + ServerModel server, ClusterModel cluster, Set<Goal> optimizedGoals) { + // Get the eligible servers. 
+ SortedSet<ServerModel> candidateServers; + candidateServers = + new TreeSet<>( + Comparator.comparingInt((ServerModel b) -> b.leaderReplicas().size()) + .thenComparingInt(ServerModel::id)); + candidateServers.addAll( + cluster.aliveServers().stream() + .filter(b -> b.leaderReplicas().size() < rebalanceUpperLimit) + .collect(Collectors.toSet())); + + int balanceUpperLimit = rebalanceUpperLimit; Review Comment: should consider `isExcludedForReplicaMove(server)` as well? ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServersNotExcludeForReplicaMove; + +/** An abstract class for goals that are based on the distribution of replicas. */ +public abstract class ReplicaDistributionAbstractGoal extends AbstractGoal { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicaDistributionAbstractGoal.class); + private static final double BALANCE_MARGIN = 0.9; + protected final Set<Integer> serverIdsAboveRebalanceUpperLimit; + protected final Set<Integer> serverIdsBelowRebalanceLowerLimit; + protected double avgReplicasOnAliveServer; + protected int rebalanceUpperLimit; + protected int rebalanceLowerLimit; + // This is used to identify servers not excluded for replica moves. 
+ protected Set<Integer> serversAllowedReplicaRemove; + + public ReplicaDistributionAbstractGoal() { + serverIdsAboveRebalanceUpperLimit = new HashSet<>(); + serverIdsBelowRebalanceLowerLimit = new HashSet<>(); + } + + private int rebalanceUpperLimit(double balancePercentage) { + return (int) + Math.ceil( + avgReplicasOnAliveServer + * (1 + adjustedRebalancePercentage(balancePercentage))); + } + + private int rebalanceLowerLimit(double balancePercentage) { + return (int) + Math.floor( + avgReplicasOnAliveServer + * Math.max( + 0, (1 - adjustedRebalancePercentage(balancePercentage)))); + } + + private double adjustedRebalancePercentage(double rebalancePercentage) { + return (rebalancePercentage - 1) * BALANCE_MARGIN; + } + + boolean isReplicaCountUnderBalanceUpperLimitAfterChange( + ServerModel server, int currentReplicaCount) { + int serverBalanceUpperLimit = server.isAlive() ? rebalanceUpperLimit : 0; + return currentReplicaCount + 1 <= serverBalanceUpperLimit; + } + + boolean isReplicaCountAboveBalanceLowerLimitAfterChange( + ServerModel server, int currentReplicaCount) { + int serverBalanceLowerLimit = server.isAlive() ? rebalanceLowerLimit : 0; + return currentReplicaCount - 1 >= serverBalanceLowerLimit; + } + + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + serversAllowedReplicaRemove = aliveServersNotExcludeForReplicaMove(clusterModel); + if (serversAllowedReplicaRemove.isEmpty()) { + throw new RebalanceFailureException( + String.format( + "[%s] All alive tabletServers are excluded from replica moves.", + name())); + } + + // Initialize the average replicas on an alive server. 
+ avgReplicasOnAliveServer = + numInterestedReplicas(clusterModel) / (double) serversAllowedReplicaRemove.size(); + + rebalanceUpperLimit = rebalanceUpperLimit(balancePercentage()); + rebalanceLowerLimit = rebalanceLowerLimit(balancePercentage()); + } + + @Override + protected boolean selfSatisfied(ClusterModel clusterModel, ReBalancingAction action) { + // Check that destination and source would not become unbalanced. + return actionAcceptance(action, clusterModel) == ACCEPT; + } + + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + if (!serverIdsAboveRebalanceUpperLimit.isEmpty()) { + LOG.debug( + "Replicas count on server ids:{} {} above the balance limit of {} after rebalance.", + serverIdsAboveRebalanceUpperLimit, + (serverIdsAboveRebalanceUpperLimit.size() > 1) ? "are" : "is", + rebalanceUpperLimit); + serverIdsAboveRebalanceUpperLimit.clear(); + succeeded = false; + } + + if (!serverIdsBelowRebalanceLowerLimit.isEmpty()) { + LOG.debug( + "Replicas count on server ids:{} {} below the balance limit of {} after rebalance.", + serverIdsBelowRebalanceLowerLimit, + (serverIdsBelowRebalanceLowerLimit.size() > 1) ? "are" : "is", + rebalanceLowerLimit); + serverIdsBelowRebalanceLowerLimit.clear(); + succeeded = false; + } + + // TODO maybe need check offline server. + + finish(); + } + + abstract int numInterestedReplicas(ClusterModel clusterModel); + + /** + * @return The requested balance threshold. + */ + abstract double balancePercentage(); + + protected boolean isExcludedForReplicaMove(ServerModel server) { Review Comment: This method looks strange to me, IIUC, this checks whether the server is tagged offline, and should move all replicas to other nodes, so it is not excluded for replica move, because there is replica move for it. A better way is to rename it into `isOfflineTagged(server)` and add javadocs on it to explain we need to move all replicas to other nodes for such node. 
If we do such renaming, we should also rename `serversAllowedReplicaRemove` -> `serversAllowedNewReplicas` (or `aliveServers`), and `aliveServersNotExcludeForReplicaMove()` -> `aliveServers()`. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.model; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** A class that holds the information of the tabletServer for rebalance. */ +public class ServerModel implements Comparable<ServerModel> { + + private final int serverId; + private final boolean isAlive; + private final String rack; + private final Set<ReplicaModel> replicas; + /** A map for tracking (tableId) -> (BucketId -> replica) for none-partitioned table.
*/ + private final Map<Long, Map<Integer, ReplicaModel>> tableReplicas; + + /** A map for tracking (tableId, partitionId) -> (BucketId -> replica) for partitioned table. */ + private final Map<TablePartition, Map<Integer, ReplicaModel>> tablePartitionReplicas; + + public ServerModel(int serverId, String rack, boolean isAlive) { + this.serverId = serverId; + this.rack = rack; + this.isAlive = isAlive; + this.replicas = new HashSet<>(); + this.tableReplicas = new HashMap<>(); + this.tablePartitionReplicas = new HashMap<>(); + } + + public int id() { + return serverId; + } + + public String rack() { + return rack; + } + + public boolean isAlive() { + return isAlive; + } + + public Set<ReplicaModel> replicas() { + return new HashSet<>(replicas); + } + + public Set<ReplicaModel> leaderReplicas() { + return replicas.stream().filter(ReplicaModel::isLeader).collect(Collectors.toSet()); + } Review Comment: These 2 methods are invoked many times and are time-consuming, because each call builds a new hash set (heavy equals and hashCode). So, providing `numReplicas` and `numLeaderReplicas` would improve performance a lot. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java: ########## @@ -616,13 +613,23 @@ private Optional<ElectionResult> electLeader( } Optional<ElectionResult> resultOpt = Optional.empty(); - if (electionStrategy == DEFAULT_ELECTION) { - resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, leaderAndIsr); - } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) { + if (electionStrategy instanceof DefaultLeaderElection) { + resultOpt = + ((DefaultLeaderElection) electionStrategy) Review Comment: Could you create an issue to refactor `ReplicaLeaderElection` into an interface with a `leaderElection` method?
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.metadata.TableBucket; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import javax.annotation.Nullable; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Procedure to list rebalance progress. + * + * <p>This procedure allows querying rebalance progress. See {@link + * Admin#listRebalanceProgress(String)} for more details. 
+ * + * <p>Usage examples: + * + * <pre> + * -- List the rebalance progress without rebalance id + * CALL sys.list_rebalance_process(); + * + * -- List the rebalance progress with rebalance id + * CALL sys.list_rebalance_process('xxx_xxx_xxx'); + * </pre> + */ +public class ListRebalanceProcessProcedure extends ProcedureBase { + + @ProcedureHint( + argument = { + @ArgumentHint( + name = "rebalanceId", + type = @DataTypeHint("STRING"), + isOptional = true) + }) + public String[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception { + RebalanceProgress progress = admin.listRebalanceProgress(rebalanceId).get(); + return progressToString(progress); + } + + private static String[] progressToString(RebalanceProgress progress) { + RebalanceStatus status = progress.status(); + double rebalanceProgress = progress.progress(); + Map<TableBucket, RebalanceResultForBucket> bucketMap = progress.progressForBucketMap(); + + List<String> result = new ArrayList<>(); + if (progress.rebalanceId() != null) { + result.add("Rebalance id: " + progress.rebalanceId()); + } + result.add("Reblance total status: " + status); + result.add("Rebalance progress: " + formatAsPercentage(rebalanceProgress)); + result.add("Rebalance detail progress for bucket:"); + for (RebalanceResultForBucket resultForBucket : bucketMap.values()) { Review Comment: Please create an issue ticket to change the result into a table format, like ``` rebalance id | rebalance status | rebalance plan xxxx | completed | {...} ``` This will make it easier for us to support multiple rebalance ids in the future (e.g. listing rebalance history if we support storing multiple rebalance results). And the plan can be printed in JSON format for better readability. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ActionType; +import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.coordinator.rebalance.model.StatisticType; +import org.apache.fluss.utils.MathUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.REPLICA_REJECT; +import static org.apache.fluss.utils.MathUtils.EPSILON; +import 
static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Soft goal to generate replica movement proposals to ensure that the number of replicas on each + * server is within the following limits: + * + * <ul> + * <li>Under: (the average number of replicas per server) * (1 + replica count balance percentage) + * <li>Above: (the average number of replicas per server) * Math.max(0, 1 - replica count balance + * percentage) + * </ul> + */ +public class ReplicaDistributionGoal extends ReplicaDistributionAbstractGoal { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicaDistributionGoal.class); + + // TODO configurable. + /** + * The maximum allowed extent of imbalance for replica distribution. For example, 1.10 means + * the highest replica count of a server should not exceed 1.10x the average replica count of + * all alive tablet servers. + */ + private static final Double REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d; + + @Override + public ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel) { + switch (action.getActionType()) { + case LEADERSHIP_MOVEMENT: + return ACCEPT; + case REPLICA_MOVEMENT: + ServerModel sourceServer = clusterModel.server(action.getSourceServerId()); + ServerModel destServer = clusterModel.server(action.getDestinationServerId()); + + checkNotNull( + sourceServer, + "Source server " + action.getSourceServerId() + " is not found."); + checkNotNull( + destServer, + "Destination server " + action.getDestinationServerId() + " is not found."); + + // Check that destination and source would not become unbalanced. + return (isReplicaCountUnderBalanceUpperLimitAfterChange( + destServer, destServer.replicas().size())) + && (isExcludedForReplicaMove(sourceServer) + || isReplicaCountAboveBalanceLowerLimitAfterChange( + sourceServer, sourceServer.replicas().size())) + ? 
ACCEPT + : REPLICA_REJECT; + default: + throw new IllegalArgumentException( + "Unsupported balancing action " + action.getActionType() + " is provided."); + } + } + + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set<Goal> optimizedGoals) + throws RebalanceFailureException { + LOG.debug( + "Rebalancing server {} [limits] lower: {} upper: {}.", + server.id(), + rebalanceLowerLimit, + rebalanceUpperLimit); + int numReplicas = server.replicas().size(); + boolean isExcludeForReplicaMove = isExcludedForReplicaMove(server); + + boolean requireLessReplicas = + numReplicas > rebalanceUpperLimit || isExcludeForReplicaMove || !server.isAlive(); + boolean requireMoreReplicas = + !isExcludeForReplicaMove && server.isAlive() && numReplicas < rebalanceLowerLimit; + if (!requireMoreReplicas && !requireLessReplicas) { + // return if the server is already within the limit. + return; + } + + if (requireLessReplicas + && rebalanceByMovingReplicasOut(server, clusterModel, optimizedGoals)) { + serverIdsAboveRebalanceUpperLimit.add(server.id()); + LOG.debug( + "Failed to sufficiently decrease replica count in server {} with replica movements. " + + "Replicas number after remove: {}.", + server.id(), + server.replicas().size()); + } + + if (requireMoreReplicas + && rebalanceByMovingReplicasIn(server, clusterModel, optimizedGoals)) { + serverIdsBelowRebalanceLowerLimit.add(server.id()); + LOG.debug( + "Failed to sufficiently increase replica count in server {} with replica movements. " + + "Replicas number after remove: {}.", + server.id(), + server.replicas().size()); + } + + if (!serverIdsAboveRebalanceUpperLimit.contains(server.id()) + && !serverIdsBelowRebalanceLowerLimit.contains(server.id())) { + LOG.debug( + "Successfully balanced replica count for server {} by moving replicas. 
" + + "Replicas number after remove: {}", + server.id(), + server.replicas().size()); + } + } + + @Override + public ClusterModelStatsComparator clusterModelStatsComparator() { + return new ReplicaDistributionGoalStatsComparator(); + } + + @Override + int numInterestedReplicas(ClusterModel clusterModel) { + return clusterModel.numReplicas(); + } + + @Override + double balancePercentage() { + return REPLICA_COUNT_REBALANCE_THRESHOLD; + } + + private boolean rebalanceByMovingReplicasOut( + ServerModel server, ClusterModel cluster, Set<Goal> optimizedGoals) { + SortedSet<ServerModel> candidateServers = + new TreeSet<>( + Comparator.comparingInt((ServerModel b) -> b.replicas().size()) + .thenComparingInt(ServerModel::id)); + + candidateServers.addAll( + cluster.aliveServers().stream() + .filter(b -> b.replicas().size() < rebalanceUpperLimit) + .collect(Collectors.toSet())); + int balanceUpperLimitForSourceServer = + isExcludedForReplicaMove(server) ? 0 : rebalanceUpperLimit; + + // Now let's do the replica out operation. + // TODO maybe use a sorted replicas set + for (ReplicaModel replica : server.replicas()) { + ServerModel b = + maybeApplyBalancingAction( + cluster, + replica, + candidateServers, + ActionType.REPLICA_MOVEMENT, + optimizedGoals); + // Only check if we successfully moved something. + if (b != null) { + if (server.replicas().size() <= balanceUpperLimitForSourceServer) { + return false; + } + + // Remove and reinsert the server so the order is correct. + candidateServers.remove(b); + if (b.replicas().size() < rebalanceUpperLimit) { + candidateServers.add(b); + } + } + } + + return !server.replicas().isEmpty(); + } + + private boolean rebalanceByMovingReplicasIn( + ServerModel aliveDestServer, ClusterModel cluster, Set<Goal> optimizedGoals) { + PriorityQueue<ServerModel> eligibleServers = + new PriorityQueue<>( + (b1, b2) -> { + // Servers are sorted by (1) all replica count then (2) server id. 
+ int resultByAllReplicas = + Integer.compare(b2.replicas().size(), b1.replicas().size()); + return resultByAllReplicas == 0 + ? Integer.compare(b1.id(), b2.id()) + : resultByAllReplicas; + }); + + // Source server can be offline, alive. + for (ServerModel sourceServer : cluster.servers()) { + if (sourceServer.replicas().size() > rebalanceLowerLimit + || isExcludedForReplicaMove(sourceServer)) { + eligibleServers.add(sourceServer); + } + } + + List<ServerModel> candidateServers = Collections.singletonList(aliveDestServer); + while (!eligibleServers.isEmpty()) { + ServerModel sourceServer = eligibleServers.poll(); + // TODO maybe use a sorted replicas set + for (ReplicaModel replica : sourceServer.replicas()) { + ServerModel b = + maybeApplyBalancingAction( + cluster, + replica, + candidateServers, + ActionType.REPLICA_MOVEMENT, + optimizedGoals); + // Only need to check status if the action is taken. This will also handle the case + // that the source server has nothing to move in. In that case we will never + // re-enqueue that source server. + if (b != null) { + if (aliveDestServer.replicas().size() >= rebalanceLowerLimit) { + // Note that the server passed to this method is always alive; hence, there + // is no need to check if it is dead. + return false; + } + + if (!eligibleServers.isEmpty()) { + if (sourceServer.replicas().size() + < eligibleServers.peek().replicas().size()) { Review Comment: should use `>` and add comment ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceProgress; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.exception.NoRebalanceInProgressException; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.CoordinatorContext; +import org.apache.fluss.server.coordinator.CoordinatorEventProcessor; +import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; +import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.utils.MapUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import 
java.util.ArrayDeque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING; +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** + * A rebalance manager that generates rebalance plans and executes them. + * + * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a single-threaded model. + */ +@ThreadSafe +public class RebalanceManager { + + private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Review Comment: We don't need a lock in `RebalanceManager`, because all of its methods are invoked from `CoordinatorEventProcessor`, which runs on a single thread, so it is already thread-safe. Just add a note in the javadoc of `RebalanceManager` that this instance should only be called from `CoordinatorEventProcessor`. The lock in `RebalanceManager` will introduce many problems: I see many ZK operations performed while holding the lock, which may result in deadlocks or performance issues. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
