This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit fbdb7fa9a47aca88ab1bff84d1c1a7195812c16c
Author: Jark Wu <[email protected]>
AuthorDate: Sat Feb 28 10:36:07 2026 +0800

    [hotfix] Fix unstable test 
KvReplicaRestoreITCase.testRowCountRecoveryAfterFailover
---
 .../flink/source/BinlogVirtualTableITCase.java     | 16 ++++++++
 .../flink/source/ChangelogVirtualTableITCase.java  | 17 ++++++--
 .../server/replica/KvReplicaRestoreITCase.java     |  7 ++++
 .../server/testutils/FlussClusterExtension.java    | 45 +++++++++-------------
 .../server/testutils/RpcMessageTestUtils.java      | 16 ++++++++
 5 files changed, 71 insertions(+), 30 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
index 824ac03fd..619553aca 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
@@ -113,6 +113,22 @@ abstract class BinlogVirtualTableITCase {
         CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
     }
 
+    @AfterEach
+    protected void afterEach() throws Exception {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
     // init table environment from savepointPath
     private StreamTableEnvironment initTableEnvironment(@Nullable String 
savepointPath) {
         org.apache.flink.configuration.Configuration conf =
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index b5e27ed01..1248976f8 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -61,7 +61,6 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
-import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -112,9 +111,19 @@ abstract class ChangelogVirtualTableITCase {
     }
 
     @AfterEach
-    void after() throws Exception {
-        tEnv.useDatabase(BUILTIN_DATABASE);
-        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    protected void afterEach() throws Exception {
+        if (cluster != null) {
+            cluster.after();
+            cluster = null;
+        }
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
     }
 
     // init table environment from savepointPath
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
index 8ca1b8e27..9aa6142f3 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static 
org.apache.fluss.server.testutils.KvTestUtils.assertLookupResponse;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.dropDatabase;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newPutKvRequest;
 import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch;
@@ -201,6 +202,12 @@ class KvReplicaRestoreITCase {
                             .get(),
                     keyValue.f1);
         }
+
+        // restart the failed server to make sure the cluster is healthy for 
other tests
+        dropDatabase(FLUSS_CLUSTER_EXTENSION, "test_db");
+        FLUSS_CLUSTER_EXTENSION.stopTabletServer(leaderServer);
+        FLUSS_CLUSTER_EXTENSION.startTabletServer(leaderServer);
+        FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
     }
 
     @Test
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 90f28dd95..305a4269a 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -67,7 +67,6 @@ import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
 import org.apache.fluss.server.zk.data.TableAssignment;
 import org.apache.fluss.server.zk.data.TableRegistration;
-import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.SystemClock;
 
@@ -83,7 +82,6 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -94,8 +92,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeStopBucketReplica;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toServerNode;
@@ -179,33 +180,25 @@ public final class FlussClusterExtension
 
     @Override
     public void afterEach(ExtensionContext extensionContext) throws Exception {
-        String defaultDb = BUILTIN_DATABASE;
-        // TODO: we need to cleanup all zk nodes, including the assignments,
-        //  but currently, we don't have a good way to do it
-        if (metadataManager != null) {
-            // drop all database and tables.
-            List<String> databases = metadataManager.listDatabases();
-            for (String database : databases) {
-                if (!database.equals(defaultDb)) {
-                    metadataManager.dropDatabase(database, true, true);
-                    // delete the data dirs
-                    for (int serverId : tabletServers.keySet()) {
-                        String dataDir = getDataDir(serverId);
-                        FileUtils.deleteDirectoryQuietly(Paths.get(dataDir, 
database).toFile());
-                    }
+        List<String> dbs = metadataManager.listDatabases();
+        CoordinatorGateway coordinatorGateway = newCoordinatorClient();
+        List<CompletableFuture<?>> dropFutures = new ArrayList<>();
+        for (String db : dbs) {
+            if (BUILTIN_DATABASE.equals(db)) {
+                // if it's the built-in database, we just drop all tables in it
+                // but do not drop the database itself.
+                List<String> tables = 
metadataManager.listTables(BUILTIN_DATABASE);
+                for (String table : tables) {
+                    dropFutures.add(
+                            coordinatorGateway.dropTable(
+                                    newDropTableRequest(BUILTIN_DATABASE, 
table, true)));
                 }
+            } else {
+                dropFutures.add(
+                        
coordinatorGateway.dropDatabase(newDropDatabaseRequest(db, true, true)));
             }
-            List<String> tables = metadataManager.listTables(defaultDb);
-            for (String table : tables) {
-                metadataManager.dropTable(TablePath.of(defaultDb, table), 
true);
-            }
-        }
-
-        // TODO we need to drop these table by dropTable Event instead of 
manual clear table
-        // metadata.
-        for (TabletServer tabletServer : tabletServers.values()) {
-            tabletServer.getMetadataCache().clearTableMetadata();
         }
+        CompletableFuture.allOf(dropFutures.toArray(new 
CompletableFuture[0])).join();
     }
 
     public void start() throws Exception {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index 41c4ee967..856216382 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -408,6 +408,22 @@ public class RpcMessageTestUtils {
         return response.getTableId();
     }
 
+    public static void dropTable(FlussClusterExtension extension, TablePath 
tablePath)
+            throws Exception {
+        CoordinatorGateway coordinatorGateway = 
extension.newCoordinatorClient();
+        coordinatorGateway
+                .dropTable(
+                        newDropTableRequest(
+                                tablePath.getDatabaseName(), 
tablePath.getTableName(), true))
+                .get();
+    }
+
+    public static void dropDatabase(FlussClusterExtension extension, String 
databaseName)
+            throws Exception {
+        CoordinatorGateway coordinatorGateway = 
extension.newCoordinatorClient();
+        coordinatorGateway.dropDatabase(newDropDatabaseRequest(databaseName, 
true, true)).get();
+    }
+
     public static void assertProduceLogResponse(
             ProduceLogResponse produceLogResponse, int bucketId, Long 
baseOffset) {
         assertThat(produceLogResponse.getBucketsRespsCount()).isEqualTo(1);

Reply via email to