wuchong commented on code in PR #1452:
URL: https://github.com/apache/fluss/pull/1452#discussion_r2671139339


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.cluster.rebalance.ServerTag;
+import org.apache.fluss.exception.NoRebalanceInProgressException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
+import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.metadata.ServerInfo;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.RebalanceTask;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static 
org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * A rebalance manager to generate rebalance plan, and execution rebalance 
plan.
+ *
+ * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a 
single threaded model.
+ */
+public class RebalanceManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RebalanceManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorEventProcessor eventProcessor;
+
+    /** A queue of in progress table bucket to rebalance. */
+    private final Queue<TableBucket> inProgressRebalanceTasksQueue = new 
ArrayDeque<>();
+
+    /** A mapping from table bucket to rebalance status of pending and running 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
inProgressRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    /** A mapping from table bucket to rebalance status of failed or completed 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
finishedRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    private final GoalOptimizer goalOptimizer;
+    private volatile long registerTime;
+    private volatile @Nullable RebalanceStatus rebalanceStatus;
+    private volatile @Nullable String currentRebalanceId;
+    private volatile boolean isClosed = false;
+
+    public RebalanceManager(CoordinatorEventProcessor eventProcessor, 
ZooKeeperClient zkClient) {
+        this.eventProcessor = eventProcessor;
+        this.zkClient = zkClient;
+        this.goalOptimizer = new GoalOptimizer();
+    }
+
+    public void startup() {
+        LOG.info("Start up rebalance manager.");
+        initialize();
+    }
+
+    public @Nullable String getRebalanceId() {
+        return currentRebalanceId;
+    }
+
+    private void initialize() {
+        try {
+            zkClient.getRebalancePlan()
+                    .ifPresent(
+                            rebalancePlan ->
+                                    registerRebalance(
+                                            rebalancePlan.getRebalanceId(),
+                                            rebalancePlan.getExecutePlan(),
+                                            
rebalancePlan.getRebalanceStatus()));
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to get rebalance plan from zookeeper, it will be 
treated as no"
+                            + "rebalance tasks.",
+                    e);
+        }
+    }
+
+    public void registerRebalance(
+            String rebalanceId,
+            Map<TableBucket, RebalancePlanForBucket> rebalancePlan,
+            RebalanceStatus newStatus) {
+        checkNotClosed();
+        registerTime = System.currentTimeMillis();
+        // first clear all exists tasks.
+        inProgressRebalanceTasks.clear();
+        inProgressRebalanceTasksQueue.clear();
+        finishedRebalanceTasks.clear();
+
+        currentRebalanceId = rebalanceId;
+        rebalancePlan.forEach(
+                ((tableBucket, planForBucket) -> {
+                    if (FINAL_STATUSES.contains(newStatus)) {
+                        finishedRebalanceTasks.put(
+                                tableBucket, 
RebalanceResultForBucket.of(planForBucket, newStatus));
+                    } else {
+                        inProgressRebalanceTasksQueue.add(tableBucket);
+                        inProgressRebalanceTasks.put(
+                                tableBucket,
+                                RebalanceResultForBucket.of(planForBucket, 
NOT_STARTED));
+                    }
+                }));
+
+        if (!inProgressRebalanceTasksQueue.isEmpty()) {
+            // Trigger one rebalance task to execute.
+            rebalanceStatus = REBALANCING;
+            processNewRebalanceTask();
+        } else {
+            rebalanceStatus = newStatus;
+        }
+    }
+
+    public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus 
statusForBucket) {
+        checkNotClosed();
+        if (inProgressRebalanceTasksQueue.contains(tableBucket)) {
+            inProgressRebalanceTasksQueue.remove(tableBucket);
+            RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.remove(tableBucket);
+            checkNotNull(resultForBucket, "RebalanceResultForBucket is null.");
+            finishedRebalanceTasks.put(
+                    tableBucket,
+                    RebalanceResultForBucket.of(resultForBucket.plan(), 
statusForBucket));
+            LOG.info(
+                    "Rebalance task {} in progress: {} tasks pending, {} 
completed.",
+                    currentRebalanceId,
+                    inProgressRebalanceTasksQueue.size(),
+                    finishedRebalanceTasks.size());
+
+            if (inProgressRebalanceTasksQueue.isEmpty()) {
+                // All rebalance tasks are completed.
+                rebalanceStatus = COMPLETED;
+                completeRebalance();
+            } else {
+                // Trigger one rebalance task to execute.
+                processNewRebalanceTask();
+            }
+        }
+    }
+
+    public @Nullable RebalanceProgress listRebalanceProgress(@Nullable String 
rebalanceId) {
+        checkNotClosed();
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            LOG.warn(
+                    "Ignore the list rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to list is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (currentRebalanceId == null) {
+            return null;
+        }
+
+        Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new 
HashMap<>();
+        progressForBucketMap.putAll(inProgressRebalanceTasks);
+        progressForBucketMap.putAll(finishedRebalanceTasks);
+        // the progress will be set at client.
+        return new RebalanceProgress(
+                currentRebalanceId, rebalanceStatus, 0.0, 
progressForBucketMap);
+    }
+
+    public void cancelRebalance(@Nullable String rebalanceId) {
+        checkNotClosed();
+
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            // do nothing.
+            LOG.warn(
+                    "Ignore the cancel rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to cancel is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (rebalanceStatus != null && 
FINAL_STATUSES.contains(rebalanceStatus)) {
+            // do nothing for the final state rebalance task.
+            return;
+        }
+
+        try {
+            Optional<RebalanceTask> rebalancePlanOpt = 
zkClient.getRebalancePlan();
+            if (rebalancePlanOpt.isPresent()) {
+                RebalanceTask rebalanceTask = rebalancePlanOpt.get();
+                zkClient.registerRebalancePlan(
+                        new RebalanceTask(
+                                rebalanceTask.getRebalanceId(),
+                                CANCELED,
+                                rebalanceTask.getExecutePlan()));
+            }
+        } catch (Exception e) {
+            LOG.error("Error when delete rebalance plan from zookeeper.", e);
+        }
+
+        rebalanceStatus = CANCELED;
+        inProgressRebalanceTasksQueue.clear();
+        inProgressRebalanceTasks.clear();
+        // Here, it will not clear finishedRebalanceTasks, because it will be 
used by
+        // listRebalanceProgress. It will be cleared when next register.
+
+        LOG.info("Cancel rebalance task success.");
+    }
+
+    public boolean hasInProgressRebalance() {
+        checkNotClosed();
+        return !inProgressRebalanceTasks.isEmpty() || 
!inProgressRebalanceTasksQueue.isEmpty();
+    }
+
+    public RebalanceTask generateRebalancePlan(List<Goal> goalsByPriority) {

Review Comment:
   rename to `generateRebalanceTask`



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1218,22 +1229,28 @@ public void deleteServerTags() throws Exception {
         deletePath(ServerTagsZNode.path());
     }
 
-    public void registerRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+    public void registerRebalancePlan(RebalanceTask rebalanceTask) throws 
Exception {

Review Comment:
   ```suggestion
       public void registerRebalanceTask(RebalanceTask rebalanceTask) throws 
Exception {
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1218,22 +1229,28 @@ public void deleteServerTags() throws Exception {
         deletePath(ServerTagsZNode.path());
     }
 
-    public void registerRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+    public void registerRebalancePlan(RebalanceTask rebalanceTask) throws 
Exception {
         String path = RebalanceZNode.path();
-        zkClient.create()
-                .creatingParentsIfNeeded()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, RebalanceZNode.encode(rebalancePlan));
+        Stat stat = zkClient.checkExists().forPath(path);
+        if (stat == null) {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, RebalanceZNode.encode(rebalanceTask));
+        } else {
+            zkClient.setData().forPath(path, 
RebalanceZNode.encode(rebalanceTask));
+        }
     }
 
-    public void updateRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+    public Optional<RebalanceTask> getRebalancePlan() throws Exception {
         String path = RebalanceZNode.path();
-        zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan));
+        return getOrEmpty(path).map(RebalanceZNode::decode);
     }
 
-    public Optional<RebalancePlan> getRebalancePlan() throws Exception {
-        String path = RebalanceZNode.path();
-        return getOrEmpty(path).map(RebalanceZNode::decode);
+    /** Deletes the rebalance plan from ZooKeeper. Only for testing propose 
now */
+    @VisibleForTesting
+    public void deleteRebalancePlan() throws Exception {

Review Comment:
   ```suggestion
       public void deleteRebalanceTask() throws Exception {
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -602,4 +604,7 @@ ListOffsetsResult listOffsets(
      *     NoRebalanceInProgressException} will be thrown.
      */
     CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
+
+    // TODO support CompletableFuture<Optional<RebalanceProgress>> 
listRebalanceProgress(@Nullable
+    // String rebalanceId);

Review Comment:
   remove, this has been implemented. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RebalanceProcedure.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.cluster.rebalance.GoalType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to trigger rebalance.
+ *
+ * <p>This procedure allows triggering rebalance with different goals. See 
{@link
+ * Admin#rebalance(List)} for more details.
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Trigger rebalance with REPLICA_DISTRIBUTION goal
+ * CALL sys.rebalance('REPLICA_DISTRIBUTION');
+ * -- Trigger rebalance with REPLICA_DISTRIBUTION and LEADER_DISTRIBUTION goals
+ * CALL sys.rebalance('REPLICA_DISTRIBUTION;LEADER_DISTRIBUTION');

Review Comment:
   ```suggestion
    * CALL sys.rebalance('REPLICA_DISTRIBUTION,LEADER_DISTRIBUTION');
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1218,22 +1229,28 @@ public void deleteServerTags() throws Exception {
         deletePath(ServerTagsZNode.path());
     }
 
-    public void registerRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+    public void registerRebalancePlan(RebalanceTask rebalanceTask) throws 
Exception {
         String path = RebalanceZNode.path();
-        zkClient.create()
-                .creatingParentsIfNeeded()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, RebalanceZNode.encode(rebalancePlan));
+        Stat stat = zkClient.checkExists().forPath(path);
+        if (stat == null) {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, RebalanceZNode.encode(rebalanceTask));
+        } else {
+            zkClient.setData().forPath(path, 
RebalanceZNode.encode(rebalanceTask));
+        }
     }
 
-    public void updateRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+    public Optional<RebalanceTask> getRebalancePlan() throws Exception {

Review Comment:
   ```suggestion
       public Optional<RebalanceTask> getRebalanceTask() throws Exception {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/RebalanceProcedure.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.cluster.rebalance.GoalType;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to trigger rebalance.
+ *
+ * <p>This procedure allows triggering rebalance with different goals. See 
{@link
+ * Admin#rebalance(List)} for more details.
+ *
+ * <p>Usage examples:
+ *
+ * <pre>
+ * -- Trigger rebalance with REPLICA_DISTRIBUTION goal
+ * CALL sys.rebalance('REPLICA_DISTRIBUTION');
+ * -- Trigger rebalance with REPLICA_DISTRIBUTION and LEADER_DISTRIBUTION goals
+ * CALL sys.rebalance('REPLICA_DISTRIBUTION;LEADER_DISTRIBUTION');
+ * </pre>
+ */
+public class RebalanceProcedure extends ProcedureBase {
+
+    /**
+     * As flink call don't support input a nested type like 'ARRAY'. So 
priorityGoals is defined as
+     * a String type, and different goals are split by ';'.

Review Comment:
   ```suggestion
        * a String type, and different goals are split by ','.
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.cluster.rebalance.ServerTag;
+import org.apache.fluss.exception.NoRebalanceInProgressException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
+import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.metadata.ServerInfo;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.RebalanceTask;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static 
org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * A rebalance manager to generate rebalance plan, and execution rebalance 
plan.
+ *
+ * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a 
single threaded model.
+ */
+public class RebalanceManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RebalanceManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorEventProcessor eventProcessor;
+
+    /** A queue of in progress table bucket to rebalance. */
+    private final Queue<TableBucket> inProgressRebalanceTasksQueue = new 
ArrayDeque<>();
+
+    /** A mapping from table bucket to rebalance status of pending and running 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
inProgressRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    /** A mapping from table bucket to rebalance status of failed or completed 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
finishedRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    private final GoalOptimizer goalOptimizer;
+    private volatile long registerTime;
+    private volatile @Nullable RebalanceStatus rebalanceStatus;
+    private volatile @Nullable String currentRebalanceId;
+    private volatile boolean isClosed = false;
+
+    public RebalanceManager(CoordinatorEventProcessor eventProcessor, 
ZooKeeperClient zkClient) {
+        this.eventProcessor = eventProcessor;
+        this.zkClient = zkClient;
+        this.goalOptimizer = new GoalOptimizer();
+    }
+
+    public void startup() {
+        LOG.info("Start up rebalance manager.");
+        initialize();
+    }
+
+    public @Nullable String getRebalanceId() {
+        return currentRebalanceId;
+    }
+
+    private void initialize() {
+        try {
+            zkClient.getRebalancePlan()
+                    .ifPresent(
+                            rebalancePlan ->
+                                    registerRebalance(
+                                            rebalancePlan.getRebalanceId(),
+                                            rebalancePlan.getExecutePlan(),
+                                            
rebalancePlan.getRebalanceStatus()));
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to get rebalance plan from zookeeper, it will be 
treated as no"
+                            + "rebalance tasks.",
+                    e);
+        }
+    }
+
+    public void registerRebalance(
+            String rebalanceId,
+            Map<TableBucket, RebalancePlanForBucket> rebalancePlan,
+            RebalanceStatus newStatus) {
+        checkNotClosed();
+        registerTime = System.currentTimeMillis();
+        // first clear all exists tasks.
+        inProgressRebalanceTasks.clear();
+        inProgressRebalanceTasksQueue.clear();
+        finishedRebalanceTasks.clear();
+
+        currentRebalanceId = rebalanceId;
+        rebalancePlan.forEach(
+                ((tableBucket, planForBucket) -> {
+                    if (FINAL_STATUSES.contains(newStatus)) {
+                        finishedRebalanceTasks.put(
+                                tableBucket, 
RebalanceResultForBucket.of(planForBucket, newStatus));
+                    } else {
+                        inProgressRebalanceTasksQueue.add(tableBucket);
+                        inProgressRebalanceTasks.put(
+                                tableBucket,
+                                RebalanceResultForBucket.of(planForBucket, 
NOT_STARTED));
+                    }
+                }));
+
+        if (!inProgressRebalanceTasksQueue.isEmpty()) {
+            // Trigger one rebalance task to execute.
+            rebalanceStatus = REBALANCING;
+            processNewRebalanceTask();
+        } else {
+            rebalanceStatus = newStatus;
+        }
+    }
+
+    public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus 
statusForBucket) {
+        checkNotClosed();
+        if (inProgressRebalanceTasksQueue.contains(tableBucket)) {
+            inProgressRebalanceTasksQueue.remove(tableBucket);
+            RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.remove(tableBucket);
+            checkNotNull(resultForBucket, "RebalanceResultForBucket is null.");
+            finishedRebalanceTasks.put(
+                    tableBucket,
+                    RebalanceResultForBucket.of(resultForBucket.plan(), 
statusForBucket));
+            LOG.info(
+                    "Rebalance task {} in progress: {} tasks pending, {} 
completed.",
+                    currentRebalanceId,
+                    inProgressRebalanceTasksQueue.size(),
+                    finishedRebalanceTasks.size());
+
+            if (inProgressRebalanceTasksQueue.isEmpty()) {
+                // All rebalance tasks are completed.
+                rebalanceStatus = COMPLETED;
+                completeRebalance();
+            } else {
+                // Trigger one rebalance task to execute.
+                processNewRebalanceTask();
+            }
+        }
+    }
+
+    public @Nullable RebalanceProgress listRebalanceProgress(@Nullable String 
rebalanceId) {
+        checkNotClosed();
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            LOG.warn(
+                    "Ignore the list rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to list is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (currentRebalanceId == null) {
+            return null;
+        }
+
+        Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new 
HashMap<>();
+        progressForBucketMap.putAll(inProgressRebalanceTasks);
+        progressForBucketMap.putAll(finishedRebalanceTasks);
+        // the progress will be set at client.
+        return new RebalanceProgress(
+                currentRebalanceId, rebalanceStatus, 0.0, 
progressForBucketMap);
+    }
+
+    public void cancelRebalance(@Nullable String rebalanceId) {
+        checkNotClosed();
+
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            // do nothing.
+            LOG.warn(
+                    "Ignore the cancel rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to cancel is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (rebalanceStatus != null && 
FINAL_STATUSES.contains(rebalanceStatus)) {
+            // do nothing for the final state rebalance task.
+            return;
+        }
+
+        try {
+            Optional<RebalanceTask> rebalancePlanOpt = 
zkClient.getRebalancePlan();
+            if (rebalancePlanOpt.isPresent()) {
+                RebalanceTask rebalanceTask = rebalancePlanOpt.get();
+                zkClient.registerRebalancePlan(
+                        new RebalanceTask(
+                                rebalanceTask.getRebalanceId(),
+                                CANCELED,
+                                rebalanceTask.getExecutePlan()));
+            }
+        } catch (Exception e) {
+            LOG.error("Error when delete rebalance plan from zookeeper.", e);
+        }
+
+        rebalanceStatus = CANCELED;
+        inProgressRebalanceTasksQueue.clear();
+        inProgressRebalanceTasks.clear();
+        // Here, it will not clear finishedRebalanceTasks, because it will be 
used by
+        // listRebalanceProgress. It will be cleared when next register.
+
+        LOG.info("Cancel rebalance task success.");
+    }
+
+    public boolean hasInProgressRebalance() {
+        checkNotClosed();
+        return !inProgressRebalanceTasks.isEmpty() || 
!inProgressRebalanceTasksQueue.isEmpty();
+    }
+
+    public RebalanceTask generateRebalancePlan(List<Goal> goalsByPriority) {
+        checkNotClosed();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets;
+        String rebalanceId = UUID.randomUUID().toString();
+        try {
+            // Generate the latest cluster model.
+            long startTime = System.currentTimeMillis();
+            ClusterModel clusterModel = 
buildClusterModel(eventProcessor.getCoordinatorContext());
+            LOG.info(
+                    "Build cluster model for rebalance id {} with {} ms.",
+                    rebalanceId,
+                    System.currentTimeMillis() - startTime);
+
+            // do optimize.
+            startTime = System.currentTimeMillis();
+            rebalancePlanForBuckets = 
goalOptimizer.doOptimizeOnce(clusterModel, goalsByPriority);
+            LOG.info(
+                    "Do optimize for rebalance id {} with {} ms.",
+                    rebalanceId,
+                    System.currentTimeMillis() - startTime);
+        } catch (Exception e) {
+            LOG.error("Failed to generate rebalance plan.", e);
+            throw e;
+        }
+
+        // group by tableId and partitionId to generate rebalance plan.
+        return buildRebalancePlan(rebalanceId, rebalancePlanForBuckets);
+    }
+
+    public @Nullable RebalancePlanForBucket 
getRebalancePlanForBucket(TableBucket tableBucket) {
+        checkNotClosed();
+        RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.get(tableBucket);
+        if (resultForBucket != null) {
+            return resultForBucket.plan();
+        }
+        return null;
+    }
+
+    private void processNewRebalanceTask() {
+        TableBucket tableBucket = inProgressRebalanceTasksQueue.peek();
+        if (tableBucket != null && 
inProgressRebalanceTasks.containsKey(tableBucket)) {
+            RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.get(tableBucket);
+            RebalanceResultForBucket rebalanceResultForBucket =
+                    RebalanceResultForBucket.of(resultForBucket.plan(), 
REBALANCING);
+            
eventProcessor.tryToExecuteRebalanceTask(rebalanceResultForBucket.plan());
+        }
+    }
+
+    private void completeRebalance() {
+        checkNotClosed();
+        try {
+            Optional<RebalanceTask> rebalancePlanOpt = 
zkClient.getRebalancePlan();
+            if (rebalancePlanOpt.isPresent()) {
+                RebalanceTask rebalanceTask = rebalancePlanOpt.get();
+                zkClient.registerRebalancePlan(
+                        new RebalanceTask(
+                                rebalanceTask.getRebalanceId(),
+                                COMPLETED,
+                                rebalanceTask.getExecutePlan()));
+            }
+        } catch (Exception e) {
+            LOG.error("Error when update rebalance plan from zookeeper.", e);
+        }
+
+        inProgressRebalanceTasks.clear();
+        inProgressRebalanceTasksQueue.clear();
+
+        // Here, it will not clear finishedRebalanceTasks, because it will be 
used by
+        // listRebalanceProgress. It will be cleared when next register.
+
+        LOG.info("Rebalance complete with {} ms.", System.currentTimeMillis() 
- registerTime);
+    }
+
+    private ClusterModel buildClusterModel(CoordinatorContext 
coordinatorContext) {
+        Map<Integer, ServerInfo> liveTabletServers = 
coordinatorContext.getLiveTabletServers();
+        Map<Integer, ServerTag> serverTags = 
coordinatorContext.getServerTags();
+
+        Map<Integer, ServerModel> serverModelMap = new HashMap<>();
+        for (ServerInfo serverInfo : liveTabletServers.values()) {
+            Integer id = serverInfo.id();
+            String rack = serverInfo.rack() == null ? RackModel.DEFAULT_RACK : 
serverInfo.rack();
+            if (serverTags.containsKey(id)) {
+                serverModelMap.put(
+                        id, new ServerModel(id, rack, 
isOfflineTagged(serverTags.get(id))));
+            } else {
+                serverModelMap.put(id, new ServerModel(id, rack, false));
+            }
+        }
+
+        ClusterModel clusterModel = initialClusterModel(serverModelMap);
+
+        // Try to update the cluster model with the latest bucket states.
+        Set<TableBucket> allBuckets = coordinatorContext.getAllBuckets();
+        for (TableBucket tableBucket : allBuckets) {
+            List<Integer> assignment = 
coordinatorContext.getAssignment(tableBucket);
+            Optional<LeaderAndIsr> bucketLeaderAndIsrOpt =
+                    coordinatorContext.getBucketLeaderAndIsr(tableBucket);
+            checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader 
and isr is empty.");
+            LeaderAndIsr isr = bucketLeaderAndIsrOpt.get();
+            int leader = isr.leader();
+            for (int i = 0; i < assignment.size(); i++) {
+                int replica = assignment.get(i);
+                clusterModel.createReplica(replica, tableBucket, i, leader == 
replica);
+            }
+        }
+        return clusterModel;
+    }
+
+    private RebalanceTask buildRebalancePlan(

Review Comment:
   rename to `buildRebalanceTask`



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.cluster.rebalance.ServerTag;
+import org.apache.fluss.exception.NoRebalanceInProgressException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
+import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.metadata.ServerInfo;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.RebalanceTask;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static 
org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * A rebalance manager to generate rebalance plan, and execution rebalance 
plan.
+ *
+ * <p>This manager can only be used in {@link CoordinatorEventProcessor} as a 
single threaded model.
+ */
+public class RebalanceManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RebalanceManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorEventProcessor eventProcessor;
+
+    /** A queue of in progress table bucket to rebalance. */
+    private final Queue<TableBucket> inProgressRebalanceTasksQueue = new 
ArrayDeque<>();
+
+    /** A mapping from table bucket to rebalance status of pending and running 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
inProgressRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    /** A mapping from table bucket to rebalance status of failed or completed 
tasks. */
+    private final Map<TableBucket, RebalanceResultForBucket> 
finishedRebalanceTasks =
+            MapUtils.newConcurrentHashMap();
+
+    private final GoalOptimizer goalOptimizer;
+    private volatile long registerTime;
+    private volatile @Nullable RebalanceStatus rebalanceStatus;
+    private volatile @Nullable String currentRebalanceId;
+    private volatile boolean isClosed = false;
+
+    public RebalanceManager(CoordinatorEventProcessor eventProcessor, 
ZooKeeperClient zkClient) {
+        this.eventProcessor = eventProcessor;
+        this.zkClient = zkClient;
+        this.goalOptimizer = new GoalOptimizer();
+    }
+
+    public void startup() {
+        LOG.info("Start up rebalance manager.");
+        initialize();
+    }
+
+    public @Nullable String getRebalanceId() {
+        return currentRebalanceId;
+    }
+
+    private void initialize() {
+        try {
+            zkClient.getRebalancePlan()
+                    .ifPresent(
+                            rebalancePlan ->
+                                    registerRebalance(
+                                            rebalancePlan.getRebalanceId(),
+                                            rebalancePlan.getExecutePlan(),
+                                            
rebalancePlan.getRebalanceStatus()));
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to get rebalance plan from zookeeper, it will be 
treated as no"
+                            + "rebalance tasks.",
+                    e);
+        }
+    }
+
+    public void registerRebalance(
+            String rebalanceId,
+            Map<TableBucket, RebalancePlanForBucket> rebalancePlan,
+            RebalanceStatus newStatus) {
+        checkNotClosed();
+        registerTime = System.currentTimeMillis();
+        // first clear all exists tasks.
+        inProgressRebalanceTasks.clear();
+        inProgressRebalanceTasksQueue.clear();
+        finishedRebalanceTasks.clear();
+
+        currentRebalanceId = rebalanceId;
+        rebalancePlan.forEach(
+                ((tableBucket, planForBucket) -> {
+                    if (FINAL_STATUSES.contains(newStatus)) {
+                        finishedRebalanceTasks.put(
+                                tableBucket, 
RebalanceResultForBucket.of(planForBucket, newStatus));
+                    } else {
+                        inProgressRebalanceTasksQueue.add(tableBucket);
+                        inProgressRebalanceTasks.put(
+                                tableBucket,
+                                RebalanceResultForBucket.of(planForBucket, 
NOT_STARTED));
+                    }
+                }));
+
+        if (!inProgressRebalanceTasksQueue.isEmpty()) {
+            // Trigger one rebalance task to execute.
+            rebalanceStatus = REBALANCING;
+            processNewRebalanceTask();
+        } else {
+            rebalanceStatus = newStatus;
+        }
+    }
+
+    public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus 
statusForBucket) {
+        checkNotClosed();
+        if (inProgressRebalanceTasksQueue.contains(tableBucket)) {
+            inProgressRebalanceTasksQueue.remove(tableBucket);
+            RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.remove(tableBucket);
+            checkNotNull(resultForBucket, "RebalanceResultForBucket is null.");
+            finishedRebalanceTasks.put(
+                    tableBucket,
+                    RebalanceResultForBucket.of(resultForBucket.plan(), 
statusForBucket));
+            LOG.info(
+                    "Rebalance task {} in progress: {} tasks pending, {} 
completed.",
+                    currentRebalanceId,
+                    inProgressRebalanceTasksQueue.size(),
+                    finishedRebalanceTasks.size());
+
+            if (inProgressRebalanceTasksQueue.isEmpty()) {
+                // All rebalance tasks are completed.
+                rebalanceStatus = COMPLETED;
+                completeRebalance();
+            } else {
+                // Trigger one rebalance task to execute.
+                processNewRebalanceTask();
+            }
+        }
+    }
+
+    public @Nullable RebalanceProgress listRebalanceProgress(@Nullable String 
rebalanceId) {
+        checkNotClosed();
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            LOG.warn(
+                    "Ignore the list rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to list is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (currentRebalanceId == null) {
+            return null;
+        }
+
+        Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new 
HashMap<>();
+        progressForBucketMap.putAll(inProgressRebalanceTasks);
+        progressForBucketMap.putAll(finishedRebalanceTasks);
+        // the progress will be set at client.
+        return new RebalanceProgress(
+                currentRebalanceId, rebalanceStatus, 0.0, 
progressForBucketMap);
+    }
+
+    public void cancelRebalance(@Nullable String rebalanceId) {
+        checkNotClosed();
+
+        if (rebalanceId != null
+                && currentRebalanceId != null
+                && !rebalanceId.equals(currentRebalanceId)) {
+            // do nothing.
+            LOG.warn(
+                    "Ignore the cancel rebalance task because it is not the 
current"
+                            + " rebalance task.");
+            throw new NoRebalanceInProgressException(
+                    String.format(
+                            "Rebalance task id %s to cancel is not the current 
rebalance task id %s.",
+                            rebalanceId, currentRebalanceId));
+        }
+
+        if (rebalanceStatus != null && 
FINAL_STATUSES.contains(rebalanceStatus)) {
+            // do nothing for the final state rebalance task.
+            return;
+        }
+
+        try {
+            Optional<RebalanceTask> rebalancePlanOpt = 
zkClient.getRebalancePlan();
+            if (rebalancePlanOpt.isPresent()) {
+                RebalanceTask rebalanceTask = rebalancePlanOpt.get();
+                zkClient.registerRebalancePlan(
+                        new RebalanceTask(
+                                rebalanceTask.getRebalanceId(),
+                                CANCELED,
+                                rebalanceTask.getExecutePlan()));
+            }
+        } catch (Exception e) {
+            LOG.error("Error when delete rebalance plan from zookeeper.", e);
+        }
+
+        rebalanceStatus = CANCELED;
+        inProgressRebalanceTasksQueue.clear();
+        inProgressRebalanceTasks.clear();
+        // Here, it will not clear finishedRebalanceTasks, because it will be 
used by
+        // listRebalanceProgress. It will be cleared when next register.
+
+        LOG.info("Cancel rebalance task success.");
+    }
+
+    public boolean hasInProgressRebalance() {
+        checkNotClosed();
+        return !inProgressRebalanceTasks.isEmpty() || 
!inProgressRebalanceTasksQueue.isEmpty();
+    }
+
+    public RebalanceTask generateRebalancePlan(List<Goal> goalsByPriority) {
+        checkNotClosed();
+        List<RebalancePlanForBucket> rebalancePlanForBuckets;
+        String rebalanceId = UUID.randomUUID().toString();
+        try {
+            // Generate the latest cluster model.
+            long startTime = System.currentTimeMillis();
+            ClusterModel clusterModel = 
buildClusterModel(eventProcessor.getCoordinatorContext());
+            LOG.info(
+                    "Build cluster model for rebalance id {} with {} ms.",
+                    rebalanceId,
+                    System.currentTimeMillis() - startTime);
+
+            // do optimize.
+            startTime = System.currentTimeMillis();
+            rebalancePlanForBuckets = 
goalOptimizer.doOptimizeOnce(clusterModel, goalsByPriority);
+            LOG.info(
+                    "Do optimize for rebalance id {} with {} ms.",
+                    rebalanceId,
+                    System.currentTimeMillis() - startTime);
+        } catch (Exception e) {
+            LOG.error("Failed to generate rebalance plan.", e);
+            throw e;
+        }
+
+        // group by tableId and partitionId to generate rebalance plan.
+        return buildRebalancePlan(rebalanceId, rebalancePlanForBuckets);
+    }
+
+    public @Nullable RebalancePlanForBucket 
getRebalancePlanForBucket(TableBucket tableBucket) {
+        checkNotClosed();
+        RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.get(tableBucket);
+        if (resultForBucket != null) {
+            return resultForBucket.plan();
+        }
+        return null;
+    }
+
+    private void processNewRebalanceTask() {
+        TableBucket tableBucket = inProgressRebalanceTasksQueue.peek();
+        if (tableBucket != null && 
inProgressRebalanceTasks.containsKey(tableBucket)) {
+            RebalanceResultForBucket resultForBucket = 
inProgressRebalanceTasks.get(tableBucket);
+            RebalanceResultForBucket rebalanceResultForBucket =
+                    RebalanceResultForBucket.of(resultForBucket.plan(), 
REBALANCING);
+            
eventProcessor.tryToExecuteRebalanceTask(rebalanceResultForBucket.plan());
+        }
+    }
+
+    private void completeRebalance() {
+        checkNotClosed();
+        try {
+            Optional<RebalanceTask> rebalancePlanOpt = 
zkClient.getRebalancePlan();

Review Comment:
   rename to `rebalanceTaskOpt` and `zkClient.getRebalanceTask()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to