[
https://issues.apache.org/jira/browse/FLINK-38741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38741:
-----------------------------------
Labels: pull-request-available (was: )
> MODIFY COLUMN position change events are ignored causing schema evolution
> failure
> ---------------------------------------------------------------------------------
>
> Key: FLINK-38741
> URL: https://issues.apache.org/jira/browse/FLINK-38741
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Environment: * {*}Flink CDC Version{*}: 3.5.0
> * {*}Flink Version{*}: 1.19.1
> * {*}Source Database{*}: MySQL 8.0
> * {*}Sink{*}: Apache Paimon
> * {*}Deployment Mode{*}: Pipeline
> Reporter: cheng qian
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> When using MySQL CDC Pipeline to synchronize data to Paimon, {{ALTER TABLE
> MODIFY COLUMN}} operations that change column order are *ignored* and not
> propagated to the downstream sink, causing schema evolution failure and
> potential data inconsistency.
> h3. Problem Statement
> When executing DDL statements like {{ALTER TABLE MODIFY COLUMN col2
> VARCHAR(100) AFTER col3}} to change column order in MySQL, the schema change
> event is:
> # *Filtered out* by schema evolution framework, OR
> # *Not processed* by Paimon MetadataApplier
> This causes:
> * Schema inconsistency between source and sink
> * Schema evolution failure
> * Potential data corruption after job restart
> h2. Steps to Reproduce
> h3. 1. Environment Setup
> {*}MySQL Table Structure{*}:
>
> {code:sql}
> CREATE TABLE test_column_order (
>     id INT PRIMARY KEY,
>     col1 INT,
>     col2 VARCHAR(50),
>     col3 VARCHAR(50)
> );
>
> INSERT INTO test_column_order VALUES
>     (1, 1, 'b1', 'c1'),
>     (2, 2, 'b2', 'c2'),
>     (3, 3, 'b3', 'c3');
> {code}
> *Flink CDC Pipeline Configuration* ({{mysql-to-paimon.yaml}}):
> {code:yaml}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: password
>   database-name: test_db
>   table-name: test_column_order
>   server-id: 5400-5404
>
> sink:
>   type: paimon
>   catalog-type: filesystem
>   warehouse: /tmp/paimon
>
> pipeline:
>   name: MySQL to Paimon Pipeline
>   parallelism: 1
>
> route:
>   - source-table: test_db.test_column_order
>     sink-table: paimon_db.test_column_order
> {code}
> h3. 2. Start Pipeline
> {code:bash}
> ./bin/flink-cdc.sh mysql-to-paimon.yaml
> {code}
> h3. 3. Verify Initial Synchronization
> {code:sql}
> -- Query in Paimon
> SELECT * FROM paimon_db.test_column_order;
> {code}
> Expected column order: id, col1, col2, col3
> h3. 4. Execute Column Order Change
> {code:sql}
> -- Execute in MySQL
> ALTER TABLE test_column_order
>     MODIFY COLUMN col2 VARCHAR(50) AFTER col3;
>
> -- Verify column order in MySQL
> DESC test_column_order;
> {code}
> MySQL column order becomes: id, col1, col3, col2 ✅
> h3. 5. Check Paimon Table Structure
> {code:sql}
> -- Query table structure in Paimon
> DESC paimon_db.test_column_order;
> {code}
> *Expected Result*: Column order should be id, col1, col3, col2
> *Actual Result*: Column order remains id, col1, col2, col3 ❌
> h3. 6. Insert New Data for Verification
>
> {code:sql}
> -- Insert new data in MySQL
> -- Note: Column order is now id, col1, col3, col2
> INSERT INTO test_column_order VALUES (4, 4, 'c4', 'b4');
> {code}
> *Expected*: Data in Paimon should be {{(4, 4, 'c4', 'b4')}}
> *Actual*: May encounter data misalignment or synchronization failure
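
To make the misalignment risk in step 6 concrete, here is a minimal sketch. It assumes, purely for illustration, a writer that pairs row values with column names by position; the report does not confirm that the pipeline does this. The class `StaleSchemaMisalignment` and its method are hypothetical and not part of Flink CDC or Paimon.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StaleSchemaMisalignment {

    /** Pairs row values with column names purely by position (an assumption). */
    static Map<String, Object> mapByPosition(List<String> columns, List<Object> values) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Row inserted in step 6, in the source's new column order.
        List<Object> values = Arrays.asList(4, 4, "c4", "b4");

        // Column order after the ALTER TABLE on the MySQL side.
        List<String> sourceOrder = Arrays.asList("id", "col1", "col3", "col2");
        // Column order still held by the sink because the change was dropped.
        List<String> staleSinkOrder = Arrays.asList("id", "col1", "col2", "col3");

        System.out.println("source view: " + mapByPosition(sourceOrder, values));
        System.out.println("stale view:  " + mapByPosition(staleSinkOrder, values));
    }
}
```

Under that assumption, the stale view assigns 'c4' to col2 and 'b4' to col3, the reverse of what the source holds.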
>
> h2. Expected Behavior
> # MySQL CDC should capture {{MODIFY COLUMN}} position change events
> # Schema evolution framework should handle column reordering events
> # Paimon MetadataApplier should execute column reordering operations
> # Sink table column order should remain consistent with source table
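
As a rough illustration of what "executing a column reordering operation" means for the DDL in step 4, the sketch below applies an AFTER-style move to an ordered column list. `ColumnReorder` and `moveAfter` are hypothetical names used only for this example; they are not Flink CDC or Paimon APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColumnReorder {

    /**
     * Returns a copy of the column list with the given column placed
     * directly after the reference column, mirroring MODIFY COLUMN ... AFTER.
     */
    static List<String> moveAfter(List<String> columns, String column, String reference) {
        if (!columns.contains(column) || !columns.contains(reference)) {
            throw new IllegalArgumentException("Unknown column: " + column + " or " + reference);
        }
        if (column.equals(reference)) {
            throw new IllegalArgumentException("A column cannot be moved after itself");
        }
        List<String> result = new ArrayList<>(columns);
        result.remove(column);
        // Look up the reference index after removal so the offset stays correct.
        result.add(result.indexOf(reference) + 1, column);
        return result;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("id", "col1", "col2", "col3");
        // MODIFY COLUMN col2 VARCHAR(50) AFTER col3
        System.out.println(moveAfter(before, "col2", "col3")); // [id, col1, col3, col2]
    }
}
```

A real fix would also need the event to carry the position information in the first place, which is the gap described under Root Cause Analysis.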
> h2. Actual Behavior
> # {{MODIFY COLUMN}} position change events are ignored
> # Sink table column order remains unchanged
> # No error logs or warning messages
> # Schema evolution fails silently
> h2. Root Cause Analysis
> h3. Possible Causes
> h4. 1. MySQL CDC Connector Not Capturing Column Position Information
> At the {{MySqlStreamingChangeEventSource}} or Debezium level, only column
> type changes may be captured while position changes are ignored.
> h4. 2. Schema Evolution Framework Filtering "Position-Only" Events
> In {{SchemaChangeEventVisitor}}, "position-only changes" might be
> considered invalid schema changes and filtered out.
> h4. 3. Paimon MetadataApplier Not Supporting Column Reordering
> {{PaimonMetadataApplier}} may not have implemented column reordering logic.
> h3. Related Code Locations
> # When an {{ALTER TABLE ... MODIFY/CHANGE COLUMN}} statement is executed, the
> CDC connector emits an {{AlterColumnTypeEvent}}. This event carries only the
> new column types and has no field for column position.
>
> {code:java}
> // flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaChangeEventVisitor.java
> public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {
>     private static final long serialVersionUID = 1L;
>
>     private final TableId tableId;
>
>     /** key => column name, value => column type after changing. */
>     private final Map<String, DataType> typeMapping;
>
>     private final Map<String, DataType> oldTypeMapping;
>     ...
> }
> {code}
> # For a position-only change, the type carried by the {{AlterColumnTypeEvent}}
> is identical to the type already in the current schema, so the validation
> below treats the event as redundant and discards it. The event is therefore
> never propagated downstream.
>
> {code:java}
> // flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
> public static boolean isSchemaChangeEventRedundant(
>         @Nullable Schema currentSchema, SchemaChangeEvent event) {
>     Optional<Schema> latestSchema = Optional.ofNullable(currentSchema);
>     return Boolean.TRUE.equals(
>             SchemaChangeEventVisitor.visit(
>                     event,
>                     addColumnEvent -> {
>                         // ...
>                     },
>                     alterColumnTypeEvent -> {
>                         // It has not been applied if schema does not even exist
>                         if (!latestSchema.isPresent()) {
>                             return false;
>                         }
>                         Schema schema = latestSchema.get();
>                         // It has been applied only if all column types are set as expected
>                         for (Map.Entry<String, DataType> entry :
>                                 alterColumnTypeEvent.getTypeMapping().entrySet()) {
>                             if (!schema.getColumn(entry.getKey()).isPresent()
>                                     || !schema.getColumn(entry.getKey())
>                                             .get()
>                                             .getType()
>                                             .equals(entry.getValue())) {
>                                 return false;
>                             }
>                         }
>                         return true;
>                     }
>                     // ...
> }
> {code}
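
The effect of the type-only comparison quoted above can be reproduced in isolation. The sketch below is a simplified stand-in, not the Flink CDC code: it models the schema as an ordered map from column name to a type string, and `isRedundantWithOrder` is a hypothetical position-aware variant added only for contrast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionBlindRedundancyCheck {

    /**
     * Simplified stand-in for the type-only comparison: the event counts as
     * redundant when every mapped column exists and already has that type.
     */
    static boolean isRedundantByTypeOnly(
            Map<String, String> schema, Map<String, String> typeMapping) {
        for (Map.Entry<String, String> entry : typeMapping.entrySet()) {
            String currentType = schema.get(entry.getKey());
            if (currentType == null || !currentType.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    /** Hypothetical variant that additionally compares the column order. */
    static boolean isRedundantWithOrder(
            Map<String, String> schema,
            Map<String, String> typeMapping,
            List<String> requestedOrder) {
        List<String> currentOrder = new ArrayList<>(schema.keySet());
        return isRedundantByTypeOnly(schema, typeMapping)
                && currentOrder.equals(requestedOrder);
    }

    /** Sink-side schema from the report, in order: id, col1, col2, col3. */
    static Map<String, String> sinkSchema() {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("id", "INT");
        schema.put("col1", "INT");
        schema.put("col2", "VARCHAR(50)");
        schema.put("col3", "VARCHAR(50)");
        return schema;
    }

    public static void main(String[] args) {
        Map<String, String> schema = sinkSchema();
        // MODIFY COLUMN col2 VARCHAR(50) AFTER col3: same type, new position.
        Map<String, String> typeMapping = new LinkedHashMap<>();
        typeMapping.put("col2", "VARCHAR(50)");
        List<String> requestedOrder = Arrays.asList("id", "col1", "col3", "col2");

        System.out.println(isRedundantByTypeOnly(schema, typeMapping)); // true
        System.out.println(isRedundantWithOrder(schema, typeMapping, requestedOrder)); // false
    }
}
```

The first call returns true because col2 already has type VARCHAR(50), which matches how the position-only change is dropped. The second returns false only because the requested order is supplied separately, information that {{AlterColumnTypeEvent}} does not carry according to this report.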
>
>
> h2. Impact
> h3. Severity: Critical
> # {*}Data Consistency Issues{*}: Schema inconsistency between source and
> sink tables
> # {*}Data Misalignment Risk{*}: May cause data to be written to wrong
> columns after restart
> # {*}Production Environment Risk{*}: Affects all users using Pipeline mode
> # {*}Silent Failure{*}: No error messages, making the issue hard to detect
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)