Ran Tao created FLINK-39965:
-------------------------------

             Summary: Flink CDC should clear schema state and table-level 
caches after DropTableEvent
                 Key: FLINK-39965
                 URL: https://issues.apache.org/jira/browse/FLINK-39965
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.5.0, cdc-3.6.0, cdc-3.7.0
            Reporter: Ran Tao


When a Flink CDC pipeline job synchronizes MySQL tables to a sink that supports 
schema evolution, the job fails after the following table lifecycle:

  1. initial snapshot + binlog sync succeeds
  2. DROP TABLE on MySQL
  3. CREATE TABLE again with the same table identifier
  4. INSERT into the recreated table

The failure was reproduced with the MySQL pipeline source and the StarRocks 
pipeline sink. StarRocks reports that the target table is unknown during 
stream-load flush, because Flink CDC has already applied the DROP TABLE to the 
sink, but the subsequent CreateTableEvent is considered redundant and is skipped.

Although the failure is observed with StarRocks, the root cause appears to be 
in the Flink CDC runtime schema state / table-level caches, so this may affect 
other sinks that support DROP_TABLE as well (such as Paimon, see FLINK-39328 
below).

Minimal pipeline YAML:

{code:yaml}
  source:
    type: mysql
    name: MySQL Source
    hostname: mysql.example.com
    port: 3306
    username: flink
    password: ****
    tables: flink.orders_.*
    server-id: 5100-5104
    scan.startup.mode: initial
    scan.newly-added-table.enabled: false
    scan.binlog.newly-added-table.enabled: true

  sink:
    type: starrocks
    name: StarRocks Sink
    jdbc-url: jdbc:mysql://starrocks-fe.example.com:9030
    load-url: starrocks-fe.example.com:8030
    username: flink
    password: ****

  pipeline:
    name: mysql-to-starrocks-drop-recreate
    schema.change.behavior: EVOLVE
    parallelism: 2
  {code}

  Minimal SQL to reproduce:

  {code:sql}
  CREATE DATABASE IF NOT EXISTS flink;

  CREATE TABLE flink.orders_0 (
    id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    user_name VARCHAR(64),
    status VARCHAR(32) NOT NULL,
    total_amount DECIMAL(18, 2) NOT NULL,
    create_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id)
  );

  INSERT INTO flink.orders_0 VALUES
    (1, 'order-1', 'Alice', 'NEW', 10.00, CURRENT_TIMESTAMP);

  -- Wait until the initial snapshot and binlog events are synchronized.

  DROP TABLE flink.orders_0;

  CREATE TABLE flink.orders_0 (
    id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    user_name VARCHAR(64),
    status VARCHAR(32) NOT NULL,
    total_amount DECIMAL(18, 2) NOT NULL,
    create_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id)
  );

  INSERT INTO flink.orders_0 VALUES
    (2, 'order-2', 'Bob', 'NEW', 20.00, CURRENT_TIMESTAMP);
  {code}

*Observed behavior*

The DROP TABLE event is applied to the sink successfully:

{code}
Successfully applied schema change event DropTableEvent{tableId=flink.orders_0} to external system.
{code}

Then the recreated table's CreateTableEvent is skipped as redundant:

{code}
Schema change event CreateTableEvent{tableId=flink.orders_0, schema=...} is redundant for current schema ..., just skip it.
{code}

Later, on checkpoint flush, the StarRocks sink fails because the sink table was 
dropped and was not created again:

{code}
  Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException:
  Transaction start failed, db: flink, table: orders_0, responseBody: {
      "Status": "ANALYSIS_ERROR",
      "Message": "unknown table \"flink.orders_0\""
  }
      at com.starrocks.data.load.stream.TransactionStreamLoader.doBegin(TransactionStreamLoader.java:161)
      at com.starrocks.data.load.stream.TransactionStreamLoader.begin(TransactionStreamLoader.java:103)
      at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:175)
      at com.starrocks.data.load.stream.v2.TransactionTableRegion.streamLoad(TransactionTableRegion.java:378)
      at com.starrocks.data.load.stream.v2.TransactionTableRegion.flush(TransactionTableRegion.java:238)
      at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
      at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.flush(StarRocksWriter.java:153)
{code}

*Expected behavior*

After a DROP TABLE event ends the table lifecycle, Flink CDC should clear the 
corresponding original/evolved schema state and table-level caches. A later 
CREATE TABLE event with the same table identifier should not be treated as a 
duplicate CreateTableEvent from the previous lifecycle.

The recreated table should be initialized again in the sink, and subsequent 
data changes should be written successfully.

*Suspected root cause*

The runtime keeps schema state and table-level caches for the dropped table id. 
Therefore, when a CreateTableEvent for the same table id arrives later, 
SchemaUtils.isSchemaChangeEventRedundant(...) may treat it as already applied 
because a previous schema is still present.
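
The suspected mechanism can be illustrated with a small self-contained toy 
model. This is only a sketch and not the Flink CDC implementation: 
StaleSchemaSketch, ToySchemaState, onCreateTable, onDropTable and the 
clearOnDrop flag are hypothetical names invented for this illustration, and the 
redundancy check is reduced to "the same schema is already recorded for this 
table id".

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the drop/recreate problem; none of these types are Flink CDC classes. */
public class StaleSchemaSketch {

    /** Hypothetical stand-in for per-table schema state keyed by table id. */
    static final class ToySchemaState {
        private final Map<String, List<String>> schemas = new HashMap<>();
        private final boolean clearOnDrop;

        ToySchemaState(boolean clearOnDrop) {
            this.clearOnDrop = clearOnDrop;
        }

        /** Returns true if the create event is forwarded to the sink, false if skipped as redundant. */
        boolean onCreateTable(String tableId, List<String> columns) {
            List<String> known = schemas.get(tableId);
            if (columns.equals(known)) {
                return false; // same schema already recorded: treated as redundant
            }
            schemas.put(tableId, columns);
            return true;
        }

        void onDropTable(String tableId) {
            if (clearOnDrop) {
                schemas.remove(tableId); // table lifecycle ends, state is forgotten
            }
            // Otherwise the entry for the dropped table survives (the suspected bug).
        }
    }

    /** Replays create, drop, create and reports whether the second create reaches the sink. */
    static boolean recreateReachesSink(boolean clearOnDrop) {
        ToySchemaState state = new ToySchemaState(clearOnDrop);
        List<String> columns =
                List.of("id", "order_no", "user_name", "status", "total_amount", "create_at");
        state.onCreateTable("flink.orders_0", columns);
        state.onDropTable("flink.orders_0");
        return state.onCreateTable("flink.orders_0", columns);
    }

    public static void main(String[] args) {
        System.out.println("state kept on drop:    " + recreateReachesSink(false));
        System.out.println("state cleared on drop: " + recreateReachesSink(true));
    }
}
```

In this model the second create is skipped when the state survives the drop, 
and is forwarded once the drop removes the entry, which matches the observed 
and the expected behavior described above.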

Related runtime states/caches include:

  * SchemaManager original/evolved schema state
  * regular/distributed schema coordinator upstream schema views
  * sink wrapper processed table id cache
  * transform / partitioning table-level caches
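
One possible way to keep these states consistent is to fan a single "table 
dropped" signal out to every table-level cache. The following is a sketch under 
assumptions, not the proposed patch: DropNotifier, register and tableDropped 
are hypothetical names, and the four collections are simplified stand-ins for 
the states listed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

/** Toy model of table-level cache invalidation; not Flink CDC code. */
public class TableLifecycleSketch {

    /** Hypothetical hub that forwards a dropped table id to every registered cache. */
    static final class DropNotifier {
        private final List<Consumer<String>> invalidators = new ArrayList<>();

        void register(Consumer<String> invalidator) {
            invalidators.add(invalidator);
        }

        void tableDropped(String tableId) {
            for (Consumer<String> invalidator : invalidators) {
                invalidator.accept(tableId);
            }
        }
    }

    /** Fills four toy caches with two tables, drops one, and returns the ids still cached. */
    static Set<String> survivingTableIds(String droppedTableId) {
        Map<String, String> originalSchemas = new HashMap<>();
        Map<String, String> evolvedSchemas = new HashMap<>();
        Set<String> processedTableIds = new HashSet<>();
        Map<String, Integer> hashFunctionCache = new HashMap<>();
        for (String id : List.of("flink.orders_0", "flink.orders_1")) {
            originalSchemas.put(id, "schema-of-" + id);
            evolvedSchemas.put(id, "schema-of-" + id);
            processedTableIds.add(id);
            hashFunctionCache.put(id, id.hashCode());
        }

        // Each cache registers its own removal hook for a dropped table id.
        DropNotifier notifier = new DropNotifier();
        notifier.register(originalSchemas::remove);
        notifier.register(evolvedSchemas::remove);
        notifier.register(processedTableIds::remove);
        notifier.register(hashFunctionCache::remove);

        notifier.tableDropped(droppedTableId);

        Set<String> survivors = new TreeSet<>();
        survivors.addAll(originalSchemas.keySet());
        survivors.addAll(evolvedSchemas.keySet());
        survivors.addAll(processedTableIds);
        survivors.addAll(hashFunctionCache.keySet());
        return survivors;
    }

    public static void main(String[] args) {
        System.out.println(survivingTableIds("flink.orders_0"));
    }
}
```

The point of the sketch is only that every cache is cleared by the same event; 
in the real runtime these states live in different operators and coordinators, 
so the actual propagation path will differ.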

*Relation to FLINK-39328*

This issue is related to FLINK-39328, where a MySQL to Paimon pipeline fails 
immediately after DROP TABLE because RegularPrePartitionOperator tries to 
recreate the table hash function after the downstream Paimon table has already 
been dropped.

The root cause is in the same area: DropTableEvent is not fully treated as the 
end of a table lifecycle in the Flink CDC runtime, so table-level runtime 
state/caches may still be reused after the downstream table has been physically 
removed.

This issue covers a broader scenario than FLINK-39328:

  * FLINK-39328 fails during DROP TABLE handling because the pre-partition hash 
function is recreated for a dropped table.
  * This issue fails after DROP TABLE followed by CREATE TABLE with the same 
table identifier, because the recreated table's CreateTableEvent can be 
considered redundant and skipped, leaving the downstream table dropped while 
later data changes are still written.

The proposed fix also covers FLINK-39328 by making DropTableEvent invalidate 
the pre-partition hash function cache and broadcast the event directly, instead 
of treating it as a normal SchemaChangeEvent that reloads the latest evolved 
schema / recreates the hash function for an already dropped table.
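
The difference between the two handling strategies can be sketched as a toy 
model. All names here (PrePartitionSketch, ToyPrePartitioner, EventKind, 
dropAware) are hypothetical and chosen for illustration only; the real 
RegularPrePartitionOperator is not reproduced, and the "sink" is just a set of 
table ids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;

/** Toy model of pre-partition handling for DropTableEvent; not Flink CDC code. */
public class PrePartitionSketch {

    enum EventKind { ADD_COLUMN, DROP_TABLE }

    static final class ToyPrePartitioner {
        private final Map<String, ToIntFunction<String>> hashFunctions = new HashMap<>();
        private final Set<String> sinkTables;
        private final boolean dropAware;
        final List<String> broadcast = new ArrayList<>();

        ToyPrePartitioner(Set<String> sinkTables, boolean dropAware) {
            this.sinkTables = sinkTables;
            this.dropAware = dropAware;
        }

        void onSchemaChange(String tableId, EventKind kind) {
            if (dropAware && kind == EventKind.DROP_TABLE) {
                hashFunctions.remove(tableId); // invalidate only, never rebuild
            } else {
                hashFunctions.put(tableId, createHashFunction(tableId));
            }
            broadcast.add(kind + ":" + tableId); // forward the event downstream
        }

        boolean hasHashFunction(String tableId) {
            return hashFunctions.containsKey(tableId);
        }

        /** Building a hash function needs the table to exist in the sink. */
        private ToIntFunction<String> createHashFunction(String tableId) {
            if (!sinkTables.contains(tableId)) {
                throw new IllegalStateException("table does not exist in sink: " + tableId);
            }
            return key -> Math.floorMod(key.hashCode(), 2);
        }
    }

    /** Returns true if a drop is broadcast and the cached hash function is gone. */
    static boolean dropHandledCleanly(boolean dropAware) {
        Set<String> sinkTables = new HashSet<>(Set.of("flink.orders_0"));
        ToyPrePartitioner op = new ToyPrePartitioner(sinkTables, dropAware);
        op.onSchemaChange("flink.orders_0", EventKind.ADD_COLUMN); // caches a hash function
        sinkTables.remove("flink.orders_0"); // downstream table is already dropped
        try {
            op.onSchemaChange("flink.orders_0", EventKind.DROP_TABLE);
            return !op.hasHashFunction("flink.orders_0") && op.broadcast.size() == 2;
        } catch (IllegalStateException e) {
            return false; // tried to rebuild the hash function for a dropped table
        }
    }

    public static void main(String[] args) {
        System.out.println("generic handling: " + dropHandledCleanly(false));
        System.out.println("drop-aware:       " + dropHandledCleanly(true));
    }
}
```

With generic handling the toy operator fails while rebuilding the hash 
function, which mirrors the FLINK-39328 symptom; with drop-aware handling the 
cache entry is removed and the event is still broadcast.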



