wuchong commented on code in PR #1380:
URL: https://github.com/apache/fluss/pull/1380#discussion_r2649536974


##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
      * @return A CompletableFuture indicating completion of the operation.
      */
     CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> 
configs);
+
+    /**
+     * Add server tag to the specified tabletServers, one tabletServer can 
only have one serverTag.
+     *
+     * <p>If one tabletServer failed adding tag, none of the tags will take 
effect.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagAlreadyExistException} If the server tag already 
exists for any one of
+     *       the tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to add server tags.
+     * @param serverTag the server tag to be added.
+     */
+    CompletableFuture<Void> addServerTag(List<Integer> tabletServers, 
ServerTag serverTag);
+
+    /**
+     * Remove server tag from the specified tabletServers.
+     *
+     * <p>If one tabletServer failed removing tag, none of the tags will be 
removed.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagNotExistException} If the server tag does not 
exist for any one of the
+     *       tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to remove server tags.
+     */
+    CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, 
ServerTag serverTag);
+
+    /**
+     * Based on the provided {@code priorityGoals}, Fluss performs load 
balancing on the cluster's
+     * bucket load.
+     *
+     * <p>More details, Fluss collects the cluster's load information and 
optimizes to perform load
+     * balancing according to the user-defined {@code priorityGoals}.
+     *
+     * <p>Currently, Fluss only supports one active rebalance task in the 
cluster. If an uncompleted
+     * rebalance task exists, an {@link RebalanceFailureException} will be 
thrown.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link RebalanceFailureException} If the rebalance failed. Such 
as there is an ongoing
+     *       execution.
+     * </ul>
+     *
+     * @param priorityGoals the goals to be optimized.
+     * @param dryRun Calculate and return the rebalance optimization proposal, 
but do not execute
+     *     it.
+     * @return the generated rebalance plan for all the tableBuckets which 
need to do rebalance.
+     */
+    CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
+            List<GoalType> priorityGoals, boolean dryRun);

Review Comment:
   Could you update the return signature to `CompletableFuture<RebalancePlan>` 
and the `RebalancePlan` contains the `Map<TableBucket, 
RebalancePlanForBucket>`, this allows us to evolve the return result easier in 
the future. 



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
      * @return A CompletableFuture indicating completion of the operation.
      */
     CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> 
configs);
+
+    /**
+     * Add server tag to the specified tabletServers, one tabletServer can 
only have one serverTag.
+     *
+     * <p>If one tabletServer failed adding tag, none of the tags will take 
effect.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagAlreadyExistException} If the server tag already 
exists for any one of
+     *       the tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to add server tags.
+     * @param serverTag the server tag to be added.
+     */
+    CompletableFuture<Void> addServerTag(List<Integer> tabletServers, 
ServerTag serverTag);
+
+    /**
+     * Remove server tag from the specified tabletServers.
+     *
+     * <p>If one tabletServer failed removing tag, none of the tags will be 
removed.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagNotExistException} If the server tag does not 
exist for any one of the
+     *       tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to remove server tags.
+     */
+    CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, 
ServerTag serverTag);
+
+    /**
+     * Based on the provided {@code priorityGoals}, Fluss performs load 
balancing on the cluster's
+     * bucket load.
+     *
+     * <p>More details, Fluss collects the cluster's load information and 
optimizes to perform load
+     * balancing according to the user-defined {@code priorityGoals}.
+     *
+     * <p>Currently, Fluss only supports one active rebalance task in the 
cluster. If an uncompleted
+     * rebalance task exists, an {@link RebalanceFailureException} will be 
thrown.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link RebalanceFailureException} If the rebalance failed. Such 
as there is an ongoing
+     *       execution.
+     * </ul>
+     *
+     * @param priorityGoals the goals to be optimized.
+     * @param dryRun Calculate and return the rebalance optimization proposal, 
but do not execute
+     *     it.
+     * @return the generated rebalance plan for all the tableBuckets which 
need to do rebalance.
+     */
+    CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
+            List<GoalType> priorityGoals, boolean dryRun);
+
+    /**
+     * List the rebalance process.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link NoRebalanceInProgressException} If there are no rebalance 
tasks in progress.
+     * </ul>
+     *
+     * @return the rebalance process for all the tableBuckets doing rebalance.
+     */
+    CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> 
listRebalanceProcess();

Review Comment:
   Changed from `listRebalanceProcess` to `listRebalanceProgress`.
   
   Because in distributed systems, `Process` often refers to an OS-level 
execution or a long-running daemon. `Status` or `Progress` accurately reflects 
that this is a query for the current state of a task.
   
   Additionally, we can improve the return type to:  
   ```java
   CompletableFuture<RebalanceProgress>
   ```  
   where `RebalanceProgress` includes:
   - A `Map<TableBucket, RebalanceResultForBucket>` detailing per-bucket 
results,
   - A top-level `status` field (e.g., using an enum like 
`RebalanceOverallStatus`),
   - A `progress` field (e.g., a `double` between 0.0 and 1.0) representing the 
overall completion percentage.
   
   (both `status` and `progress` can be calculated in client side, not 
necessary coded in RPC)
   
   This structure provides both granular visibility and a high-level summary, 
making it easier for clients to monitor and react to rebalancing activity.
   



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
   required string config_source = 3;
 }
 
+message PbRebalancePlanForTable {
+  required int64 table_id = 1;
+  repeated PbRebalancePlanForPartition partitions_plan = 2; // for 
none-partition table, this is empty
+  repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, 
this is empty
+
+}
+
+message PbRebalancePlanForPartition {
+  required int64 partition_id = 1;
+  repeated PbRebalancePlanForBucket buckets_plan = 2;
+}
+
+message PbRebalancePlanForBucket {
+  required int32 bucket_id = 1;
+  optional int32 original_leader = 2;
+  optional int32 new_leader = 3;
+  repeated int32 original_replicas = 4 [packed = true];
+  repeated int32 new_replicas = 5 [packed = true];
+}
+
+message PbRebalanceProcessForTable {

Review Comment:
   `PbRebalanceProgressForTable`



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
      * @return A CompletableFuture indicating completion of the operation.
      */
     CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> 
configs);
+
+    /**
+     * Add server tag to the specified tabletServers, one tabletServer can 
only have one serverTag.
+     *
+     * <p>If one tabletServer failed adding tag, none of the tags will take 
effect.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagAlreadyExistException} If the server tag already 
exists for any one of

Review Comment:
   The method `addServerTag` should be **idempotent**: if some servers already 
have the given tag, it should succeed without error. Only when a server already 
has a **different** server tag (i.e., not equal to the one being set) should it 
throw `ServerTagAlreadyExistException`. Could you update the javadoc here?



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Rebalance status for single bucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum RebalanceStatusForBucket {

Review Comment:
   We can rename this to `RebalanceStatus`, so we can use it as a top-level 
status of rebalance. 



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
   required string config_source = 3;
 }
 
+message PbRebalancePlanForTable {
+  required int64 table_id = 1;
+  repeated PbRebalancePlanForPartition partitions_plan = 2; // for 
none-partition table, this is empty
+  repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, 
this is empty

Review Comment:
   I think this makes the RPC message complex, maybe we can follow other RPC 
messages that omit the `PbRebalancePlanForPartition` and adding an `optional 
int64 partition_id` into `PbRebalancePlanForBucket`?
   
   The same to `PbRebalanceProcessForTable`.



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
      * @return A CompletableFuture indicating completion of the operation.
      */
     CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> 
configs);
+
+    /**
+     * Add server tag to the specified tabletServers, one tabletServer can 
only have one serverTag.
+     *
+     * <p>If one tabletServer failed adding tag, none of the tags will take 
effect.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagAlreadyExistException} If the server tag already 
exists for any one of
+     *       the tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to add server tags.
+     * @param serverTag the server tag to be added.
+     */
+    CompletableFuture<Void> addServerTag(List<Integer> tabletServers, 
ServerTag serverTag);
+
+    /**
+     * Remove server tag from the specified tabletServers.
+     *
+     * <p>If one tabletServer failed removing tag, none of the tags will be 
removed.
+     *
+     * <ul>
+     *   <li>{@link AuthorizationException} If the authenticated user doesn't 
have cluster
+     *       permissions.
+     *   <li>{@link ServerNotExistException} If the tabletServer in {@code 
tabletServers} does not
+     *       exist.
+     *   <li>{@link ServerTagNotExistException} If the server tag does not 
exist for any one of the
+     *       tabletServers.
+     * </ul>
+     *
+     * @param tabletServers the tabletServers we want to remove server tags.
+     */
+    CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, 
ServerTag serverTag);

Review Comment:
   Same here, we may not throw `ServerTagNotExistException` if the server 
already has no any server tag now. 



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1188,6 +1192,49 @@ public void insertConfigChangeNotification() throws 
Exception {
                         
ZkData.ConfigEntityChangeNotificationSequenceZNode.encode());
     }
 
+    // 
--------------------------------------------------------------------------------------------
+    // Maintenance
+    // 
--------------------------------------------------------------------------------------------
+
+    public void registerServerTags(ServerTags newServerTags) throws Exception {
+        String path = ServerTagsZNode.path();
+        if (getOrEmpty(path).isPresent()) {
+            zkClient.setData().forPath(path, 
ServerTagsZNode.encode(newServerTags));
+        } else {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, ServerTagsZNode.encode(newServerTags));
+        }
+    }
+
+    public Optional<ServerTags> getServerTags() throws Exception {
+        String path = ServerTagsZNode.path();
+        return getOrEmpty(path).map(ServerTagsZNode::decode);
+    }
+
+    public void registerRebalancePlan(RebalancePlan rebalancePlan) throws 
Exception {
+        String path = RebalanceZNode.path();
+        if (getOrEmpty(path).isPresent()) {
+            zkClient.setData().forPath(path, 
RebalanceZNode.encode(rebalancePlan));
+        } else {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, RebalanceZNode.encode(rebalancePlan));
+        }
+    }
+
+    public Optional<RebalancePlan> getRebalancePlan() throws Exception {
+        String path = RebalanceZNode.path();
+        return getOrEmpty(path).map(RebalanceZNode::decode);
+    }
+
+    public void deleteRebalancePlan() throws Exception {

Review Comment:
   I doubt whether we should delete the rebalance node on ZK. If we delete it 
once the rebalance successfully finished, we don't have the ability to respond 
users the status of the rebalance plan.
   
   Maybe we can just soft delete the rebalance node by mark the rebalance plan 
is cancelled or failed or completed (the `RebalanceStatus`), so 
`listRebalanceProgress` still work after the rebalance finished, and can be 
overriden by next rebalance task. 



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Arrays;
+
+/**
+ * The type of goal to optimize.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum GoalType {
+    /**
+     * Goal to generate replica movement tasks to ensure that the number of 
replicas on each
+     * tabletServer is near balanced.
+     */
+    REPLICA_DISTRIBUTION_GOAL(0),
+
+    /**
+     * Goal to generate leadership movement and leader replica movement tasks 
to ensure that the
+     * number of leader replicas on each tabletServer is near balanced.
+     */
+    LEADER_REPLICA_DISTRIBUTION_GOAL(1);

Review Comment:
   Simplify to `LEADER_DISTRIBUTION_GOAL(1);`?



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.List;
+
+/**
+ * Status of rebalance process for a tabletBucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class RebalanceResultForBucket {
+    private final RebalancePlanForBucket rebalancePlanForBucket;
+    private RebalanceStatusForBucket rebalanceStatusForBucket;
+
+    public RebalanceResultForBucket(
+            RebalancePlanForBucket rebalancePlanForBucket,
+            RebalanceStatusForBucket rebalanceStatusForBucket) {
+        this.rebalancePlanForBucket = rebalancePlanForBucket;
+        this.rebalanceStatusForBucket = rebalanceStatusForBucket;
+    }
+
+    public TableBucket tableBucket() {
+        return rebalancePlanForBucket.getTableBucket();
+    }
+
+    public RebalancePlanForBucket planForBucket() {

Review Comment:
   simplify to `plan()`.



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.List;
+
+/**
+ * Status of rebalance process for a tabletBucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class RebalanceResultForBucket {
+    private final RebalancePlanForBucket rebalancePlanForBucket;
+    private RebalanceStatusForBucket rebalanceStatusForBucket;
+
+    public RebalanceResultForBucket(
+            RebalancePlanForBucket rebalancePlanForBucket,
+            RebalanceStatusForBucket rebalanceStatusForBucket) {
+        this.rebalancePlanForBucket = rebalancePlanForBucket;
+        this.rebalanceStatusForBucket = rebalanceStatusForBucket;
+    }
+
+    public TableBucket tableBucket() {
+        return rebalancePlanForBucket.getTableBucket();
+    }
+
+    public RebalancePlanForBucket planForBucket() {
+        return rebalancePlanForBucket;
+    }
+
+    public List<Integer> newReplicas() {
+        return rebalancePlanForBucket.getNewReplicas();
+    }
+
+    public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket 
status) {

Review Comment:
   Remove this method and mark `rebalanceStatusForBucket` as `final`, users can 
re-create a new `RebalanceResultForBucket` to make the 
`RebalanceResultForBucket` instance immutable, this is helful to guarantee 
concurrency safety. 



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Arrays;
+
+/**
+ * The tag of tabletServer.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum ServerTag {
+    PERMANENT_OFFLINE(0),
+    TEMPORARY_OFFLINE(1);

Review Comment:
   Add more javadoc descriptions for the enums, I saw you have these in the 
FIP. 
https://cwiki.apache.org/confluence/display/FLUSS/FIP-8%3A+Support+Cluster+Reblance



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java:
##########
@@ -120,6 +130,22 @@ public interface AdminGateway extends AdminReadOnlyGateway 
{
     CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(
             AlterClusterConfigsRequest request);
 
+    @RPC(api = ApiKeys.ADD_SERVER_TAG)
+    CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest 
request);
+
+    @RPC(api = ApiKeys.REMOVE_SERVER_TAG)
+    CompletableFuture<RemoveServerTagResponse> 
removeServerTag(RemoveServerTagRequest request);
+
+    @RPC(api = ApiKeys.REBALANCE)
+    CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request);
+
+    @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS)
+    CompletableFuture<ListRebalanceProcessResponse> listRebalanceProcess(
+            ListRebalanceProcessRequest request);

Review Comment:
   ditto. rename to `listRebalanceProgress`



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
   required string config_source = 3;
 }
 
+message PbRebalancePlanForTable {
+  required int64 table_id = 1;
+  repeated PbRebalancePlanForPartition partitions_plan = 2; // for 
none-partition table, this is empty
+  repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, 
this is empty
+
+}
+
+message PbRebalancePlanForPartition {
+  required int64 partition_id = 1;
+  repeated PbRebalancePlanForBucket buckets_plan = 2;
+}
+
+message PbRebalancePlanForBucket {
+  required int32 bucket_id = 1;
+  optional int32 original_leader = 2;
+  optional int32 new_leader = 3;
+  repeated int32 original_replicas = 4 [packed = true];
+  repeated int32 new_replicas = 5 [packed = true];
+}
+
+message PbRebalanceProcessForTable {
+  required int64 table_id = 1;
+  repeated PbRebalanceProcessForPartition partitions_process = 2;
+  repeated PbRebalanceProcessForBucket buckets_process = 3;
+}
+
+message PbRebalanceProcessForPartition {
+  required int64 partition_id = 1;
+  repeated PbRebalanceProcessForBucket buckets_process = 2;
+}
+
+message PbRebalanceProcessForBucket {
+  required int32 bucket_id = 1;
+  repeated int32 original_replicas = 2 [packed = true];
+  repeated int32 new_replicas = 3 [packed = true];

Review Comment:
   Do we need the `optional_leader` and `new_leader`? If need, maybe we can 
just combine `PbRebalancePlanForBucket` and `rebalance_status` together as the 
`PbRebalanceProgressForBucket`?



##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Rebalance status for single bucket.
+ *
+ * @since 0.8

Review Comment:
   Please update all the `@since` to `0.9`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to