This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 1cb95bc945ab7a5d0612c0d7ef3bfe5d50be0b2b
Author: nhuan.bc <[email protected]>
AuthorDate: Sun Jan 25 23:58:59 2026 +0700

    [flink] Add IT test for latest scan startup mode for changelog virtual 
table (#2477)
---
 .../flink/source/ChangelogVirtualTableITCase.java  | 167 +++++++++++++++++++--
 1 file changed, 156 insertions(+), 11 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index 342772996..4f40a7261 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -29,25 +29,39 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.clock.ManualClock;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
@@ -59,6 +73,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
 
     protected static final ManualClock CLOCK = new ManualClock();
+    @TempDir public static File checkpointDir;
+    @TempDir public static File savepointDir;
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
@@ -74,11 +90,20 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
     protected StreamTableEnvironment tEnv;
     protected static Connection conn;
     protected static Admin admin;
+    static MiniClusterWithClientResource cluster;
 
     protected static Configuration clientConf;
 
     @BeforeAll
-    protected static void beforeAll() {
+    protected static void beforeAll() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
@@ -87,11 +112,39 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
     @BeforeEach
     void before() {
         // Initialize Flink environment
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+        tEnv = initTableEnvironment(null);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    @AfterAll
+    protected static void afterAll() throws Exception {
+        admin.close();
+        cluster.after();
+        conn.close();
+    }
 
-        // Initialize catalog and database
+    // init table environment from savepointPath
+    private StreamTableEnvironment initTableEnvironment(@Nullable String 
savepointPath) {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+        }
+        StreamExecutionEnvironment execEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        execEnv.setParallelism(1);
+        execEnv.enableCheckpointing(1000);
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
         String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create catalog using sql
         tEnv.executeSql(
                 String.format(
                         "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
@@ -100,14 +153,8 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
         tEnv.executeSql("create database " + DEFAULT_DB);
         tEnv.useDatabase(DEFAULT_DB);
-        // reset clock before each test
-        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
-    }
 
-    @AfterEach
-    void after() {
-        tEnv.useDatabase(BUILTIN_DATABASE);
-        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+        return tEnv;
     }
 
     /** Deletes rows from a primary key table using the proper delete API. */
@@ -380,6 +427,73 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                         "+I[insert, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
     }
 
+    @Test
+    public void testChangelogWithLatestScanStartupMode() throws Exception {
+        // Create source and result tables
+        tEnv.executeSql(
+                "CREATE TABLE source_table ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        tEnv.executeSql(
+                "CREATE TABLE result_table ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        TablePath sourcePath = TablePath.of(DEFAULT_DB, "source_table");
+
+        // Write first batch of data.
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3")), false);
+
+        // Pre-populate the result table with some existing records.
+        String optionsLatest = "/*+ OPTIONS('scan.startup.mode' = 'latest') 
*/";
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT id, name FROM 
source_table$changelog "
+                                + optionsLatest);
+
+        // Wait for at least one checkpoint to complete before creating 
savepoint
+        waitForCheckpoint(insertResult.getJobClient().get().getJobID());
+
+        CloseableIterator<Row> rowIterLatest =
+                tEnv.executeSql("SELECT * FROM result_table").collect();
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+
+        // Init env with savepoint Path
+        tEnv = initTableEnvironment(savepointPath);
+        insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT id, name FROM 
source_table$changelog "
+                                + optionsLatest);
+
+        // Write the third batch of data, to ensure we get the latest value
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(4, "v4"), row(5, "v5")), 
false);
+
+        // Should contain records from the third batch only
+        List<String> latestResults = collectRowsWithTimeout(rowIterLatest, 2, 
true);
+        assertThat(latestResults).hasSize(2);
+        assertThat(latestResults).containsExactly("+I[4, v4]", "+I[5, v5]");
+
+        // Cleanup job
+        insertResult.getJobClient().get().cancel().get();
+    }
+
     @Test
     public void testChangelogWithPartitionedTable() throws Exception {
         // Create a partitioned primary key table with 1 bucket per partition
@@ -423,4 +537,35 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
 
         rowIter.close();
     }
+
+    private static org.apache.flink.configuration.Configuration 
getFileBasedCheckpointsConfig(
+            File savepointDir) {
+        return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
+    }
+
+    private static org.apache.flink.configuration.Configuration 
getFileBasedCheckpointsConfig(
+            final String savepointDir) {
+        final org.apache.flink.configuration.Configuration config =
+                new org.apache.flink.configuration.Configuration();
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
+        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
MemorySize.ZERO);
+        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+        return config;
+    }
+
+    private static void waitForCheckpoint(JobID jobId) throws Exception {
+        String jobIdStr = jobId.toHexString();
+        waitUntilCondition(
+                () -> {
+                    File jobCheckpointDir = new File(checkpointDir, jobIdStr);
+                    if (!jobCheckpointDir.exists()) {
+                        return false;
+                    }
+                    File[] checkpoints =
+                            jobCheckpointDir.listFiles(
+                                    f -> f.isDirectory() && 
f.getName().startsWith("chk-"));
+                    return checkpoints != null && checkpoints.length > 0;
+                });
+    }
 }

Reply via email to