This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 920809f80 [flink] Flink Source needs to check tableId on recovery in
case the table is recreated. (#2154)
920809f80 is described below
commit 920809f8008c6f612ecfed0fbcee1d3511324577
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Dec 29 21:44:44 2025 +0800
[flink] Flink Source needs to check tableId on recovery in case the table
is recreated. (#2154)
---
.../source/reader/FlinkSourceSplitReader.java | 22 +--
.../source/FlinkTableSourceFailOverITCase.java | 220 ++++++++++++---------
.../source/reader/FlinkSourceSplitReaderTest.java | 23 +++
3 files changed, 163 insertions(+), 102 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 9a2ab7aeb..bd5305a36 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -104,8 +104,7 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
@Nullable private final LakeSource<LakeSplit> lakeSource;
- // table id, will be null when haven't received any split
- private Long tableId;
+ private final Long tableId;
private final Map<TableBucket, Long> stoppingOffsets;
private LakeSplitReaderGenerator lakeSplitReaderGenerator;
@@ -127,6 +126,7 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
new
FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
this.connection = ConnectionFactory.createConnection(flussConf,
flinkMetricRegistry);
this.table = connection.getTable(tablePath);
+ this.tableId = table.getTableInfo().getTableId();
this.sourceOutputType = sourceOutputType;
this.boundedSplits = new ArrayDeque<>();
this.subscribedBuckets = new HashMap<>();
@@ -187,15 +187,15 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
}
for (SourceSplitBase sourceSplitBase : splitsChanges.splits()) {
LOG.info("add split {}", sourceSplitBase.splitId());
- // init table id
- if (tableId == null) {
- tableId = sourceSplitBase.getTableBucket().getTableId();
- } else {
- checkArgument(
-
tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
- "table id not equal across splits {}",
- splitsChanges.splits());
- }
+ checkArgument(
+
tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
+ "Table ID mismatch: expected %s, but split contains %s for
table '%s'. "
+ + "This usually happens when a table with the same
name was dropped and recreated "
+ + "between job runs, causing metadata
inconsistency. "
+ + "To resolve this, please restart the job
**without** using the previous savepoint or checkpoint.",
+ tableId,
+ sourceSplitBase.getTableBucket().getTableId(),
+ table.getTableInfo().getTablePath());
if (sourceSplitBase.isHybridSnapshotLogSplit()) {
HybridSnapshotLogSplit hybridSnapshotLogSplit =
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index d228000ac..916b10654 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -60,6 +61,7 @@ import static
org.apache.fluss.flink.utils.FlinkTestBase.createPartitions;
import static org.apache.fluss.flink.utils.FlinkTestBase.dropPartitions;
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT case for flink table source fail over. */
abstract class FlinkTableSourceFailOverITCase {
@@ -86,16 +88,27 @@ abstract class FlinkTableSourceFailOverITCase {
org.apache.fluss.config.Configuration clientConf;
ZooKeeperClient zkClient;
Connection conn;
+ MiniClusterWithClientResource cluster;
@BeforeEach
- protected void beforeEach() {
+ protected void beforeEach() throws Exception {
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
conn = ConnectionFactory.createConnection(clientConf);
+
+ cluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(2)
+ .build());
+ cluster.before();
}
@AfterEach
protected void afterEach() throws Exception {
+ cluster.after();
conn.close();
}
@@ -121,100 +134,125 @@ abstract class FlinkTableSourceFailOverITCase {
@Test
void testRestore() throws Exception {
- final int numTaskManagers = 2;
- final int numSlotsPerTaskManager = 2;
+ TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+ Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+ runWithSavepoint(tablePath);
+ StreamTableEnvironment tEnv =
initTableEnvironment(savepointPathAndResults.f0);
+ CloseableIterator<Row> results = savepointPathAndResults.f1;
+ TableResult insertResult =
+ tEnv.executeSql(
+ String.format(
+ "insert into result_table select * from %s",
+ tablePath.getTableName()));
+ // append a new row again to check if the source can restore the state
correctly
+ Table table = conn.getTable(tablePath);
+ AppendWriter writer = table.newAppend().createWriter();
+ writer.append(row(5, "5000")).get();
+ List<String> expected = new ArrayList<>();
+ expected.add("+I[5, 5000]");
+ assertResultsIgnoreOrder(results, expected, true);
+ // cancel the insert job
+ insertResult.getJobClient().get().cancel().get();
+ }
- // Start Flink
- MiniClusterWithClientResource cluster =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
-
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
- .setNumberTaskManagers(numTaskManagers)
-
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
- .build());
+ @Test
+ void testRestoreWithRecreateTable() throws Exception {
+ TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+ Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
+ runWithSavepoint(tablePath);
+ StreamTableEnvironment tEnv =
initTableEnvironment(savepointPathAndResults.f0);
- cluster.before();
+ // drop and recreate the table.
+ tEnv.executeSql(String.format("drop table %s",
tablePath.getTableName()));
+ tEnv.executeSql(
+ String.format(
+ "create table %s (" + "a int, b varchar" + ")
partitioned by (b) ",
+ tablePath.getTableName()));
+
+ TableResult insertResult =
+ tEnv.executeSql(
+ String.format(
+ "insert into result_table select * from %s",
+ tablePath.getTableName()));
+ assertThatThrownBy(() ->
insertResult.getJobClient().get().getJobExecutionResult().get())
+ .rootCause()
+ .hasMessageContaining(
+ "Table ID mismatch: expected 2, but split contains 0
for table 'fluss.test_recreate_table'. "
+ + "This usually happens when a table with the
same name was dropped and recreated between job runs, "
+ + "causing metadata inconsistency. To resolve
this, please restart the job **without** "
+ + "using the previous savepoint or
checkpoint.");
+ }
+
+ private Tuple2<String, CloseableIterator<Row>> runWithSavepoint(TablePath
tablePath)
+ throws Exception {
+ StreamTableEnvironment tEnv = initTableEnvironment(null);
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + "a int, b varchar"
+ + ") partitioned by (b) "
+ + "with ("
+ + "'table.auto-partition.enabled' = 'true',"
+ + "'table.auto-partition.time-unit' = 'year',"
+ + "'scan.partition.discovery.interval' =
'100ms',"
+ + "'table.auto-partition.num-precreate' =
'1')",
+ tablePath.getTableName()));
+ tEnv.executeSql("create table result_table (a int, b varchar)");
+
+ // create a partition manually
+ createPartitions(zkClient, tablePath,
Collections.singletonList("4000"));
+ waitUntilPartitions(zkClient, tablePath, 2);
- try {
- StreamTableEnvironment tEnv = initTableEnvironment(null);
- tEnv.executeSql(
- "create table test_partitioned ("
- + "a int, b varchar"
- + ") partitioned by (b) "
- + "with ("
- + "'table.auto-partition.enabled' = 'true',"
- + "'table.auto-partition.time-unit' = 'year',"
- + "'scan.partition.discovery.interval' = '100ms',"
- + "'table.auto-partition.num-precreate' = '1')");
- tEnv.executeSql("create table result_table (a int, b varchar)");
-
- TablePath tablePath = TablePath.of("fluss", "test_partitioned");
-
- // create a partition manually
- createPartitions(zkClient, tablePath,
Collections.singletonList("4000"));
- waitUntilPartitions(zkClient, tablePath, 2);
-
- // append 3 records for each partition
- Table table = conn.getTable(tablePath);
- AppendWriter writer = table.newAppend().createWriter();
- String thisYear = String.valueOf(Year.now().getValue());
- List<String> expected = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- writer.append(row(i, thisYear));
- writer.append(row(i, "4000"));
- expected.add("+I[" + i + ", " + thisYear + "]");
- expected.add("+I[" + i + ", 4000]");
- }
- writer.flush();
-
- // execute the query to fetch logs from the table
- TableResult insertResult =
- tEnv.executeSql("insert into result_table select * from
test_partitioned");
- // we have to create a intermediate table to collect result,
- // because CollectSink can't be restored from savepoint
- CloseableIterator<Row> results =
- tEnv.executeSql("select * from result_table").collect();
- assertResultsIgnoreOrder(results, expected, false);
- expected.clear();
-
- // drop the partition manually
- dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
- waitUntilPartitions(zkClient, tablePath, 1);
-
- // create a new partition again and append records into it
- createPartitions(zkClient, tablePath,
Collections.singletonList("5000"));
- waitUntilPartitions(zkClient, tablePath, 2);
- writer.append(row(4, "5000")).get();
- expected.add("+I[4, 5000]");
- // if the source subscribes the new partition successfully,
- // it should have removed the old partition successfully
- assertResultsIgnoreOrder(results, expected, false);
- expected.clear();
-
- // now, stop the job with save point
- String savepointPath =
- insertResult
- .getJobClient()
- .get()
- .stopWithSavepoint(
- false,
- savepointDir.getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
-
- tEnv = initTableEnvironment(savepointPath);
- insertResult =
- tEnv.executeSql("insert into result_table select * from
test_partitioned");
- // append a new row again to check if the source can restore the
state correctly
- writer.append(row(5, "5000")).get();
- expected.add("+I[5, 5000]");
- assertResultsIgnoreOrder(results, expected, true);
- // cancel the insert job
- insertResult.getJobClient().get().cancel().get();
- } finally {
- // stop the cluster and thereby cancel the job
- cluster.after();
+ // append 3 records for each partition
+ Table table = conn.getTable(tablePath);
+ AppendWriter writer = table.newAppend().createWriter();
+ String thisYear = String.valueOf(Year.now().getValue());
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ writer.append(row(i, thisYear));
+ writer.append(row(i, "4000"));
+ expected.add("+I[" + i + ", " + thisYear + "]");
+ expected.add("+I[" + i + ", 4000]");
}
+ writer.flush();
+
+ // execute the query to fetch logs from the table
+ TableResult insertResult =
+ tEnv.executeSql(
+ String.format(
+ "insert into result_table select * from %s",
+ tablePath.getTableName()));
+ // we have to create an intermediate table to collect result,
+ // because CollectSink can't be restored from savepoint
+ CloseableIterator<Row> results = tEnv.executeSql("select * from
result_table").collect();
+ assertResultsIgnoreOrder(results, expected, false);
+ expected.clear();
+
+ // drop the partition manually
+ dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
+ waitUntilPartitions(zkClient, tablePath, 1);
+
+ // create a new partition again and append records into it
+ createPartitions(zkClient, tablePath,
Collections.singletonList("5000"));
+ waitUntilPartitions(zkClient, tablePath, 2);
+ writer.append(row(4, "5000")).get();
+ expected.add("+I[4, 5000]");
+ // if the source subscribes the new partition successfully,
+ // it should have removed the old partition successfully
+ assertResultsIgnoreOrder(results, expected, false);
+ expected.clear();
+
+ // now, stop the job with save point
+ String savepointPath =
+ insertResult
+ .getJobClient()
+ .get()
+ .stopWithSavepoint(
+ false,
+ savepointDir.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get();
+ return Tuple2.of(savepointPath, results);
}
private static Configuration getFileBasedCheckpointsConfig(File
savepointDir) {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index 44bd1e98d..ffdd95652 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -158,6 +158,29 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
}
}
+ @Test
+ void testTableIdChange() throws Exception {
+ TablePath tablePath = TablePath.of(DEFAULT_DB,
"test-only-snapshot-table");
+ long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+ try (FlinkSourceSplitReader splitReader =
+ createSplitReader(tablePath,
DEFAULT_PK_TABLE_SCHEMA.getRowType())) {
+ assertThatThrownBy(
+ () ->
+ splitReader.handleSplitsChanges(
+ new SplitsAddition<>(
+ Collections.singletonList(
+ new LogSplit(
+ new
TableBucket(tableId + 1, 0),
+ null,
+ 0)))))
+ .hasMessageContaining(
+ "Table ID mismatch: expected 0, but split contains
1 for table 'test-flink-db.test-only-snapshot-table'. "
+ + "This usually happens when a table with
the same name was dropped and recreated between job runs, "
+ + "causing metadata inconsistency. To
resolve this, please restart the job **without** using "
+ + "the previous savepoint or checkpoint.");
+ }
+ }
+
private Map<String, List<RecordAndPos>> constructRecords(
Map<TableBucket, List<InternalRow>> rows) {
Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();