This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-0.9
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit ef80fd04cc36ff805d3c293e6f5a499d8f77665a
Author: Jark Wu <[email protected]>
AuthorDate: Sat Feb 28 01:03:55 2026 +0800

    [flink] Fix NPE that prevents $changelog and $binlog virtual tables from 
completing checkpoints (#2743)
---
 .../fluss/flink/source/BinlogFlinkTableSource.java |   3 +-
 .../flink/source/ChangelogFlinkTableSource.java    |   3 +-
 .../flink/source/BinlogVirtualTableITCase.java     | 170 ++++++++++++++++++---
 .../flink/source/ChangelogVirtualTableITCase.java  |  47 +++---
 4 files changed, 174 insertions(+), 49 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
index a5ec91bf3..65083c3be 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.source;
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
+import org.apache.fluss.flink.source.reader.LeaseContext;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.metadata.TablePath;
@@ -130,7 +131,7 @@ public class BinlogFlinkTableSource implements 
ScanTableSource {
                         new BinlogDeserializationSchema(),
                         streaming,
                         partitionFilters,
-                        null);
+                        LeaseContext.DEFAULT);
 
         return SourceProvider.of(source);
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
index a034807d7..0b6b4979b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.source;
 import org.apache.fluss.client.initializer.OffsetsInitializer;
 import org.apache.fluss.config.Configuration;
 import 
org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema;
+import org.apache.fluss.flink.source.reader.LeaseContext;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -167,7 +168,7 @@ public class ChangelogFlinkTableSource implements 
ScanTableSource {
                         new ChangelogDeserializationSchema(),
                         streaming,
                         partitionFilters,
-                        null); // Lake source not supported
+                        LeaseContext.DEFAULT); // Lake source not supported
 
         return SourceProvider.of(source);
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
index 2fa306f88..824ac03fd 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java
@@ -29,19 +29,29 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.clock.ManualClock;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,13 +64,16 @@ import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.co
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration test for $binlog virtual table functionality. */
-abstract class BinlogVirtualTableITCase extends AbstractTestBase {
+abstract class BinlogVirtualTableITCase {
 
     protected static final ManualClock CLOCK = new ManualClock();
+    @TempDir public static File checkpointDir;
+    @TempDir public static File savepointDir;
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
@@ -74,36 +87,57 @@ abstract class BinlogVirtualTableITCase extends 
AbstractTestBase {
     static final String DEFAULT_DB = "test_binlog_db";
     protected StreamExecutionEnvironment execEnv;
     protected StreamTableEnvironment tEnv;
-    protected static Connection conn;
-    protected static Admin admin;
-
-    protected static Configuration clientConf;
+    protected Connection conn;
+    protected Admin admin;
+    protected Configuration clientConf;
+    protected MiniClusterWithClientResource cluster;
 
-    @BeforeAll
-    protected static void beforeAll() {
+    @BeforeEach
+    protected void beforeEach() throws Exception {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
-    }
 
-    @BeforeEach
-    void before() {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
+
         // Initialize Flink environment
-        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+        tEnv = initTableEnvironment(null);
+        // reset clock before each test
+        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+    }
 
-        // Initialize catalog and database
+    // Creates a table environment (restored from savepointPath when non-null) using the Fluss catalog and DEFAULT_DB
+    private StreamTableEnvironment initTableEnvironment(@Nullable String 
savepointPath) {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        if (savepointPath != null) {
+            conf.setString("execution.savepoint.path", savepointPath);
+        }
+        StreamExecutionEnvironment execEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        execEnv.setParallelism(1);
+        execEnv.enableCheckpointing(1000);
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
         String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // Create the Fluss catalog via SQL and switch to it
         tEnv.executeSql(
                 String.format(
                         "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         tEnv.executeSql("use catalog " + CATALOG_NAME);
         
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
-        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.executeSql("create database if not exists " + DEFAULT_DB);
         tEnv.useDatabase(DEFAULT_DB);
-        // reset clock before each test
-        CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS);
+
+        return tEnv;
     }
 
     @AfterEach
@@ -359,4 +393,104 @@ abstract class BinlogVirtualTableITCase extends 
AbstractTestBase {
         assertThat(timestampResults)
                 .isEqualTo(Arrays.asList("+I[insert, 4, v4]", "+I[insert, 5, 
v5]"));
     }
+
+    @Test
+    public void testBinlogWithLatestScanStartupMode() throws Exception {
+        // Create source and result tables
+        tEnv.executeSql(
+                "CREATE TABLE source_table ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ('bucket.num' = '1')");
+
+        tEnv.executeSql(
+                "CREATE TABLE result_table ("
+                        + "  id INT NOT NULL,"
+                        + "  name STRING,"
+                        + "  PRIMARY KEY (id) NOT ENFORCED"
+                        + ")");
+
+        TablePath sourcePath = TablePath.of(DEFAULT_DB, "source_table");
+
+        // Write first batch of data.
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3")), false);
+
+        // Stream the binlog into result_table in 'latest' mode, so the first batch is skipped.
+        String optionsLatest = "/*+ OPTIONS('scan.startup.mode' = 'latest') 
*/";
+        TableResult insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT after.id, after.name 
FROM source_table$binlog "
+                                + optionsLatest);
+
+        // Wait for at least one checkpoint to complete before creating 
savepoint
+        waitForCheckpoint(insertResult.getJobClient().get().getJobID());
+
+        CloseableIterator<Row> rowIterLatest =
+                tEnv.executeSql("SELECT * FROM result_table").collect();
+
+        // Now stop the insert job with a savepoint.
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get(60, TimeUnit.SECONDS);
+
+        // Re-create the table environment from the savepoint and restart the insert job
+        tEnv = initTableEnvironment(savepointPath);
+        insertResult =
+                tEnv.executeSql(
+                        "INSERT INTO result_table SELECT after.id, after.name 
FROM source_table$binlog "
+                                + optionsLatest);
+
+        // Write the second batch of data; the restored job should pick it up
+        CLOCK.advanceTime(Duration.ofMillis(100));
+        writeRows(conn, sourcePath, Arrays.asList(row(4, "v4"), row(5, "v5")), 
false);
+
+        // Should contain records from the second batch only
+        List<String> latestResults = collectRowsWithTimeout(rowIterLatest, 2, 
true);
+        assertThat(latestResults).hasSize(2);
+        assertThat(latestResults).containsExactly("+I[4, v4]", "+I[5, v5]");
+
+        // Cancel the restored insert job
+        insertResult.getJobClient().get().cancel().get();
+    }
+
+    private static org.apache.flink.configuration.Configuration 
getFileBasedCheckpointsConfig(
+            File savepointDir) {
+        return getFileBasedCheckpointsConfig(savepointDir.toURI().toString());
+    }
+
+    private static org.apache.flink.configuration.Configuration 
getFileBasedCheckpointsConfig(
+            final String savepointDir) {
+        final org.apache.flink.configuration.Configuration config =
+                new org.apache.flink.configuration.Configuration();
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
+        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 
MemorySize.ZERO);
+        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
+        return config;
+    }
+
+    protected static void waitForCheckpoint(JobID jobId) {
+        String jobIdStr = jobId.toHexString();
+        waitUntil(
+                () -> {
+                    File jobCheckpointDir = new File(checkpointDir, jobIdStr);
+                    if (!jobCheckpointDir.exists()) {
+                        return false;
+                    }
+                    File[] checkpoints =
+                            jobCheckpointDir.listFiles(
+                                    f -> f.isDirectory() && 
f.getName().startsWith("chk-"));
+                    return checkpoints != null && checkpoints.length > 0;
+                },
+                Duration.ofSeconds(60),
+                "Timeout waiting for checkpoint for job " + jobIdStr);
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
index 4f40a7261..b5e27ed01 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java
@@ -40,13 +40,10 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -61,16 +58,16 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test for $changelog virtual table functionality. */
-abstract class ChangelogVirtualTableITCase extends AbstractTestBase {
+abstract class ChangelogVirtualTableITCase {
 
     protected static final ManualClock CLOCK = new ManualClock();
     @TempDir public static File checkpointDir;
@@ -88,14 +85,17 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
     static final String DEFAULT_DB = "test_changelog_db";
     protected StreamExecutionEnvironment execEnv;
     protected StreamTableEnvironment tEnv;
-    protected static Connection conn;
-    protected static Admin admin;
-    static MiniClusterWithClientResource cluster;
+    protected Connection conn;
+    protected Admin admin;
+    protected Configuration clientConf;
+    protected MiniClusterWithClientResource cluster;
 
-    protected static Configuration clientConf;
+    @BeforeEach
+    protected void beforeEach() throws Exception {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
 
-    @BeforeAll
-    protected static void beforeAll() throws Exception {
         cluster =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
@@ -104,13 +104,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                                 .setNumberSlotsPerTaskManager(2)
                                 .build());
         cluster.before();
-        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
-        conn = ConnectionFactory.createConnection(clientConf);
-        admin = conn.getAdmin();
-    }
 
-    @BeforeEach
-    void before() {
         // Initialize Flink environment
         tEnv = initTableEnvironment(null);
         // reset clock before each test
@@ -123,13 +117,6 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
     }
 
-    @AfterAll
-    protected static void afterAll() throws Exception {
-        admin.close();
-        cluster.after();
-        conn.close();
-    }
-
     // init table environment from savepointPath
     private StreamTableEnvironment initTableEnvironment(@Nullable String 
savepointPath) {
         org.apache.flink.configuration.Configuration conf =
@@ -151,7 +138,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                         CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
         tEnv.executeSql("use catalog " + CATALOG_NAME);
         
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
-        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.executeSql("create database if not exists " + DEFAULT_DB);
         tEnv.useDatabase(DEFAULT_DB);
 
         return tEnv;
@@ -472,7 +459,7 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                                 false,
                                 savepointDir.getAbsolutePath(),
                                 SavepointFormatType.CANONICAL)
-                        .get();
+                        .get(60, TimeUnit.SECONDS);
 
         // Init env with savepoint Path
         tEnv = initTableEnvironment(savepointPath);
@@ -554,9 +541,9 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
         return config;
     }
 
-    private static void waitForCheckpoint(JobID jobId) throws Exception {
+    protected static void waitForCheckpoint(JobID jobId) {
         String jobIdStr = jobId.toHexString();
-        waitUntilCondition(
+        waitUntil(
                 () -> {
                     File jobCheckpointDir = new File(checkpointDir, jobIdStr);
                     if (!jobCheckpointDir.exists()) {
@@ -566,6 +553,8 @@ abstract class ChangelogVirtualTableITCase extends 
AbstractTestBase {
                             jobCheckpointDir.listFiles(
                                     f -> f.isDirectory() && 
f.getName().startsWith("chk-"));
                     return checkpoints != null && checkpoints.length > 0;
-                });
+                },
+                Duration.ofSeconds(60),
+                "Timeout waiting for checkpoint for job " + jobIdStr);
     }
 }

Reply via email to