This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit fbdb7fa9a47aca88ab1bff84d1c1a7195812c16c Author: Jark Wu <[email protected]> AuthorDate: Sat Feb 28 10:36:07 2026 +0800 [hotfix] Fix unstable test KvReplicaRestoreITCase.testRowCountRecoveryAfterFailover --- .../flink/source/BinlogVirtualTableITCase.java | 16 ++++++++ .../flink/source/ChangelogVirtualTableITCase.java | 17 ++++++-- .../server/replica/KvReplicaRestoreITCase.java | 7 ++++ .../server/testutils/FlussClusterExtension.java | 45 +++++++++------------- .../server/testutils/RpcMessageTestUtils.java | 16 ++++++++ 5 files changed, 71 insertions(+), 30 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java index 824ac03fd..619553aca 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java @@ -113,6 +113,22 @@ abstract class BinlogVirtualTableITCase { CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS); } + @AfterEach + protected void afterEach() throws Exception { + if (cluster != null) { + cluster.after(); + cluster = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } + } + // init table environment from savepointPath private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) { org.apache.flink.configuration.Configuration conf = diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index b5e27ed01..1248976f8 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -61,7 +61,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; -import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; @@ -112,9 +111,19 @@ abstract class ChangelogVirtualTableITCase { } @AfterEach - void after() throws Exception { - tEnv.useDatabase(BUILTIN_DATABASE); - tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); + protected void afterEach() throws Exception { + if (cluster != null) { + cluster.after(); + cluster = null; + } + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } } // init table environment from savepointPath diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java index 8ca1b8e27..9aa6142f3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; import static org.apache.fluss.server.testutils.KvTestUtils.assertLookupResponse; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.dropDatabase; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest; import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newPutKvRequest; import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch; @@ -201,6 +202,12 @@ class KvReplicaRestoreITCase { .get(), keyValue.f1); } + + // restart the failed server to make sure the cluster is healthy for other tests + dropDatabase(FLUSS_CLUSTER_EXTENSION, "test_db"); + FLUSS_CLUSTER_EXTENSION.stopTabletServer(leaderServer); + FLUSS_CLUSTER_EXTENSION.startTabletServer(leaderServer); + FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3); } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 90f28dd95..305a4269a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -67,7 +67,6 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; -import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.clock.SystemClock; @@ -83,7 +82,6 @@ import javax.annotation.Nullable; import java.io.File; import java.nio.file.Files; -import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -94,8 +92,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropDatabaseRequest; +import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeStopBucketReplica; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toServerNode; @@ -179,33 +180,25 @@ public final class FlussClusterExtension @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - String defaultDb = BUILTIN_DATABASE; - // TODO: we need to cleanup all zk nodes, including the assignments, - // but currently, we don't have a good way to do it - if (metadataManager != null) { - // drop all database and tables. - List<String> databases = metadataManager.listDatabases(); - for (String database : databases) { - if (!database.equals(defaultDb)) { - metadataManager.dropDatabase(database, true, true); - // delete the data dirs - for (int serverId : tabletServers.keySet()) { - String dataDir = getDataDir(serverId); - FileUtils.deleteDirectoryQuietly(Paths.get(dataDir, database).toFile()); - } + List<String> dbs = metadataManager.listDatabases(); + CoordinatorGateway coordinatorGateway = newCoordinatorClient(); + List<CompletableFuture<?>> dropFutures = new ArrayList<>(); + for (String db : dbs) { + if (BUILTIN_DATABASE.equals(db)) { + // if it's built-in database, we just drop all tables in it but not drop the + // database itself. + List<String> tables = metadataManager.listTables(BUILTIN_DATABASE); + for (String table : tables) { + dropFutures.add( + coordinatorGateway.dropTable( + newDropTableRequest(BUILTIN_DATABASE, table, true))); } + } else { + dropFutures.add( + coordinatorGateway.dropDatabase(newDropDatabaseRequest(db, true, true))); } - List<String> tables = metadataManager.listTables(defaultDb); - for (String table : tables) { - metadataManager.dropTable(TablePath.of(defaultDb, table), true); - } - } - - // TODO we need to drop these table by dropTable Event instead of manual clear table - // metadata. - for (TabletServer tabletServer : tabletServers.values()) { - tabletServer.getMetadataCache().clearTableMetadata(); } + CompletableFuture.allOf(dropFutures.toArray(new CompletableFuture[0])).join(); } public void start() throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index 41c4ee967..856216382 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java @@ -408,6 +408,22 @@ public class RpcMessageTestUtils { return response.getTableId(); } + public static void dropTable(FlussClusterExtension extension, TablePath tablePath) + throws Exception { + CoordinatorGateway coordinatorGateway = extension.newCoordinatorClient(); + coordinatorGateway + .dropTable( + newDropTableRequest( + tablePath.getDatabaseName(), tablePath.getTableName(), true)) + .get(); + } + + public static void dropDatabase(FlussClusterExtension extension, String databaseName) + throws Exception { + CoordinatorGateway coordinatorGateway = extension.newCoordinatorClient(); + coordinatorGateway.dropDatabase(newDropDatabaseRequest(databaseName, true, true)).get(); + } + public static void assertProduceLogResponse( ProduceLogResponse produceLogResponse, int bucketId, Long baseOffset) { assertThat(produceLogResponse.getBucketsRespsCount()).isEqualTo(1);
