This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a5e7e40d5 [hotfix] Fix the rebalance status cannot reach complete when the rebalance plan is empty (#2329)
a5e7e40d5 is described below
commit a5e7e40d525ad611d475e91242d78b82ce3c61e1
Author: yunhong <[email protected]>
AuthorDate: Thu Jan 8 21:00:08 2026 +0800
[hotfix] Fix the rebalance status cannot reach complete when the rebalance plan is empty (#2329)
---
.../coordinator/rebalance/RebalanceManager.java | 28 +++--
.../rebalance/RebalanceManagerTest.java | 131 +++++++++++++++++++++
2 files changed, 152 insertions(+), 7 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
index 1275388aa..9d45dc234 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -134,6 +134,11 @@ public class RebalanceManager {
finishedRebalanceTasks.clear();
currentRebalanceId = rebalanceId;
+ if (rebalancePlan.isEmpty()) {
+ completeRebalance();
+ return;
+ }
+
rebalancePlan.forEach(
((tableBucket, planForBucket) -> {
if (FINAL_STATUSES.contains(newStatus)) {
@@ -173,7 +178,6 @@ public class RebalanceManager {
if (inProgressRebalanceTasksQueue.isEmpty()) {
// All rebalance tasks are completed.
- rebalanceStatus = COMPLETED;
completeRebalance();
} else {
// Trigger one rebalance task to execute.
@@ -309,18 +313,22 @@ public class RebalanceManager {
checkNotClosed();
try {
Optional<RebalanceTask> rebalanceTaskOpt = zkClient.getRebalanceTask();
+ Map<TableBucket, RebalancePlanForBucket> bucketPlan;
if (rebalanceTaskOpt.isPresent()) {
- RebalanceTask rebalanceTask = rebalanceTaskOpt.get();
- zkClient.registerRebalanceTask(
- new RebalanceTask(
- rebalanceTask.getRebalanceId(),
- COMPLETED,
- rebalanceTask.getExecutePlan()));
+ bucketPlan = rebalanceTaskOpt.get().getExecutePlan();
+ } else {
+ LOG.warn(
+ "Rebalance task is empty in zk when complete rebalance. "
+ + "It will be treated as no rebalance tasks.");
+ bucketPlan = new HashMap<>();
}
+ zkClient.registerRebalanceTask(
+ new RebalanceTask(currentRebalanceId, COMPLETED, bucketPlan));
} catch (Exception e) {
LOG.error("Error when update rebalance plan from zookeeper.", e);
}
+ rebalanceStatus = COMPLETED;
inProgressRebalanceTasks.clear();
inProgressRebalanceTasksQueue.clear();
@@ -395,4 +403,10 @@ public class RebalanceManager {
public ClusterModel buildClusterModel() {
return buildClusterModel(eventProcessor.getCoordinatorContext());
}
+
+ @VisibleForTesting
+ @Nullable
+ RebalanceStatus getRebalanceStatus() {
+ return rebalanceStatus;
+ }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
new file mode 100644
index 000000000..fc36e3034
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.coordinator.AutoPartitionManager;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
+import org.apache.fluss.server.coordinator.LakeTableTieringManager;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager;
+import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.RebalanceTask;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RebalanceManager}. */
+public class RebalanceManagerTest {
+
+ @RegisterExtension
+ public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+ new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+ private static ZooKeeperClient zookeeperClient;
+ private static MetadataManager metadataManager;
+
+ private CoordinatorMetadataCache serverMetadataCache;
+ private TestCoordinatorChannelManager testCoordinatorChannelManager;
+ private AutoPartitionManager autoPartitionManager;
+ private LakeTableTieringManager lakeTableTieringManager;
+ private RebalanceManager rebalanceManager;
+
+ @BeforeAll
+ static void baseBeforeAll() {
+ zookeeperClient =
+ ZOO_KEEPER_EXTENSION_WRAPPER
+ .getCustomExtension()
+ .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ serverMetadataCache = new CoordinatorMetadataCache();
+ testCoordinatorChannelManager = new TestCoordinatorChannelManager();
+ autoPartitionManager =
+ new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
+ lakeTableTieringManager = new LakeTableTieringManager();
+ CoordinatorEventProcessor eventProcessor = buildCoordinatorEventProcessor();
+ rebalanceManager = new RebalanceManager(eventProcessor, zookeeperClient);
+ rebalanceManager.startup();
+ }
+
+ @AfterEach
+ void afterEach() throws Exception {
+ zookeeperClient.deleteRebalanceTask();
+ metadataManager =
+ new MetadataManager(
+ zookeeperClient,
+ new Configuration(),
+ new LakeCatalogDynamicLoader(new Configuration(), null, true));
+ }
+
+ @Test
+ void testRebalanceWithoutTask() throws Exception {
+ assertThat(rebalanceManager.getRebalanceId()).isNull();
+ assertThat(rebalanceManager.getRebalanceStatus()).isNull();
+
+ String rebalanceId = "test-rebalance-id";
+ RebalanceTask rebalanceTask = new RebalanceTask(rebalanceId, NOT_STARTED, new HashMap<>());
+ zookeeperClient.registerRebalanceTask(rebalanceTask);
+ assertThat(zookeeperClient.getRebalanceTask()).hasValue(rebalanceTask);
+
+ // register a rebalance task with empty plan.
+ rebalanceManager.registerRebalance(rebalanceId, new HashMap<>(), NOT_STARTED);
+
+ assertThat(rebalanceManager.getRebalanceId()).isEqualTo(rebalanceId);
+ RebalanceStatus status = rebalanceManager.getRebalanceStatus();
+ assertThat(status).isNotNull();
+ assertThat(status).isEqualTo(COMPLETED);
+ assertThat(zookeeperClient.getRebalanceTask())
+ .hasValue(new RebalanceTask(rebalanceId, COMPLETED, new HashMap<>()));
+ }
+
+ private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
+ return new CoordinatorEventProcessor(
+ zookeeperClient,
+ serverMetadataCache,
+ testCoordinatorChannelManager,
+ new CoordinatorContext(),
+ autoPartitionManager,
+ lakeTableTieringManager,
+ TestingMetricGroups.COORDINATOR_METRICS,
+ new Configuration(),
+ Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")),
+ metadataManager);
+ }
+}