[ 
https://issues.apache.org/jira/browse/FLINK-39965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090501#comment-18090501
 ] 

Ran Tao commented on FLINK-39965:
---------------------------------

The proposed fix covers FLINK-39328.

> Flink CDC should clear schema state and table-level caches after 
> DropTableEvent
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-39965
>                 URL: https://issues.apache.org/jira/browse/FLINK-39965
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0, cdc-3.6.0, cdc-3.7.0
>            Reporter: Ran Tao
>            Priority: Major
>
> When a Flink CDC pipeline job synchronizes MySQL tables to a sink that 
> supports schema evolution, the job fails after the following table lifecycle:
>   1. initial snapshot + binlog sync succeeds
>   2. DROP TABLE on MySQL
>   3. CREATE TABLE again with the same table identifier
>   4. INSERT into the recreated table
> The observed failure was reproduced with MySQL pipeline source and StarRocks 
> pipeline sink. StarRocks reports that the target table is unknown during 
> stream-load flush, because Flink CDC has already applied the DROP TABLE to 
> the sink, but the following CreateTableEvent is considered redundant and is 
> skipped.
> Although the failure is observed with StarRocks, the root cause appears to be 
> in the Flink CDC runtime schema state / table-level caches, so this may 
> affect other sinks that support DROP_TABLE as well (such as Paimon; see the 
> relation to FLINK-39328 below).
> Minimal pipeline YAML:
> {code:yaml}
>   source:
>     type: mysql
>     name: MySQL Source
>     hostname: mysql.example.com
>     port: 3306
>     username: flink
>     password: ****
>     tables: flink.orders_.*
>     server-id: 5100-5104
>     scan.startup.mode: initial
>     scan.newly-added-table.enabled: false
>     scan.binlog.newly-added-table.enabled: true
>   sink:
>     type: starrocks
>     name: StarRocks Sink
>     jdbc-url: jdbc:mysql://starrocks-fe.example.com:9030
>     load-url: starrocks-fe.example.com:8030
>     username: flink
>     password: ****
>   pipeline:
>     name: mysql-to-starrocks-drop-recreate
>     schema.change.behavior: EVOLVE
>     parallelism: 2
>   {code}
>  Minimal SQL to reproduce:
>   {code:sql}
>   CREATE DATABASE IF NOT EXISTS flink;
>   CREATE TABLE flink.orders_0 (
>     id BIGINT NOT NULL,
>     order_no VARCHAR(64) NOT NULL,
>     user_name VARCHAR(64),
>     status VARCHAR(32) NOT NULL,
>     total_amount DECIMAL(18, 2) NOT NULL,
>     create_at TIMESTAMP NOT NULL,
>     PRIMARY KEY (id)
>   );
>   INSERT INTO flink.orders_0 VALUES
>     (1, 'order-1', 'Alice', 'NEW', 10.00, CURRENT_TIMESTAMP);
>   -- Wait until the initial snapshot and binlog events are synchronized.
>   DROP TABLE flink.orders_0;
>   CREATE TABLE flink.orders_0 (
>     id BIGINT NOT NULL,
>     order_no VARCHAR(64) NOT NULL,
>     user_name VARCHAR(64),
>     status VARCHAR(32) NOT NULL,
>     total_amount DECIMAL(18, 2) NOT NULL,
>     create_at TIMESTAMP NOT NULL,
>     PRIMARY KEY (id)
>   );
>   INSERT INTO flink.orders_0 VALUES
>     (2, 'order-2', 'Bob', 'NEW', 20.00, CURRENT_TIMESTAMP);
>   {code}
> *Observed behavior*
> The DROP TABLE event is applied to the sink successfully:
> {code}
> Successfully applied schema change event DropTableEvent{tableId=flink.orders_0} to external system.
> {code}
> Then the recreated table's CreateTableEvent is skipped as redundant:
> {code}
> Schema change event CreateTableEvent{tableId=flink.orders_0, schema=...} is redundant for current schema ..., just skip it.
> {code}
> Later, on checkpoint flush, the StarRocks sink fails because the sink table 
> was dropped and was not created again:
> {code}
>   Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException:
>   Transaction start failed, db: flink, table: orders_0, responseBody: {
>       "Status": "ANALYSIS_ERROR",
>       "Message": "unknown table \"flink.orders_0\""
>   }
>       at com.starrocks.data.load.stream.TransactionStreamLoader.doBegin(TransactionStreamLoader.java:161)
>       at com.starrocks.data.load.stream.TransactionStreamLoader.begin(TransactionStreamLoader.java:103)
>       at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:175)
>       at com.starrocks.data.load.stream.v2.TransactionTableRegion.streamLoad(TransactionTableRegion.java:378)
>       at com.starrocks.data.load.stream.v2.TransactionTableRegion.flush(TransactionTableRegion.java:238)
>       at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
>       at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.flush(StarRocksWriter.java:153)
> {code}
> *Expected behavior*
> After a DROP TABLE event ends the table lifecycle, Flink CDC should clear the 
> corresponding original/evolved schema state and table-level caches. A later 
> CREATE TABLE event with the same table identifier should not be treated as a 
> duplicate CreateTableEvent from the previous lifecycle.
> The recreated table should be initialized again in the sink, and subsequent 
> data changes should be written successfully.
> *Suspected root cause*
> The runtime keeps schema state and table-level caches for the dropped table 
> id. Therefore, when a CreateTableEvent for the same table id arrives later, 
> SchemaUtils.isSchemaChangeEventRedundant(...) may treat it as already applied 
> because a previous schema is still present.
> Related runtime states/caches include:
>   * SchemaManager original/evolved schema state
>   * regular/distributed schema coordinator upstream schema views
>   * sink wrapper processed table id cache
>   * transform / partitioning table-level caches
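
The suspected mechanism can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink CDC code: `ToySchemaState`, `CreateTable`, `DropTable`, `replay` and the `clear_on_drop` flag are hypothetical names, and the redundancy check is a simplified stand-in for what SchemaUtils.isSchemaChangeEventRedundant(...) is described as doing above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

Schema = Tuple[Tuple[str, str], ...]  # ((column name, column type), ...)


@dataclass(frozen=True)
class CreateTable:  # hypothetical stand-in for CreateTableEvent
    table_id: str
    schema: Schema


@dataclass(frozen=True)
class DropTable:  # hypothetical stand-in for DropTableEvent
    table_id: str


@dataclass
class ToySchemaState:
    """Toy per-table schema state; not the real SchemaManager."""
    clear_on_drop: bool
    schemas: Dict[str, Schema] = field(default_factory=dict)

    def is_redundant(self, event) -> bool:
        if isinstance(event, CreateTable):
            # A create is redundant if an identical schema is still recorded.
            return self.schemas.get(event.table_id) == event.schema
        # A drop is redundant if nothing is recorded for the table.
        return event.table_id not in self.schemas

    def handle(self, event, sink_tables: Set[str]) -> str:
        if self.is_redundant(event):
            return "skipped"
        if isinstance(event, CreateTable):
            self.schemas[event.table_id] = event.schema
            sink_tables.add(event.table_id)
        else:
            sink_tables.discard(event.table_id)
            if self.clear_on_drop:
                # Assumed fix: the drop ends the table lifecycle in the state too.
                self.schemas.pop(event.table_id, None)
        return "applied"


def replay(clear_on_drop: bool) -> Tuple[List[str], Set[str]]:
    """Replay CREATE, DROP, CREATE for one table id; return outcomes and sink tables."""
    schema: Schema = (("id", "BIGINT"), ("order_no", "VARCHAR(64)"))
    events = [
        CreateTable("flink.orders_0", schema),
        DropTable("flink.orders_0"),
        CreateTable("flink.orders_0", schema),
    ]
    state = ToySchemaState(clear_on_drop)
    sink_tables: Set[str] = set()
    outcomes = [state.handle(e, sink_tables) for e in events]
    return outcomes, sink_tables
```

In this model, `replay(False)` skips the second create and leaves the sink without the table, which mirrors the observed behavior; `replay(True)` applies it and the table exists again in the sink.
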
> *Relation to FLINK-39328*
> This issue is related to FLINK-39328, where a MySQL to Paimon pipeline fails 
> immediately after DROP TABLE because RegularPrePartitionOperator tries to 
> recreate the table hash function after the downstream Paimon table has 
> already been dropped.
> The root cause is in the same area: DropTableEvent is not fully treated as 
> the end of a table lifecycle in the Flink CDC runtime, so table-level runtime 
> state/caches may still be reused after the downstream table has been 
> physically removed.
> This issue covers a broader scenario than FLINK-39328:
>   * FLINK-39328 fails during DROP TABLE handling because the pre-partition 
> hash function is recreated for a dropped table.
>   * This issue fails after DROP TABLE followed by CREATE TABLE with the same 
> table identifier, because the recreated table's CreateTableEvent can be 
> considered redundant and skipped, leaving the downstream table dropped while 
> later data changes are still written.
> The proposed fix also covers FLINK-39328 by making DropTableEvent invalidate 
> the pre-partition hash function cache and broadcast the event directly, 
> instead of treating it as a normal SchemaChangeEvent that reloads the latest 
> evolved schema / recreates the hash function for an already dropped table.
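
The hash-function part of the proposed fix can be sketched the same way. This is a toy model under assumptions, not the RegularPrePartitionOperator implementation: `ToyPrePartitioner`, `TableNotFound`, `lookup_schema`, `run` and the `invalidate_on_drop` flag are hypothetical, and the model assumes that rebuilding a hash function needs a schema lookup that fails once the table is gone.

```python
from typing import Callable, Dict, List


class TableNotFound(Exception):
    """Raised by the toy schema lookup when the table no longer exists."""


class ToyPrePartitioner:
    def __init__(self, lookup_schema: Callable[[str], List[str]],
                 invalidate_on_drop: bool) -> None:
        # lookup_schema: table id -> primary key column names (hypothetical).
        self.lookup_schema = lookup_schema
        self.invalidate_on_drop = invalidate_on_drop
        self.hash_functions: Dict[str, Callable[[dict], int]] = {}

    def _build(self, table_id: str) -> Callable[[dict], int]:
        keys = self.lookup_schema(table_id)  # fails for a dropped table
        return lambda row: hash(tuple(row[k] for k in keys))

    def on_schema_change(self, kind: str, table_id: str) -> str:
        if kind == "drop" and self.invalidate_on_drop:
            # Assumed fix: forget the cached function and forward the event.
            self.hash_functions.pop(table_id, None)
            return "broadcast"
        # Default path: every schema change rebuilds the hash function.
        self.hash_functions[table_id] = self._build(table_id)
        return "broadcast"


def run(invalidate_on_drop: bool) -> str:
    downstream = {"flink.orders_0": ["id"]}

    def lookup(table_id: str) -> List[str]:
        if table_id not in downstream:
            raise TableNotFound(table_id)
        return downstream[table_id]

    op = ToyPrePartitioner(lookup, invalidate_on_drop)
    op.on_schema_change("create", "flink.orders_0")
    del downstream["flink.orders_0"]  # the table is already gone downstream
    try:
        return op.on_schema_change("drop", "flink.orders_0")
    except TableNotFound:
        return "failed"
```

Here `run(False)` ends in the failure path because the drop is handled like any other schema change, while `run(True)` drops the cached function and forwards the event without touching the missing table.
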



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
