This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new a5e7e40d5 [hotfix] Fix the rebalance status cannot reach complete when 
the rebalance plan is empty (#2329)
a5e7e40d5 is described below

commit a5e7e40d525ad611d475e91242d78b82ce3c61e1
Author: yunhong <[email protected]>
AuthorDate: Thu Jan 8 21:00:08 2026 +0800

    [hotfix] Fix the rebalance status cannot reach complete when the rebalance 
plan is empty (#2329)
---
 .../coordinator/rebalance/RebalanceManager.java    |  28 +++--
 .../rebalance/RebalanceManagerTest.java            | 131 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 7 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
index 1275388aa..9d45dc234 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -134,6 +134,11 @@ public class RebalanceManager {
         finishedRebalanceTasks.clear();
 
         currentRebalanceId = rebalanceId;
+        if (rebalancePlan.isEmpty()) {
+            completeRebalance();
+            return;
+        }
+
         rebalancePlan.forEach(
                 ((tableBucket, planForBucket) -> {
                     if (FINAL_STATUSES.contains(newStatus)) {
@@ -173,7 +178,6 @@ public class RebalanceManager {
 
             if (inProgressRebalanceTasksQueue.isEmpty()) {
                 // All rebalance tasks are completed.
-                rebalanceStatus = COMPLETED;
                 completeRebalance();
             } else {
                 // Trigger one rebalance task to execute.
@@ -309,18 +313,22 @@ public class RebalanceManager {
         checkNotClosed();
         try {
             Optional<RebalanceTask> rebalanceTaskOpt = 
zkClient.getRebalanceTask();
+            Map<TableBucket, RebalancePlanForBucket> bucketPlan;
             if (rebalanceTaskOpt.isPresent()) {
-                RebalanceTask rebalanceTask = rebalanceTaskOpt.get();
-                zkClient.registerRebalanceTask(
-                        new RebalanceTask(
-                                rebalanceTask.getRebalanceId(),
-                                COMPLETED,
-                                rebalanceTask.getExecutePlan()));
+                bucketPlan = rebalanceTaskOpt.get().getExecutePlan();
+            } else {
+                LOG.warn(
+                        "Rebalance task is empty in zk when complete 
rebalance. "
+                                + "It will be treated as no rebalance tasks.");
+                bucketPlan = new HashMap<>();
             }
+            zkClient.registerRebalanceTask(
+                    new RebalanceTask(currentRebalanceId, COMPLETED, 
bucketPlan));
         } catch (Exception e) {
             LOG.error("Error when update rebalance plan from zookeeper.", e);
         }
 
+        rebalanceStatus = COMPLETED;
         inProgressRebalanceTasks.clear();
         inProgressRebalanceTasksQueue.clear();
 
@@ -395,4 +403,10 @@ public class RebalanceManager {
     public ClusterModel buildClusterModel() {
         return buildClusterModel(eventProcessor.getCoordinatorContext());
     }
+
+    @VisibleForTesting
+    @Nullable
+    RebalanceStatus getRebalanceStatus() {
+        return rebalanceStatus;
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
new file mode 100644
index 000000000..fc36e3034
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.coordinator.AutoPartitionManager;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
+import org.apache.fluss.server.coordinator.LakeTableTieringManager;
+import org.apache.fluss.server.coordinator.MetadataManager;
+import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager;
+import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.RebalanceTask;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RebalanceManager}. */
+public class RebalanceManagerTest {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    private static ZooKeeperClient zookeeperClient;
+
+    private MetadataManager metadataManager;
+    private CoordinatorMetadataCache serverMetadataCache;
+    private TestCoordinatorChannelManager testCoordinatorChannelManager;
+    private AutoPartitionManager autoPartitionManager;
+    private LakeTableTieringManager lakeTableTieringManager;
+    private RebalanceManager rebalanceManager;
+
+    @BeforeAll
+    static void baseBeforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        // Build the MetadataManager before its consumers: AutoPartitionManager and the
+        // CoordinatorEventProcessor below both receive it, so it must never be null.
+        metadataManager =
+                new MetadataManager(
+                        zookeeperClient,
+                        new Configuration(),
+                        new LakeCatalogDynamicLoader(new Configuration(), null, true));
+        serverMetadataCache = new CoordinatorMetadataCache();
+        testCoordinatorChannelManager = new TestCoordinatorChannelManager();
+        autoPartitionManager =
+                new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
+        lakeTableTieringManager = new LakeTableTieringManager();
+        CoordinatorEventProcessor eventProcessor = buildCoordinatorEventProcessor();
+        rebalanceManager = new RebalanceManager(eventProcessor, zookeeperClient);
+        rebalanceManager.startup();
+    }
+
+    @AfterEach
+    void afterEach() throws Exception {
+        // Drop the rebalance task written by the test so the next test starts from a clean node.
+        zookeeperClient.deleteRebalanceTask();
+    }
+
+    /**
+     * Registering a rebalance whose plan is empty must finish immediately: the manager reports
+     * {@code COMPLETED} and the task stored in ZooKeeper is rewritten with that status.
+     */
+    @Test
+    void testRebalanceWithoutTask() throws Exception {
+        assertThat(rebalanceManager.getRebalanceId()).isNull();
+        assertThat(rebalanceManager.getRebalanceStatus()).isNull();
+
+        String rebalanceId = "test-rebalance-id";
+        RebalanceTask rebalanceTask = new RebalanceTask(rebalanceId, NOT_STARTED, new HashMap<>());
+        zookeeperClient.registerRebalanceTask(rebalanceTask);
+        assertThat(zookeeperClient.getRebalanceTask()).hasValue(rebalanceTask);
+
+        // register a rebalance task with empty plan.
+        rebalanceManager.registerRebalance(rebalanceId, new HashMap<>(), NOT_STARTED);
+
+        assertThat(rebalanceManager.getRebalanceId()).isEqualTo(rebalanceId);
+        RebalanceStatus status = rebalanceManager.getRebalanceStatus();
+        assertThat(status).isNotNull();
+        assertThat(status).isEqualTo(COMPLETED);
+        assertThat(zookeeperClient.getRebalanceTask())
+                .hasValue(new RebalanceTask(rebalanceId, COMPLETED, new HashMap<>()));
+    }
+
+    /** Wires a {@link CoordinatorEventProcessor} from the fixtures created in {@code beforeEach}. */
+    private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
+        return new CoordinatorEventProcessor(
+                zookeeperClient,
+                serverMetadataCache,
+                testCoordinatorChannelManager,
+                new CoordinatorContext(),
+                autoPartitionManager,
+                lakeTableTieringManager,
+                TestingMetricGroups.COORDINATOR_METRICS,
+                new Configuration(),
+                Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")),
+                metadataManager);
+    }
+}

Reply via email to