This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 194ed4fd2 [FLUSS-2473][server][test] Fix flaky test FlussAuthorizationITCase.testRebalance (#2487)
194ed4fd2 is described below

commit 194ed4fd29bce09a02ec3010529a14b94366e74c
Author: Zübeyir Eser <[email protected]>
AuthorDate: Thu Jan 29 03:14:34 2026 +0100

    [FLUSS-2473][server][test] Fix flaky test FlussAuthorizationITCase.testRebalance (#2487)
---
 .../security/acl/FlussAuthorizationITCase.java     | 38 ++++++++++++++++++++++
 .../coordinator/rebalance/RebalanceManager.java    |  5 +++
 2 files changed, 43 insertions(+)

diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 89c64e5ca..fdb5e3015 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -102,6 +102,7 @@ import static org.apache.fluss.security.acl.OperationType.READ;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.catchThrowable;
 
@@ -1022,6 +1023,43 @@ public class FlussAuthorizationITCase {
         guestAdmin.cancelRebalance(null).get();
     }
 
+    @Test
+    void testRebalanceDuringConcurrentTableCreation() throws Exception {
+        // Grant the guest user WRITE on the cluster, which rebalance operations require.
+        rootAdmin
+                .createAcls(
+                        Collections.singletonList(
+                                new AclBinding(
+                                        Resource.cluster(),
+                                        new AccessControlEntry(
+                                                guestPrincipal,
+                                                WILD_CARD_HOST,
+                                                OperationType.WRITE,
+                                                PermissionType.ALLOW))))
+                .all()
+                .get();
+
+        // Repeat to widen the window for races between table-creation events and
+        // rebalance plan generation (locally verified with 50+ iterations without failures).
+        for (int i = 0; i < 5; i++) {
+            TablePath transientTable = TablePath.of("test_db_1", "transient_rebalance_table_" + i);
+
+            // Deliberately do not block on creation so the rebalancer is likely to observe
+            // transient bucket metadata (e.g. leader elected but not yet in the assignment).
+            java.util.concurrent.CompletableFuture<Void> createFuture =
+                    rootAdmin.createTable(transientTable, DATA1_TABLE_DESCRIPTOR_PK, false);
+
+            // Rebalance must tolerate such transient table states and must not fail.
+            assertThatCode(() -> guestAdmin.rebalance(Collections.emptyList()).get())
+                    .doesNotThrowAnyException();
+
+            // Await creation before cleanup: with ignoreIfNotExists=true, a drop that raced
+            // ahead of the pending create would silently no-op and leave the table behind.
+            createFuture.get();
+            rootAdmin.dropTable(transientTable, true).get();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Producer Offsets Authorization Tests
     // ------------------------------------------------------------------------
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
index 9d45dc234..30cf40bd3 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -365,6 +365,11 @@ public class RebalanceManager {
             checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader 
and isr is empty.");
             LeaderAndIsr isr = bucketLeaderAndIsrOpt.get();
             int leader = isr.leader();
+            // Skip the bucket if it is in a transient state (e.g., during 
table creation)
+            // where the leader is elected but not yet present in the 
assignment list.
+            if (leader == -1 || !assignment.contains(leader)) {
+                continue;
+            }
             for (int i = 0; i < assignment.size(); i++) {
                 int replica = assignment.get(i);
                 clusterModel.createReplica(replica, tableBucket, i, leader == 
replica);

Reply via email to