This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 920809f80 [flink] Flink Source need to check tableId on recovery in case that table is recreated. (#2154)
920809f80 is described below

commit 920809f8008c6f612ecfed0fbcee1d3511324577
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Dec 29 21:44:44 2025 +0800

    [flink] Flink Source need to check tableId on recovery in case that table is recreated. (#2154)
---
 .../source/reader/FlinkSourceSplitReader.java      |  22 +--
 .../source/FlinkTableSourceFailOverITCase.java     | 220 ++++++++++++---------
 .../source/reader/FlinkSourceSplitReaderTest.java  |  23 +++
 3 files changed, 163 insertions(+), 102 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 9a2ab7aeb..bd5305a36 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -104,8 +104,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
 
     @Nullable private final LakeSource<LakeSplit> lakeSource;
 
-    // table id, will be null when haven't received any split
-    private Long tableId;
+    private final Long tableId;
 
     private final Map<TableBucket, Long> stoppingOffsets;
     private LakeSplitReaderGenerator lakeSplitReaderGenerator;
@@ -127,6 +126,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
                 new 
FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
         this.connection = ConnectionFactory.createConnection(flussConf, 
flinkMetricRegistry);
         this.table = connection.getTable(tablePath);
+        this.tableId = table.getTableInfo().getTableId();
         this.sourceOutputType = sourceOutputType;
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
@@ -187,15 +187,15 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
         }
         for (SourceSplitBase sourceSplitBase : splitsChanges.splits()) {
             LOG.info("add split {}", sourceSplitBase.splitId());
-            // init table id
-            if (tableId == null) {
-                tableId = sourceSplitBase.getTableBucket().getTableId();
-            } else {
-                checkArgument(
-                        
tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
-                        "table id not equal across splits {}",
-                        splitsChanges.splits());
-            }
+            checkArgument(
+                    tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
+                    "Table ID mismatch: expected %s, but split contains %s for table '%s'. "
+                            + "This usually happens when a table with the same name was dropped and recreated "
+                            + "between job runs, causing metadata inconsistency. "
+                            + "To resolve this, please restart the job **without** using the previous savepoint or checkpoint.",
+                    tableId,
+                    sourceSplitBase.getTableBucket().getTableId(),
+                    table.getTableInfo().getTablePath());
 
             if (sourceSplitBase.isHybridSnapshotLogSplit()) {
                 HybridSnapshotLogSplit hybridSnapshotLogSplit =
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index d228000ac..916b10654 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +61,7 @@ import static org.apache.fluss.flink.utils.FlinkTestBase.createPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.dropPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for flink table source fail over. */
 abstract class FlinkTableSourceFailOverITCase {
@@ -86,16 +88,27 @@ abstract class FlinkTableSourceFailOverITCase {
     org.apache.fluss.config.Configuration clientConf;
     ZooKeeperClient zkClient;
     Connection conn;
+    MiniClusterWithClientResource cluster;
 
     @BeforeEach
-    protected void beforeEach() {
+    protected void beforeEach() throws Exception {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
         conn = ConnectionFactory.createConnection(clientConf);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
     }
 
     @AfterEach
     protected void afterEach() throws Exception {
+        cluster.after();
         conn.close();
     }
 
@@ -121,100 +134,125 @@ abstract class FlinkTableSourceFailOverITCase {
 
     @Test
     void testRestore() throws Exception {
-        final int numTaskManagers = 2;
-        final int numSlotsPerTaskManager = 2;
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = 
initTableEnvironment(savepointPathAndResults.f0);
+        CloseableIterator<Row> results = savepointPathAndResults.f1;
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // append a new row again to check if the source can restore the state correctly
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        writer.append(row(5, "5000")).get();
+        List<String> expected = new ArrayList<>();
+        expected.add("+I[5, 5000]");
+        assertResultsIgnoreOrder(results, expected, true);
+        // cancel the insert job
+        insertResult.getJobClient().get().cancel().get();
+    }
 
-        // Start Flink
-        MiniClusterWithClientResource cluster =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
-                                .setNumberTaskManagers(numTaskManagers)
-                                
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-                                .build());
+    @Test
+    void testRestoreWithRecreateTable() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = 
initTableEnvironment(savepointPathAndResults.f0);
 
-        cluster.before();
+        // drop and recreate the table.
+        tEnv.executeSql(String.format("drop table %s", 
tablePath.getTableName()));
+        tEnv.executeSql(
+                String.format(
+                        "create table %s (" + "a int, b varchar" + ") 
partitioned by (b) ",
+                        tablePath.getTableName()));
+
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        assertThatThrownBy(() -> insertResult.getJobClient().get().getJobExecutionResult().get())
+                .rootCause()
+                .hasMessageContaining(
+                        "Table ID mismatch: expected 2, but split contains 0 for table 'fluss.test_recreate_table'. "
+                                + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                + "causing metadata inconsistency. To resolve this, please restart the job **without** "
+                                + "using the previous savepoint or checkpoint.");
+    }
+
+    private Tuple2<String, CloseableIterator<Row>> runWithSavepoint(TablePath 
tablePath)
+            throws Exception {
+        StreamTableEnvironment tEnv = initTableEnvironment(null);
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "a int, b varchar"
+                                + ") partitioned by (b) "
+                                + "with ("
+                                + "'table.auto-partition.enabled' = 'true',"
+                                + "'table.auto-partition.time-unit' = 'year',"
+                                + "'scan.partition.discovery.interval' = 
'100ms',"
+                                + "'table.auto-partition.num-precreate' = 
'1')",
+                        tablePath.getTableName()));
+        tEnv.executeSql("create table result_table (a int, b varchar)");
+
+        // create a partition manually
+        createPartitions(zkClient, tablePath, 
Collections.singletonList("4000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
 
-        try {
-            StreamTableEnvironment tEnv = initTableEnvironment(null);
-            tEnv.executeSql(
-                    "create table test_partitioned ("
-                            + "a int, b varchar"
-                            + ") partitioned by (b) "
-                            + "with ("
-                            + "'table.auto-partition.enabled' = 'true',"
-                            + "'table.auto-partition.time-unit' = 'year',"
-                            + "'scan.partition.discovery.interval' = '100ms',"
-                            + "'table.auto-partition.num-precreate' = '1')");
-            tEnv.executeSql("create table result_table (a int, b varchar)");
-
-            TablePath tablePath = TablePath.of("fluss", "test_partitioned");
-
-            // create a partition manually
-            createPartitions(zkClient, tablePath, 
Collections.singletonList("4000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-
-            // append 3 records for each partition
-            Table table = conn.getTable(tablePath);
-            AppendWriter writer = table.newAppend().createWriter();
-            String thisYear = String.valueOf(Year.now().getValue());
-            List<String> expected = new ArrayList<>();
-            for (int i = 0; i < 3; i++) {
-                writer.append(row(i, thisYear));
-                writer.append(row(i, "4000"));
-                expected.add("+I[" + i + ", " + thisYear + "]");
-                expected.add("+I[" + i + ", 4000]");
-            }
-            writer.flush();
-
-            // execute the query to fetch logs from the table
-            TableResult insertResult =
-                    tEnv.executeSql("insert into result_table select * from 
test_partitioned");
-            // we have to create a intermediate table to collect result,
-            // because CollectSink can't be restored from savepoint
-            CloseableIterator<Row> results =
-                    tEnv.executeSql("select * from result_table").collect();
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // drop the partition manually
-            dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
-            waitUntilPartitions(zkClient, tablePath, 1);
-
-            // create a new partition again and append records into it
-            createPartitions(zkClient, tablePath, 
Collections.singletonList("5000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-            writer.append(row(4, "5000")).get();
-            expected.add("+I[4, 5000]");
-            // if the source subscribes the new partition successfully,
-            // it should have removed the old partition successfully
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // now, stop the job with save point
-            String savepointPath =
-                    insertResult
-                            .getJobClient()
-                            .get()
-                            .stopWithSavepoint(
-                                    false,
-                                    savepointDir.getAbsolutePath(),
-                                    SavepointFormatType.CANONICAL)
-                            .get();
-
-            tEnv = initTableEnvironment(savepointPath);
-            insertResult =
-                    tEnv.executeSql("insert into result_table select * from 
test_partitioned");
-            // append a new row again to check if the source can restore the 
state correctly
-            writer.append(row(5, "5000")).get();
-            expected.add("+I[5, 5000]");
-            assertResultsIgnoreOrder(results, expected, true);
-            // cancel the insert job
-            insertResult.getJobClient().get().cancel().get();
-        } finally {
-            // stop the cluster and thereby cancel the job
-            cluster.after();
+        // append 3 records for each partition
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        String thisYear = String.valueOf(Year.now().getValue());
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            writer.append(row(i, thisYear));
+            writer.append(row(i, "4000"));
+            expected.add("+I[" + i + ", " + thisYear + "]");
+            expected.add("+I[" + i + ", 4000]");
         }
+        writer.flush();
+
+        // execute the query to fetch logs from the table
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // we have to create an intermediate table to collect result,
+        // because CollectSink can't be restored from savepoint
+        CloseableIterator<Row> results = tEnv.executeSql("select * from 
result_table").collect();
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // drop the partition manually
+        dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
+        waitUntilPartitions(zkClient, tablePath, 1);
+
+        // create a new partition again and append records into it
+        createPartitions(zkClient, tablePath, 
Collections.singletonList("5000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
+        writer.append(row(4, "5000")).get();
+        expected.add("+I[4, 5000]");
+        // if the source subscribes the new partition successfully,
+        // it should have removed the old partition successfully
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        return Tuple2.of(savepointPath, results);
     }
 
     private static Configuration getFileBasedCheckpointsConfig(File 
savepointDir) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index 44bd1e98d..ffdd95652 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -158,6 +158,29 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
         }
     }
 
+    @Test
+    void testTableIdChange() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"test-only-snapshot-table");
+        long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+        try (FlinkSourceSplitReader splitReader =
+                createSplitReader(tablePath, 
DEFAULT_PK_TABLE_SCHEMA.getRowType())) {
+            assertThatThrownBy(
+                            () ->
+                                    splitReader.handleSplitsChanges(
+                                            new SplitsAddition<>(
+                                                    Collections.singletonList(
+                                                            new LogSplit(
+                                                                    new 
TableBucket(tableId + 1, 0),
+                                                                    null,
+                                                                    0)))))
+                    .hasMessageContaining(
+                            "Table ID mismatch: expected 0, but split contains 1 for table 'test-flink-db.test-only-snapshot-table'. "
+                                    + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                    + "causing metadata inconsistency. To resolve this, please restart the job **without** using "
+                                    + "the previous savepoint or checkpoint.");
+        }
+    }
+
     private Map<String, List<RecordAndPos>> constructRecords(
             Map<TableBucket, List<InternalRow>> rows) {
         Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();

Reply via email to