Ran Tao created FLINK-39965:
-------------------------------
Summary: Flink CDC should clear schema state and table-level
caches after DropTableEvent
Key: FLINK-39965
URL: https://issues.apache.org/jira/browse/FLINK-39965
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.5.0, cdc-3.6.0, cdc-3.7.0
Reporter: Ran Tao
When a Flink CDC pipeline job synchronizes MySQL tables to a sink that supports
schema evolution, the job fails after the following table lifecycle:
1. initial snapshot + binlog sync succeeds
2. DROP TABLE on MySQL
3. CREATE TABLE again with the same table identifier
4. INSERT into the recreated table
The failure was reproduced with the MySQL pipeline source and the StarRocks
pipeline sink. StarRocks reports that the target table is unknown during
stream-load flush: Flink CDC has already applied the DROP TABLE to the sink,
but the subsequent CreateTableEvent is considered redundant and is skipped, so
the sink table is never recreated.
Although the failure is observed with StarRocks, the root cause appears to be
in the Flink CDC runtime schema state / table-level caches, so it may also
affect other sinks that support DROP_TABLE (such as Paimon, see FLINK-39328
below).
Minimal pipeline YAML:
{code:yaml}
source:
  type: mysql
  name: MySQL Source
  hostname: mysql.example.com
  port: 3306
  username: flink
  password: ****
  tables: flink.orders_.*
  server-id: 5100-5104
  scan.startup.mode: initial
  scan.newly-added-table.enabled: false
  scan.binlog.newly-added-table.enabled: true

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://starrocks-fe.example.com:9030
  load-url: starrocks-fe.example.com:8030
  username: flink
  password: ****

pipeline:
  name: mysql-to-starrocks-drop-recreate
  schema.change.behavior: EVOLVE
  parallelism: 2
{code}
Minimal SQL to reproduce:
{code:sql}
CREATE DATABASE IF NOT EXISTS flink;
CREATE TABLE flink.orders_0 (
    id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    user_name VARCHAR(64),
    status VARCHAR(32) NOT NULL,
    total_amount DECIMAL(18, 2) NOT NULL,
    create_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id)
);
INSERT INTO flink.orders_0 VALUES
    (1, 'order-1', 'Alice', 'NEW', 10.00, CURRENT_TIMESTAMP);
-- Wait until the initial snapshot and binlog events are synchronized.
DROP TABLE flink.orders_0;
CREATE TABLE flink.orders_0 (
    id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    user_name VARCHAR(64),
    status VARCHAR(32) NOT NULL,
    total_amount DECIMAL(18, 2) NOT NULL,
    create_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id)
);
INSERT INTO flink.orders_0 VALUES
    (2, 'order-2', 'Bob', 'NEW', 20.00, CURRENT_TIMESTAMP);
{code}
*Observed behavior*
The DROP TABLE event is applied to the sink successfully:
{code}
Successfully applied schema change event DropTableEvent{tableId=flink.orders_0} to external system.
{code}
Then the recreated table's CreateTableEvent is skipped as redundant:
{code}
Schema change event CreateTableEvent{tableId=flink.orders_0, schema=...} is redundant for current schema ..., just skip it.
{code}
Later, on checkpoint flush, the StarRocks sink fails because the sink table was
dropped and was not created again:
{code}
Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction start failed, db: flink, table: orders_0, responseBody: {
    "Status": "ANALYSIS_ERROR",
    "Message": "unknown table \"flink.orders_0\""
}
    at com.starrocks.data.load.stream.TransactionStreamLoader.doBegin(TransactionStreamLoader.java:161)
    at com.starrocks.data.load.stream.TransactionStreamLoader.begin(TransactionStreamLoader.java:103)
    at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:175)
    at com.starrocks.data.load.stream.v2.TransactionTableRegion.streamLoad(TransactionTableRegion.java:378)
    at com.starrocks.data.load.stream.v2.TransactionTableRegion.flush(TransactionTableRegion.java:238)
    at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
    at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.flush(StarRocksWriter.java:153)
{code}
*Expected behavior*
After a DROP TABLE event ends the table lifecycle, Flink CDC should clear the
corresponding original/evolved schema state and table-level caches. A later
CREATE TABLE event with the same table identifier should not be treated as a
duplicate CreateTableEvent from the previous lifecycle.
The recreated table should be initialized again in the sink, and subsequent
data changes should be written successfully.
*Suspected root cause*
The runtime keeps schema state and table-level caches for the dropped table id.
Therefore, when a CreateTableEvent for the same table id arrives later,
SchemaUtils.isSchemaChangeEventRedundant(...) may treat it as already applied
because a previous schema is still present.
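The suspected mechanism can be illustrated with a minimal, self-contained sketch. It does not use any Flink CDC classes: SchemaStateSketch, applyCreateTable and applyDropTable are hypothetical names, and a schema is simplified to a list of column names. The only assumption is that a CreateTableEvent is treated as redundant when an identical schema is already stored for the same table id.
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for per-table schema state; not the Flink CDC SchemaManager. */
class SchemaStateSketch {
    private final Map<String, List<String>> schemas = new HashMap<>();
    private final boolean clearOnDrop;

    SchemaStateSketch(boolean clearOnDrop) {
        this.clearOnDrop = clearOnDrop;
    }

    /** Returns true if the create event would be forwarded to the sink, false if skipped. */
    boolean applyCreateTable(String tableId, List<String> columns) {
        if (columns.equals(schemas.get(tableId))) {
            return false; // identical schema already stored: treated as redundant
        }
        schemas.put(tableId, columns);
        return true;
    }

    /** The drop itself always reaches the sink; only the local state handling differs. */
    void applyDropTable(String tableId) {
        if (clearOnDrop) {
            schemas.remove(tableId); // end of the table lifecycle: forget the old schema
        }
    }
}
{code}
With clearOnDrop set to false, the second applyCreateTable call for the same table id and schema returns false, which mirrors the skipped CreateTableEvent in the log above. With clearOnDrop set to true, the recreated table is forwarded to the sink again.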
Related runtime states/caches include:
* SchemaManager original/evolved schema state
* regular/distributed schema coordinator upstream schema views
* sink wrapper processed table id cache
* transform / partitioning table-level caches
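One way to picture the cleanup across the states listed above is a single invalidation hook that every table-level cache implements. The sketch below is only an illustration: TableScopedCache, SetBackedCache and DropTableCleanerSketch are hypothetical names, and in the real runtime these caches live in different operators and coordinators, so one in-process registry is a simplification.
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical contract for any cache keyed by table id. */
interface TableScopedCache {
    void invalidate(String tableId);

    boolean contains(String tableId);
}

/** Hypothetical cache, e.g. a set of table ids that were already processed. */
class SetBackedCache implements TableScopedCache {
    private final Set<String> tableIds = new HashSet<>();

    void remember(String tableId) {
        tableIds.add(tableId);
    }

    @Override
    public void invalidate(String tableId) {
        tableIds.remove(tableId);
    }

    @Override
    public boolean contains(String tableId) {
        return tableIds.contains(tableId);
    }
}

/** Invalidates every registered cache when a table is dropped. */
class DropTableCleanerSketch {
    private final List<TableScopedCache> caches = new ArrayList<>();

    void register(TableScopedCache cache) {
        caches.add(cache);
    }

    void onDropTable(String tableId) {
        for (TableScopedCache cache : caches) {
            cache.invalidate(tableId); // entries of other tables are left untouched
        }
    }
}
{code}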
*Relation to FLINK-39328*
This issue is related to FLINK-39328, where a MySQL to Paimon pipeline fails
immediately after DROP TABLE because RegularPrePartitionOperator tries to
recreate the table hash function after the downstream Paimon table has already
been dropped.
The root cause is in the same area: DropTableEvent is not fully treated as the
end of a table lifecycle in the Flink CDC runtime, so table-level runtime
state/caches may still be reused after the downstream table has been physically
removed.
This issue covers a broader scenario than FLINK-39328:
* FLINK-39328 fails during DROP TABLE handling because the pre-partition hash
function is recreated for a dropped table.
* This issue fails after DROP TABLE followed by CREATE TABLE with the same
table identifier, because the recreated table's CreateTableEvent can be
considered redundant and skipped, leaving the downstream table dropped while
later data changes are still written.
The proposed fix also covers FLINK-39328 by making DropTableEvent invalidate
the pre-partition hash function cache and broadcast the event directly, instead
of treating it as a normal SchemaChangeEvent that reloads the latest evolved
schema / recreates the hash function for an already dropped table.
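The proposed pre-partition handling can be sketched as follows. This is a simplified model rather than the RegularPrePartitionOperator code: PrePartitionSketch, onSchemaChange and the loader function are hypothetical, and the loader stands in for whatever builds a hash function from the sink table and may therefore fail once that table no longer exists.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical model of a pre-partition step that caches one hash function per table. */
class PrePartitionSketch {
    private final Map<String, ToIntFunction<String>> hashFunctions = new HashMap<>();
    private final Function<String, ToIntFunction<String>> loader;

    /** Events broadcast downstream, recorded here only so the behavior can be inspected. */
    final List<String> broadcast = new ArrayList<>();

    PrePartitionSketch(Function<String, ToIntFunction<String>> loader) {
        this.loader = loader;
    }

    void onSchemaChange(String tableId, boolean isDropTable) {
        if (isDropTable) {
            // The table is gone: evict the cached function and do not try to rebuild it.
            hashFunctions.remove(tableId);
        } else {
            // Any other schema change: rebuild the hash function from the latest schema.
            hashFunctions.put(tableId, loader.apply(tableId));
        }
        broadcast.add((isDropTable ? "DROP " : "CHANGE ") + tableId);
    }

    boolean hasCachedHashFunction(String tableId) {
        return hashFunctions.containsKey(tableId);
    }
}
{code}
In this model, a drop never calls the loader, so it cannot fail on an already removed table, and a later create for the same table id rebuilds the hash function from scratch.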