wuchong commented on code in PR #1380:
URL: https://github.com/apache/fluss/pull/1380#discussion_r2649536974
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig>
configs);
+
+ /**
+ * Add server tag to the specified tabletServers, one tabletServer can
only have one serverTag.
+ *
+ * <p>If one tabletServer failed adding tag, none of the tags will take
effect.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagAlreadyExistException} If the server tag already
exists for any one of
+ * the tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to add server tags.
+ * @param serverTag the server tag to be added.
+ */
+ CompletableFuture<Void> addServerTag(List<Integer> tabletServers,
ServerTag serverTag);
+
+ /**
+ * Remove server tag from the specified tabletServers.
+ *
+ * <p>If one tabletServer failed removing tag, none of the tags will be
removed.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagNotExistException} If the server tag does not
exist for any one of the
+ * tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to remove server tags.
+ */
+ CompletableFuture<Void> removeServerTag(List<Integer> tabletServers,
ServerTag serverTag);
+
+ /**
+ * Based on the provided {@code priorityGoals}, Fluss performs load
balancing on the cluster's
+ * bucket load.
+ *
+ * <p>More details, Fluss collects the cluster's load information and
optimizes to perform load
+ * balancing according to the user-defined {@code priorityGoals}.
+ *
+ * <p>Currently, Fluss only supports one active rebalance task in the
cluster. If an uncompleted
+ * rebalance task exists, an {@link RebalanceFailureException} will be
thrown.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link RebalanceFailureException} If the rebalance failed. Such
as there is an ongoing
+ * execution.
+ * </ul>
+ *
+ * @param priorityGoals the goals to be optimized.
+ * @param dryRun Calculate and return the rebalance optimization proposal,
but do not execute
+ * it.
+ * @return the generated rebalance plan for all the tableBuckets which
need to do rebalance.
+ */
+ CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
+ List<GoalType> priorityGoals, boolean dryRun);
Review Comment:
Could you update the return signature to `CompletableFuture<RebalancePlan>`
and the `RebalancePlan` contains the `Map<TableBucket,
RebalancePlanForBucket>`, this allows us to evolve the return result easier in
the future.
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig>
configs);
+
+ /**
+ * Add server tag to the specified tabletServers, one tabletServer can
only have one serverTag.
+ *
+ * <p>If one tabletServer failed adding tag, none of the tags will take
effect.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagAlreadyExistException} If the server tag already
exists for any one of
+ * the tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to add server tags.
+ * @param serverTag the server tag to be added.
+ */
+ CompletableFuture<Void> addServerTag(List<Integer> tabletServers,
ServerTag serverTag);
+
+ /**
+ * Remove server tag from the specified tabletServers.
+ *
+ * <p>If one tabletServer failed removing tag, none of the tags will be
removed.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagNotExistException} If the server tag does not
exist for any one of the
+ * tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to remove server tags.
+ */
+ CompletableFuture<Void> removeServerTag(List<Integer> tabletServers,
ServerTag serverTag);
+
+ /**
+ * Based on the provided {@code priorityGoals}, Fluss performs load
balancing on the cluster's
+ * bucket load.
+ *
+ * <p>More details, Fluss collects the cluster's load information and
optimizes to perform load
+ * balancing according to the user-defined {@code priorityGoals}.
+ *
+ * <p>Currently, Fluss only supports one active rebalance task in the
cluster. If an uncompleted
+ * rebalance task exists, an {@link RebalanceFailureException} will be
thrown.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link RebalanceFailureException} If the rebalance failed. Such
as there is an ongoing
+ * execution.
+ * </ul>
+ *
+ * @param priorityGoals the goals to be optimized.
+ * @param dryRun Calculate and return the rebalance optimization proposal,
but do not execute
+ * it.
+ * @return the generated rebalance plan for all the tableBuckets which
need to do rebalance.
+ */
+ CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
+ List<GoalType> priorityGoals, boolean dryRun);
+
+ /**
+ * List the rebalance process.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link NoRebalanceInProgressException} If there are no rebalance
tasks in progress.
+ * </ul>
+ *
+ * @return the rebalance process for all the tableBuckets doing rebalance.
+ */
+ CompletableFuture<Map<TableBucket, RebalanceResultForBucket>>
listRebalanceProcess();
Review Comment:
Changed from `listRebalanceProcess` to `listRebalanceProgress`.
Because in distributed systems, `Process` often refers to an OS-level
execution or a long-running daemon. `Status` or `Progress` accurately reflects
that this is a query for the current state of a task.
Additionally, we can improve the return type to:
```java
CompletableFuture<RebalanceProgress>
```
where `RebalanceProgress` includes:
- A `Map<TableBucket, RebalanceResultForBucket>` detailing per-bucket
results,
- A top-level `status` field (e.g., using an enum like
`RebalanceOverallStatus`),
- A `progress` field (e.g., a `double` between 0.0 and 1.0) representing the
overall completion percentage.
(both `status` and `progress` can be calculated in client side, not
necessary coded in RPC)
This structure provides both granular visibility and a high-level summary,
making it easier for clients to monitor and react to rebalancing activity.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
required string config_source = 3;
}
+message PbRebalancePlanForTable {
+ required int64 table_id = 1;
+ repeated PbRebalancePlanForPartition partitions_plan = 2; // for
none-partition table, this is empty
+ repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table,
this is empty
+
+}
+
+message PbRebalancePlanForPartition {
+ required int64 partition_id = 1;
+ repeated PbRebalancePlanForBucket buckets_plan = 2;
+}
+
+message PbRebalancePlanForBucket {
+ required int32 bucket_id = 1;
+ optional int32 original_leader = 2;
+ optional int32 new_leader = 3;
+ repeated int32 original_replicas = 4 [packed = true];
+ repeated int32 new_replicas = 5 [packed = true];
+}
+
+message PbRebalanceProcessForTable {
Review Comment:
`PbRebalanceProgressForTable`
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig>
configs);
+
+ /**
+ * Add server tag to the specified tabletServers, one tabletServer can
only have one serverTag.
+ *
+ * <p>If one tabletServer failed adding tag, none of the tags will take
effect.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagAlreadyExistException} If the server tag already
exists for any one of
Review Comment:
The method `addServerTag` should be **idempotent**: if some servers already
have the given tag, it should succeed without error. Only when a server already
has a **different** server tag (i.e., not equal to the one being set) should it
throw `ServerTagAlreadyExistException`. Could you update the javadoc here?
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Rebalance status for single bucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum RebalanceStatusForBucket {
Review Comment:
We can rename this to `RebalanceStatus`, so we can use it as a top-level
status of rebalance.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
required string config_source = 3;
}
+message PbRebalancePlanForTable {
+ required int64 table_id = 1;
+ repeated PbRebalancePlanForPartition partitions_plan = 2; // for
none-partition table, this is empty
+ repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table,
this is empty
Review Comment:
I think this makes the RPC message complex, maybe we can follow other RPC
messages that omit the `PbRebalancePlanForPartition` and adding an `optional
int64 partition_id` into `PbRebalancePlanForBucket`?
The same to `PbRebalanceProcessForTable`.
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig>
configs);
+
+ /**
+ * Add server tag to the specified tabletServers, one tabletServer can
only have one serverTag.
+ *
+ * <p>If one tabletServer failed adding tag, none of the tags will take
effect.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagAlreadyExistException} If the server tag already
exists for any one of
+ * the tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to add server tags.
+ * @param serverTag the server tag to be added.
+ */
+ CompletableFuture<Void> addServerTag(List<Integer> tabletServers,
ServerTag serverTag);
+
+ /**
+ * Remove server tag from the specified tabletServers.
+ *
+ * <p>If one tabletServer failed removing tag, none of the tags will be
removed.
+ *
+ * <ul>
+ * <li>{@link AuthorizationException} If the authenticated user doesn't
have cluster
+ * permissions.
+ * <li>{@link ServerNotExistException} If the tabletServer in {@code
tabletServers} does not
+ * exist.
+ * <li>{@link ServerTagNotExistException} If the server tag does not
exist for any one of the
+ * tabletServers.
+ * </ul>
+ *
+ * @param tabletServers the tabletServers we want to remove server tags.
+ */
+ CompletableFuture<Void> removeServerTag(List<Integer> tabletServers,
ServerTag serverTag);
Review Comment:
Same here, we may not throw `ServerTagNotExistException` if the server
already has no any server tag now.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1188,6 +1192,49 @@ public void insertConfigChangeNotification() throws
Exception {
ZkData.ConfigEntityChangeNotificationSequenceZNode.encode());
}
+ //
--------------------------------------------------------------------------------------------
+ // Maintenance
+ //
--------------------------------------------------------------------------------------------
+
+ public void registerServerTags(ServerTags newServerTags) throws Exception {
+ String path = ServerTagsZNode.path();
+ if (getOrEmpty(path).isPresent()) {
+ zkClient.setData().forPath(path,
ServerTagsZNode.encode(newServerTags));
+ } else {
+ zkClient.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, ServerTagsZNode.encode(newServerTags));
+ }
+ }
+
+ public Optional<ServerTags> getServerTags() throws Exception {
+ String path = ServerTagsZNode.path();
+ return getOrEmpty(path).map(ServerTagsZNode::decode);
+ }
+
+ public void registerRebalancePlan(RebalancePlan rebalancePlan) throws
Exception {
+ String path = RebalanceZNode.path();
+ if (getOrEmpty(path).isPresent()) {
+ zkClient.setData().forPath(path,
RebalanceZNode.encode(rebalancePlan));
+ } else {
+ zkClient.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, RebalanceZNode.encode(rebalancePlan));
+ }
+ }
+
+ public Optional<RebalancePlan> getRebalancePlan() throws Exception {
+ String path = RebalanceZNode.path();
+ return getOrEmpty(path).map(RebalanceZNode::decode);
+ }
+
+ public void deleteRebalancePlan() throws Exception {
Review Comment:
I doubt whether we should delete the rebalance node on ZK. If we delete it
once the rebalance successfully finished, we don't have the ability to respond
users the status of the rebalance plan.
Maybe we can just soft delete the rebalance node by mark the rebalance plan
is cancelled or failed or completed (the `RebalanceStatus`), so
`listRebalanceProgress` still work after the rebalance finished, and can be
overriden by next rebalance task.
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Arrays;
+
+/**
+ * The type of goal to optimize.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum GoalType {
+ /**
+ * Goal to generate replica movement tasks to ensure that the number of
replicas on each
+ * tabletServer is near balanced.
+ */
+ REPLICA_DISTRIBUTION_GOAL(0),
+
+ /**
+ * Goal to generate leadership movement and leader replica movement tasks
to ensure that the
+ * number of leader replicas on each tabletServer is near balanced.
+ */
+ LEADER_REPLICA_DISTRIBUTION_GOAL(1);
Review Comment:
Simplify to `LEADER_DISTRIBUTION_GOAL(1);`?
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.List;
+
+/**
+ * Status of rebalance process for a tabletBucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class RebalanceResultForBucket {
+ private final RebalancePlanForBucket rebalancePlanForBucket;
+ private RebalanceStatusForBucket rebalanceStatusForBucket;
+
+ public RebalanceResultForBucket(
+ RebalancePlanForBucket rebalancePlanForBucket,
+ RebalanceStatusForBucket rebalanceStatusForBucket) {
+ this.rebalancePlanForBucket = rebalancePlanForBucket;
+ this.rebalanceStatusForBucket = rebalanceStatusForBucket;
+ }
+
+ public TableBucket tableBucket() {
+ return rebalancePlanForBucket.getTableBucket();
+ }
+
+ public RebalancePlanForBucket planForBucket() {
Review Comment:
simplify to `plan()`.
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.List;
+
+/**
+ * Status of rebalance process for a tabletBucket.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class RebalanceResultForBucket {
+ private final RebalancePlanForBucket rebalancePlanForBucket;
+ private RebalanceStatusForBucket rebalanceStatusForBucket;
+
+ public RebalanceResultForBucket(
+ RebalancePlanForBucket rebalancePlanForBucket,
+ RebalanceStatusForBucket rebalanceStatusForBucket) {
+ this.rebalancePlanForBucket = rebalancePlanForBucket;
+ this.rebalanceStatusForBucket = rebalanceStatusForBucket;
+ }
+
+ public TableBucket tableBucket() {
+ return rebalancePlanForBucket.getTableBucket();
+ }
+
+ public RebalancePlanForBucket planForBucket() {
+ return rebalancePlanForBucket;
+ }
+
+ public List<Integer> newReplicas() {
+ return rebalancePlanForBucket.getNewReplicas();
+ }
+
+ public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket
status) {
Review Comment:
Remove this method and mark `rebalanceStatusForBucket` as `final`, users can
re-create a new `RebalanceResultForBucket` to make the
`RebalanceResultForBucket` instance immutable, this is helful to guarantee
concurrency safety.
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Arrays;
+
+/**
+ * The tag of tabletServer.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public enum ServerTag {
+ PERMANENT_OFFLINE(0),
+ TEMPORARY_OFFLINE(1);
Review Comment:
Add more javadoc descriptions for the enums, I saw you have these in the
FIP.
https://cwiki.apache.org/confluence/display/FLUSS/FIP-8%3A+Support+Cluster+Reblance
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java:
##########
@@ -120,6 +130,22 @@ public interface AdminGateway extends AdminReadOnlyGateway
{
CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(
AlterClusterConfigsRequest request);
+ @RPC(api = ApiKeys.ADD_SERVER_TAG)
+ CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest
request);
+
+ @RPC(api = ApiKeys.REMOVE_SERVER_TAG)
+ CompletableFuture<RemoveServerTagResponse>
removeServerTag(RemoveServerTagRequest request);
+
+ @RPC(api = ApiKeys.REBALANCE)
+ CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request);
+
+ @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS)
+ CompletableFuture<ListRebalanceProcessResponse> listRebalanceProcess(
+ ListRebalanceProcessRequest request);
Review Comment:
ditto. rename to `listRebalanceProgress`
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -955,3 +993,40 @@ message PbDescribeConfig {
required string config_source = 3;
}
+message PbRebalancePlanForTable {
+ required int64 table_id = 1;
+ repeated PbRebalancePlanForPartition partitions_plan = 2; // for
none-partition table, this is empty
+ repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table,
this is empty
+
+}
+
+message PbRebalancePlanForPartition {
+ required int64 partition_id = 1;
+ repeated PbRebalancePlanForBucket buckets_plan = 2;
+}
+
+message PbRebalancePlanForBucket {
+ required int32 bucket_id = 1;
+ optional int32 original_leader = 2;
+ optional int32 new_leader = 3;
+ repeated int32 original_replicas = 4 [packed = true];
+ repeated int32 new_replicas = 5 [packed = true];
+}
+
+message PbRebalanceProcessForTable {
+ required int64 table_id = 1;
+ repeated PbRebalanceProcessForPartition partitions_process = 2;
+ repeated PbRebalanceProcessForBucket buckets_process = 3;
+}
+
+message PbRebalanceProcessForPartition {
+ required int64 partition_id = 1;
+ repeated PbRebalanceProcessForBucket buckets_process = 2;
+}
+
+message PbRebalanceProcessForBucket {
+ required int32 bucket_id = 1;
+ repeated int32 original_replicas = 2 [packed = true];
+ repeated int32 new_replicas = 3 [packed = true];
Review Comment:
Do we need the `optional_leader` and `new_leader`? If need, maybe we can
just combine `PbRebalancePlanForBucket` and `rebalance_status` together as the
`PbRebalanceProgressForBucket`?
##########
fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.cluster.rebalance;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Rebalance status for single bucket.
+ *
+ * @since 0.8
Review Comment:
Please update all the `@since` to `0.9`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]