This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 1b760c1d4a7c9cc300ee7ddf4972d76421c3934f Author: Jark Wu <[email protected]> AuthorDate: Sat Feb 28 01:03:55 2026 +0800 [flink] Fix $changelog and $binlog virtual tables fail (NPE) to complete checkpoint (#2743) --- .../fluss/flink/source/BinlogFlinkTableSource.java | 3 +- .../flink/source/ChangelogFlinkTableSource.java | 3 +- .../flink/source/BinlogVirtualTableITCase.java | 170 ++++++++++++++++++--- .../flink/source/ChangelogVirtualTableITCase.java | 47 +++--- 4 files changed, 174 insertions(+), 49 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java index a5ec91bf3..65083c3be 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java @@ -20,6 +20,7 @@ package org.apache.fluss.flink.source; import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema; +import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.TablePath; @@ -130,7 +131,7 @@ public class BinlogFlinkTableSource implements ScanTableSource { new BinlogDeserializationSchema(), streaming, partitionFilters, - null); + LeaseContext.DEFAULT); return SourceProvider.of(source); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index a034807d7..0b6b4979b 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -20,6 +20,7 @@ package org.apache.fluss.flink.source; import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema; +import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.metadata.TableDescriptor; @@ -167,7 +168,7 @@ public class ChangelogFlinkTableSource implements ScanTableSource { new ChangelogDeserializationSchema(), streaming, partitionFilters, - null); // Lake source not supported + LeaseContext.DEFAULT); // Lake source not supported return SourceProvider.of(source); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java index 2fa306f88..824ac03fd 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/BinlogVirtualTableITCase.java @@ -29,19 +29,29 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.clock.ManualClock; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import javax.annotation.Nullable; + +import java.io.File; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -54,13 +64,16 @@ import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.co import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration test for $binlog virtual table functionality. 
*/ -abstract class BinlogVirtualTableITCase extends AbstractTestBase { +abstract class BinlogVirtualTableITCase { protected static final ManualClock CLOCK = new ManualClock(); + @TempDir public static File checkpointDir; + @TempDir public static File savepointDir; @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = @@ -74,36 +87,57 @@ abstract class BinlogVirtualTableITCase extends AbstractTestBase { static final String DEFAULT_DB = "test_binlog_db"; protected StreamExecutionEnvironment execEnv; protected StreamTableEnvironment tEnv; - protected static Connection conn; - protected static Admin admin; - - protected static Configuration clientConf; + protected Connection conn; + protected Admin admin; + protected Configuration clientConf; + protected MiniClusterWithClientResource cluster; - @BeforeAll - protected static void beforeAll() { + @BeforeEach + protected void beforeEach() throws Exception { clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); conn = ConnectionFactory.createConnection(clientConf); admin = conn.getAdmin(); - } - @BeforeEach - void before() { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFileBasedCheckpointsConfig(savepointDir)) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); + cluster.before(); + // Initialize Flink environment - execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + tEnv = initTableEnvironment(null); + // reset clock before each test + CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS); + } - // Initialize catalog and database + // init table environment from savepointPath + private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) { + org.apache.flink.configuration.Configuration conf = + new org.apache.flink.configuration.Configuration(); + if 
(savepointPath != null) { + conf.setString("execution.savepoint.path", savepointPath); + } + StreamExecutionEnvironment execEnv = + StreamExecutionEnvironment.getExecutionEnvironment(conf); + execEnv.setParallelism(1); + execEnv.enableCheckpointing(1000); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + // create catalog using sql tEnv.executeSql( String.format( "create catalog %s with ('type' = 'fluss', '%s' = '%s')", CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); tEnv.executeSql("use catalog " + CATALOG_NAME); tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - tEnv.executeSql("create database " + DEFAULT_DB); + tEnv.executeSql("create database if not exists " + DEFAULT_DB); tEnv.useDatabase(DEFAULT_DB); - // reset clock before each test - CLOCK.advanceTime(-CLOCK.milliseconds(), TimeUnit.MILLISECONDS); + + return tEnv; } @AfterEach @@ -359,4 +393,104 @@ abstract class BinlogVirtualTableITCase extends AbstractTestBase { assertThat(timestampResults) .isEqualTo(Arrays.asList("+I[insert, 4, v4]", "+I[insert, 5, v5]")); } + + @Test + public void testBinlogWithLatestScanStartupMode() throws Exception { + // Create source and result tables + tEnv.executeSql( + "CREATE TABLE source_table (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH ('bucket.num' = '1')"); + + tEnv.executeSql( + "CREATE TABLE result_table (" + + " id INT NOT NULL," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + TablePath sourcePath = TablePath.of(DEFAULT_DB, "source_table"); + + // Write first batch of data. + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, sourcePath, Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")), false); + + // Pre-populate the result table with some existing records.
+ String optionsLatest = "/*+ OPTIONS('scan.startup.mode' = 'latest') */"; + TableResult insertResult = + tEnv.executeSql( + "INSERT INTO result_table SELECT after.id, after.name FROM source_table$binlog " + + optionsLatest); + + // Wait for at least one checkpoint to complete before creating savepoint + waitForCheckpoint(insertResult.getJobClient().get().getJobID()); + + CloseableIterator<Row> rowIterLatest = + tEnv.executeSql("SELECT * FROM result_table").collect(); + + // now, stop the job with savepoint + String savepointPath = + insertResult + .getJobClient() + .get() + .stopWithSavepoint( + false, + savepointDir.getAbsolutePath(), + SavepointFormatType.CANONICAL) + .get(60, TimeUnit.SECONDS); + + // Init env with savepoint Path + tEnv = initTableEnvironment(savepointPath); + insertResult = + tEnv.executeSql( + "INSERT INTO result_table SELECT after.id, after.name FROM source_table$binlog " + + optionsLatest); + + // Write the third batch of data, ensure to get the latest value + CLOCK.advanceTime(Duration.ofMillis(100)); + writeRows(conn, sourcePath, Arrays.asList(row(4, "v4"), row(5, "v5")), false); + + // Should contain records from the third batch only + List<String> latestResults = collectRowsWithTimeout(rowIterLatest, 2, true); + assertThat(latestResults).hasSize(2); + assertThat(latestResults).containsExactly("+I[4, v4]", "+I[5, v5]"); + + // Cleanup job + insertResult.getJobClient().get().cancel().get(); + } + + private static org.apache.flink.configuration.Configuration getFileBasedCheckpointsConfig( + File savepointDir) { + return getFileBasedCheckpointsConfig(savepointDir.toURI().toString()); + } + + private static org.apache.flink.configuration.Configuration getFileBasedCheckpointsConfig( + final String savepointDir) { + final org.apache.flink.configuration.Configuration config = + new org.apache.flink.configuration.Configuration(); + config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); + config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + return config; + } + + protected static void waitForCheckpoint(JobID jobId) { + String jobIdStr = jobId.toHexString(); + waitUntil( + () -> { + File jobCheckpointDir = new File(checkpointDir, jobIdStr); + if (!jobCheckpointDir.exists()) { + return false; + } + File[] checkpoints = + jobCheckpointDir.listFiles( + f -> f.isDirectory() && f.getName().startsWith("chk-")); + return checkpoints != null && checkpoints.length > 0; + }, + Duration.ofSeconds(60), + "Timeout waiting for checkpoint for job " + jobIdStr); + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java index 4f40a7261..b5e27ed01 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java @@ -40,13 +40,10 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -61,16 +58,16 @@ import 
java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows; import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; /** Integration test for $changelog virtual table functionality. */ -abstract class ChangelogVirtualTableITCase extends AbstractTestBase { +abstract class ChangelogVirtualTableITCase { protected static final ManualClock CLOCK = new ManualClock(); @TempDir public static File checkpointDir; @@ -88,14 +85,17 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { static final String DEFAULT_DB = "test_changelog_db"; protected StreamExecutionEnvironment execEnv; protected StreamTableEnvironment tEnv; - protected static Connection conn; - protected static Admin admin; - static MiniClusterWithClientResource cluster; + protected Connection conn; + protected Admin admin; + protected Configuration clientConf; + protected MiniClusterWithClientResource cluster; - protected static Configuration clientConf; + @BeforeEach + protected void beforeEach() throws Exception { + clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); - @BeforeAll - protected static void beforeAll() throws Exception { cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() @@ -104,13 +104,7 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { 
.setNumberSlotsPerTaskManager(2) .build()); cluster.before(); - clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); - conn = ConnectionFactory.createConnection(clientConf); - admin = conn.getAdmin(); - } - @BeforeEach - void before() { // Initialize Flink environment tEnv = initTableEnvironment(null); // reset clock before each test @@ -123,13 +117,6 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); } - @AfterAll - protected static void afterAll() throws Exception { - admin.close(); - cluster.after(); - conn.close(); - } - // init table environment from savepointPath private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPath) { org.apache.flink.configuration.Configuration conf = @@ -151,7 +138,7 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); tEnv.executeSql("use catalog " + CATALOG_NAME); tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); - tEnv.executeSql("create database " + DEFAULT_DB); + tEnv.executeSql("create database if not exists " + DEFAULT_DB); tEnv.useDatabase(DEFAULT_DB); return tEnv; @@ -472,7 +459,7 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { false, savepointDir.getAbsolutePath(), SavepointFormatType.CANONICAL) - .get(); + .get(60, TimeUnit.SECONDS); // Init env with savepoint Path tEnv = initTableEnvironment(savepointPath); @@ -554,9 +541,9 @@ abstract class ChangelogVirtualTableITCase extends AbstractTestBase { return config; } - private static void waitForCheckpoint(JobID jobId) throws Exception { + protected static void waitForCheckpoint(JobID jobId) { String jobIdStr = jobId.toHexString(); - waitUntilCondition( + waitUntil( () -> { File jobCheckpointDir = new File(checkpointDir, jobIdStr); if (!jobCheckpointDir.exists()) { @@ -566,6 +553,8 @@ abstract class 
ChangelogVirtualTableITCase extends AbstractTestBase { jobCheckpointDir.listFiles( f -> f.isDirectory() && f.getName().startsWith("chk-")); return checkpoints != null && checkpoints.length > 0; - }); + }, + Duration.ofSeconds(60), + "Timeout waiting for checkpoint for job " + jobIdStr); } }
