This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 194ed4fd2 [FLUSS-2473][server][test] Fix flaky test
FlussAuthorizationITCase.testRebalance (#2487)
194ed4fd2 is described below
commit 194ed4fd29bce09a02ec3010529a14b94366e74c
Author: Zübeyir Eser <[email protected]>
AuthorDate: Thu Jan 29 03:14:34 2026 +0100
[FLUSS-2473][server][test] Fix flaky test
FlussAuthorizationITCase.testRebalance (#2487)
---
.../security/acl/FlussAuthorizationITCase.java | 38 ++++++++++++++++++++++
.../coordinator/rebalance/RebalanceManager.java | 5 +++
2 files changed, 43 insertions(+)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 89c64e5ca..fdb5e3015 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -102,6 +102,7 @@ import static
org.apache.fluss.security.acl.OperationType.READ;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;
@@ -1022,6 +1023,43 @@ public class FlussAuthorizationITCase {
guestAdmin.cancelRebalance(null).get();
}
+ @Test
+ void testRebalanceDuringConcurrentTableCreation() throws Exception {
+ // Setup WRITE permission on the cluster for the guest user to allow
rebalance operations.
+ rootAdmin
+ .createAcls(
+ Collections.singletonList(
+ new AclBinding(
+ Resource.cluster(),
+ new AccessControlEntry(
+ guestPrincipal,
+ WILD_CARD_HOST,
+ OperationType.WRITE,
+ PermissionType.ALLOW))))
+ .all()
+ .get();
+
+ // Run multiple iterations to catch potential race conditions between
+ // table creation events and rebalance plan generation.
+ // Locally verified with 50+ iterations without failures.
+ for (int i = 0; i < 5; i++) {
+ TablePath transientTable = TablePath.of("test_db_1",
"transient_rebalance_table_" + i);
+
+ // Trigger table creation. We do not wait for the table to be
"ready"
+ // to maximize the chance of the rebalancer encountering transient
metadata.
+ rootAdmin.createTable(transientTable, DATA1_TABLE_DESCRIPTOR_PK,
false);
+
+ // Attempt to rebalance the cluster.
+ // This verifies that the rebalance operation is robust against
transient table states
+ // (e.g., leader elected but not yet present in the assignment
list) and does not fail.
+ assertThatCode(() ->
guestAdmin.rebalance(Collections.emptyList()).get())
+ .doesNotThrowAnyException();
+
+ // Cleanup the table for the next iteration.
+ rootAdmin.dropTable(transientTable, true).get();
+ }
+ }
+
// ------------------------------------------------------------------------
// Producer Offsets Authorization Tests
// ------------------------------------------------------------------------
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
index 9d45dc234..30cf40bd3 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -365,6 +365,11 @@ public class RebalanceManager {
checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader
and isr is empty.");
LeaderAndIsr isr = bucketLeaderAndIsrOpt.get();
int leader = isr.leader();
+ // Skip the bucket if it is in a transient state (e.g., during
table creation)
+ // where the leader is elected but not yet present in the
assignment list.
+ if (leader == -1 || !assignment.contains(leader)) {
+ continue;
+ }
for (int i = 0; i < assignment.size(); i++) {
int replica = assignment.get(i);
clusterModel.createReplica(replica, tableBucket, i, leader ==
replica);