This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit ea1f60b5aacc7f197c8627819a6bb0af926334b5
Author: nhuan.bc <[email protected]>
AuthorDate: Sun Jan 25 23:58:59 2026 +0700

    [flink] Add IT test for latest scan startup mode for changelog virtual table (#2477)
---
 .../flink/source/ChangelogVirtualTableITCase.java | 167 +++++++++++++++++++--
 1 file changed, 156 insertions(+), 11 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index 342772996..4f40a7261 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -29,25 +29,39 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.clock.ManualClock;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
 
+import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
@@ -59,6 +73,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
 
     protected static final ManualClock CLOCK = new ManualClock();
+    @TempDir public static File checkpointDir;
+    @TempDir public static File savepointDir;
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
@@ -74,11 +90,20 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
     protected StreamTableEnvironment tEnv;
     protected static Connection conn;
     protected static Admin admin;
+    static MiniClusterWithClientResource cluster;
 
     protected static Configuration clientConf;
 
     @BeforeAll
-    protected static void beforeAll() {
+    protected static void beforeAll() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
@@ -87,11 +112,39 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
     @BeforeEach
     void before() {
         // Initialize Flink environment
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
+        tEnv = initTableEnvironment(null);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    @AfterAll
+    protected static void afterAll() throws Exception {
+        admin.close();
+        cluster.after();
+        conn.close();
+    }
 
-        // Initialize catalog and database
+    // init table environment, optionally restoring from a savepoint path
+    private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+        }
+        StreamExecutionEnvironment execEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        execEnv.setParallelism(1);
+        execEnv.enableCheckpointing(1000);
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
         String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create catalog using SQL
         tEnv.executeSql(
                 String.format(
                         "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
@@ -100,14 +153,8 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
         tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
         tEnv.executeSql("create database " + DEFAULT_DB);
         tEnv.useDatabase(DEFAULT_DB);
-        // reset clock before each test
-        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
-    }
 
-    @AfterEach
-    void after() {
-        tEnv.useDatabase(BUILTIN_DATABASE);
-        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+        return tEnv;
     }
 
     /** Deletes rows from a primary key table using the proper delete API. */
@@ -380,6 +427,73 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
                 "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
     }
 
+    @Test
+    public void testChangelogWithLatestScanStartupMode() throws Exception {
+        // Create source and result tables
+        tEnv.executeSql(
+                "CREATE TABLE source_table ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        tEnv.executeSql(
+                "CREATE TABLE result_table ("
+                        + " id INT NOT NULL,"
+                        + " name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        TablePath sourcePath = TablePath.of(DEFAULT_DB, "source_table");
+
+        // Write the first batch of data.
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")), false);
+
+        // Start the INSERT job with 'latest' startup mode, so the first batch is skipped.
+        String optionsLatest = "/*+ OPTIONS('scan.startup.mode' = 'latest') */";
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT id, name FROM source_table$changelog "
+                                + optionsLatest);
+
+        // Wait for at least one checkpoint to complete before creating a savepoint
+        waitForCheckpoint(insertResult.getJobClient().get().getJobID());
+
+        CloseableIterator<Row> rowIterLatest =
+                tEnv.executeSql("SELECT * FROM result_table").collect();
+
+        // Now stop the job with a savepoint
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // Re-initialize the environment from the savepoint path
+        tEnv = initTableEnvironment(savepointPath);
+        insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT id, name FROM source_table$changelog "
+                                + optionsLatest);
+
+        // Write the second batch of data; the restored job should pick up only the latest values
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(4, "v4"), row(5, "v5")), false);
+
+        // Should contain records from the second batch only
+        List<String> latestResults = collectRowsWithTimeout(rowIterLatest, 2, true);
+        assertThat(latestResults).hasSize(2);
+        assertThat(latestResults).containsExactly("+I[4, v4]", "+I[5, v5]");
+
+        // Cleanup job
+        insertResult.getJobClient().get().cancel().get();
+    }
+
     @Test
     public void testChangelogWithPartitionedTable() throws Exception {
         // Create a partitioned primary key table with 1 bucket per partition
@@ -423,4 +537,35 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
 
         rowIter.close();
     }
+
+    private static org.apache.flink.configuration.Configuration getFileBasedCheckpointsConfig(
+            File savepointDir) {
+        return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
+    }
+
+    private static org.apache.flink.configuration.Configuration getFileBasedCheckpointsConfig(
+            final String savepointDir) {
+        final org.apache.flink.configuration.Configuration config =
+                new org.apache.flink.configuration.Configuration();
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
+        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+        return config;
+    }
+
+    private static void waitForCheckpoint(JobID jobId) throws Exception {
+        String jobIdStr = jobId.toHexString();
+        waitUntilCondition(
+                () -> {
+                    File jobCheckpointDir = new File(checkpointDir, jobIdStr);
+                    if (!jobCheckpointDir.exists()) {
+                        return false;
+                    }
+                    File[] checkpoints =
+                            jobCheckpointDir.listFiles(
+                                    f -> f.isDirectory() && f.getName().startsWith("chk-"));
+                    return checkpoints != null && checkpoints.length > 0;
+                });
+    }
 }
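
For context, the savepoint round trip this test exercises is a standard Flink pattern: start an INSERT job reading the changelog with 'latest' startup mode, stop it with a savepoint, then resume from that savepoint so only rows written after the stop are consumed. Below is a minimal standalone sketch of that pattern; it is not code from this commit. The class name SavepointResumeSketch, the "/tmp/savepoints" directory, and the 1s checkpoint interval are placeholders, while the table names are taken from the test above.

// Minimal sketch of the stop-with-savepoint / resume cycle (placeholders noted above).
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SavepointResumeSketch {
    public static void main(String[] args) throws Exception {
        // 1) Start the INSERT job; 'latest' startup mode skips pre-existing rows.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO result_table SELECT id, name FROM source_table$changelog "
                                + "/*+ OPTIONS('scan.startup.mode' = 'latest') */");

        // 2) Stop the job with a canonical savepoint.
        String savepointPath =
                result.getJobClient()
                        .get()
                        .stopWithSavepoint(
                                false, "/tmp/savepoints", SavepointFormatType.CANONICAL)
                        .get();

        // 3) Resume from the savepoint: the restored source offsets ensure that
        //    only rows written after the stop are consumed by the new job.
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", savepointPath);
        StreamExecutionEnvironment resumedEnv =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        resumedEnv.enableCheckpointing(1000);
        StreamTableEnvironment resumedTEnv =
                StreamTableEnvironment.create(resumedEnv, EnvironmentSettings.inStreamingMode());
        resumedTEnv.executeSql(
                "INSERT INTO result_table SELECT id, name FROM source_table$changelog "
                        + "/*+ OPTIONS('scan.startup.mode' = 'latest') */");
    }
}

Note that the actual test additionally waits for a completed checkpoint (via its waitForCheckpoint helper) before stopping the job; the sketch omits that wait for brevity.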
