[ https://issues.apache.org/jira/browse/FLINK-38741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-38741:
-----------------------------------
    Labels: pull-request-available  (was: )

> MODIFY COLUMN position change events are ignored, causing schema evolution 
> failure
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-38741
>                 URL: https://issues.apache.org/jira/browse/FLINK-38741
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>         Environment: * {*}Flink CDC Version{*}: 3.5.0
>  * {*}Flink Version{*}: 1.19.1
>  * {*}Source Database{*}: MySQL 8.0
>  * {*}Sink{*}: Apache Paimon
>  * {*}Deployment Mode{*}: Pipeline
>            Reporter: cheng qian
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Summary
> When using MySQL CDC Pipeline to synchronize data to Paimon, {{ALTER TABLE 
> MODIFY COLUMN}} operations that change column order are *ignored* and not 
> propagated to the downstream sink, causing schema evolution failure and 
> potential data inconsistency.
> h3. Problem Statement
> When executing DDL statements like {{ALTER TABLE MODIFY COLUMN col2 
> VARCHAR(100) AFTER col3}} to change column order in MySQL, the schema change 
> event is:
>  # *Filtered out* by the schema evolution framework, OR
>  # *Not processed* by the Paimon MetadataApplier
> This causes:
>  * Schema inconsistency between source and sink
>  * Schema evolution failure
>  * Potential data corruption after job restart
> h2. Steps to Reproduce
> h3. 1. Environment Setup
> {*}MySQL Table Structure{*}:
>  
> {code:sql}
> CREATE TABLE test_column_order (
>     id INT PRIMARY KEY,
>     col1 INT,
>     col2 VARCHAR(50),
>     col3 VARCHAR(50)
> );
>
> INSERT INTO test_column_order VALUES
>     (1, 1, 'b1', 'c1'),
>     (2, 2, 'b2', 'c2'),
>     (3, 3, 'b3', 'c3'); {code}
> *Flink CDC Pipeline Configuration* ({{mysql-to-paimon.yaml}}):
> {code:yaml}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: password
>   tables: test_db.test_column_order
>   server-id: 5400-5404
>
> sink:
>   type: paimon
>   catalog.properties.metastore: filesystem
>   catalog.properties.warehouse: /tmp/paimon
>
> pipeline:
>   name: MySQL to Paimon Pipeline
>   parallelism: 1
>
> route:
>   - source-table: test_db.test_column_order
>     sink-table: paimon_db.test_column_order {code}
> h3. 2. Start Pipeline
> {code:bash}
> ./bin/flink-cdc.sh mysql-to-paimon.yaml {code}
> h3. 3. Verify Initial Synchronization
> {code:sql}
> -- Query in Paimon
> SELECT * FROM paimon_db.test_column_order; {code}
> Expected column order: id, col1, col2, col3
> h3. 4. Execute Column Order Change
> {code:sql}
> -- Execute in MySQL
> ALTER TABLE test_column_order 
> MODIFY COLUMN col2 VARCHAR(50) AFTER col3;
> -- Verify column order in MySQL
> DESC test_column_order; {code}
> MySQL column order becomes: id, col1, col3, col2 ✅
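> To make the expected reordering concrete, the minimal Java sketch below 
> applies the effect of {{MODIFY COLUMN x <type> AFTER y}} to an ordered list 
> of column names. {{ColumnOrder}} and {{moveAfter}} are hypothetical names 
> used only for illustration. They are not part of Flink CDC or any MySQL API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink CDC or MySQL): models the column
 * reordering implied by "MODIFY COLUMN <column> <type> AFTER <anchor>".
 */
public class ColumnOrder {

    /** Returns a new list in which {@code column} directly follows {@code anchor}. */
    public static List<String> moveAfter(List<String> columns, String column, String anchor) {
        if (!columns.contains(column) || !columns.contains(anchor)) {
            throw new IllegalArgumentException("Unknown column: " + column + " or " + anchor);
        }
        if (column.equals(anchor)) {
            throw new IllegalArgumentException("A column cannot be moved after itself");
        }
        List<String> result = new ArrayList<>(columns);
        result.remove(column);                          // take the column out of its old slot
        result.add(result.indexOf(anchor) + 1, column); // re-insert it right after the anchor
        return result;
    }

    public static void main(String[] args) {
        List<String> before = List.of("id", "col1", "col2", "col3");
        // ALTER TABLE test_column_order MODIFY COLUMN col2 VARCHAR(50) AFTER col3;
        System.out.println(moveAfter(before, "col2", "col3")); // [id, col1, col3, col2]
    }
}
```

> Applied to the table above, moving {{col2}} after {{col3}} yields id, col1, 
> col3, col2. This is the MySQL column order the sink is expected to follow.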
> h3. 5. Check Paimon Table Structure
> {code:sql}
> -- Query table structure in Paimon
> DESC paimon_db.test_column_order; {code}
> {*}Expected{*}: Column order should be id, col1, col3, col2.
> {*}Actual{*}: Column order remains id, col1, col2, col3 ❌
> h3. 6. Insert New Data for Verification
>  
> {code:sql}
> -- Insert new data in MySQL
> INSERT INTO test_column_order VALUES (4, 4, 'c4', 'b4');
> -- Note: Column order is now id, col1, col3, col2 {code}
> {*}Expected{*}: The row appears in Paimon as {{(4, 4, 'c4', 'b4')}}, with 
> col3 = 'c4' and col2 = 'b4'.
> {*}Actual{*}: Values may be written to the wrong columns, or synchronization 
> may fail.
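> The misalignment risk can be illustrated with a deliberately simplified 
> model. The sketch below assumes, for illustration only, that row values are 
> bound to column names purely by position. It does not reproduce how Flink 
> CDC or Paimon actually serialize records. {{PositionalMisalignment}} and 
> {{bindByPosition}} are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustration only: binds a row produced in the source's new column order
 * against a column list by position. This is an assumed failure mode, not a
 * trace of Flink CDC or Paimon internals.
 */
public class PositionalMisalignment {

    /** Pairs each value with the column name at the same position. */
    public static Map<String, Object> bindByPosition(List<String> columns, List<Object> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("Column/value count mismatch");
        }
        Map<String, Object> row = new LinkedHashMap<>(); // keeps column order for printing
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        List<Object> values = List.of(4, 4, "c4", "b4");                     // VALUES (4, 4, 'c4', 'b4')
        List<String> sourceOrder = List.of("id", "col1", "col3", "col2");    // MySQL after ALTER
        List<String> staleSinkOrder = List.of("id", "col1", "col2", "col3"); // Paimon, unchanged

        System.out.println(bindByPosition(sourceOrder, values));    // {id=4, col1=4, col3=c4, col2=b4}
        System.out.println(bindByPosition(staleSinkOrder, values)); // {id=4, col1=4, col2=c4, col3=b4}
    }
}
```

> With the stale sink order, 'c4' lands in col2 and 'b4' lands in col3. This 
> is the reverse of what was inserted in MySQL.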
>  
> h2. Expected Behavior
>  # MySQL CDC should capture {{MODIFY COLUMN}} position change events
>  # Schema evolution framework should handle column reordering events
>  # Paimon MetadataApplier should execute column reordering operations
>  # Sink table column order should remain consistent with source table
> h2. Actual Behavior
>  # {{MODIFY COLUMN}} position change events are ignored
>  # Sink table column order remains unchanged
>  # No error logs or warning messages
>  # Schema evolution fails silently
> h2. Root Cause Analysis
> h3. Possible Causes
> h4. 1. MySQL CDC Connector Not Capturing Column Position Information
> At the {{MySqlStreamingChangeEventSource}} or Debezium level, only column 
> type changes may be captured while position changes are ignored.
> h4. 2. Schema Evolution Framework Filtering "Position-Only" Events
> In {{SchemaChangeEventVisitor}}, "position-only changes" might be 
> considered invalid schema changes and filtered out.
> h4. 3. Paimon MetadataApplier Not Supporting Column Reordering
> {{PaimonMetadataApplier}} may not have implemented column reordering logic.
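> Supporting reordering would require the applier to turn the sink's current 
> column order into the source's new order. The sketch below shows one simple 
> way to derive such a plan as a list of "move column first" and "move column 
> after another column" steps. The {{Move}} record and the {{ReorderPlanner}} 
> class are assumptions for illustration. They are not Paimon's or Flink CDC's 
> API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

/**
 * Hypothetical sketch: derives column moves that turn the sink's current
 * column order into the source's target order. Move is an assumed type,
 * not Paimon's or Flink CDC's API. Column names are assumed to be unique.
 */
public class ReorderPlanner {

    /** A single move: place {@code column} first (after == null) or directly after {@code after}. */
    public record Move(String column, String after) {
        public boolean isFirst() {
            return after == null;
        }

        @Override
        public String toString() {
            return isFirst() ? column + " FIRST" : column + " AFTER " + after;
        }
    }

    public static List<Move> plan(List<String> current, List<String> target) {
        if (current.size() != target.size()
                || !new HashSet<>(current).equals(new HashSet<>(target))) {
            throw new IllegalArgumentException("Orders must contain the same columns");
        }
        List<String> working = new ArrayList<>(current);
        List<Move> moves = new ArrayList<>();
        for (int i = 0; i < target.size(); i++) {
            String wanted = target.get(i);
            if (working.get(i).equals(wanted)) {
                continue; // already in place
            }
            // Slots 0..i-1 already match the target, so 'wanted' sits at index > i.
            working.remove(wanted);
            working.add(i, wanted);
            moves.add(new Move(wanted, i == 0 ? null : target.get(i - 1)));
        }
        return moves;
    }

    public static void main(String[] args) {
        List<String> sink = List.of("id", "col1", "col2", "col3");
        List<String> source = List.of("id", "col1", "col3", "col2");
        System.out.println(plan(sink, source)); // [col3 AFTER col1]
    }
}
```

> The planner walks the target order from left to right and pulls each 
> out-of-place column forward. It therefore emits at most n - 1 moves, though 
> not necessarily the fewest possible. For this table it produces 
> {{col3 AFTER col1}}. That move reaches the same order as the original 
> {{col2 ... AFTER col3}} statement.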
> h3. Related Code Locations
>  # When an ALTER TABLE MODIFY/CHANGE COLUMN statement is executed, the CDC 
> pipeline emits an {{AlterColumnTypeEvent}}. This event carries only the old 
> and new column types. It contains no column position information.
>  
> {code:java}
> // flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
> public class AlterColumnTypeEvent
>         implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {
>     private static final long serialVersionUID = 1L;
>     private final TableId tableId;
>     /** key => column name, value => column type after changing. */
>     private final Map<String, DataType> typeMapping;
>     // key => column name, value => column type before changing
>     private final Map<String, DataType> oldTypeMapping;
>     // ...
>     // Note: no field records the column's position.
> }{code}
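>  # When {{MODIFY COLUMN}} only moves a column and keeps its type, the 
> resulting {{AlterColumnTypeEvent}} maps the column to the type it already 
> has. The redundancy check below compares types only, so it treats the event 
> as already applied and discards it. The event is never propagated downstream.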
>  
> {code:java}
> // flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
> public static boolean isSchemaChangeEventRedundant(
>         @Nullable Schema currentSchema, SchemaChangeEvent event) {
>     Optional<Schema> latestSchema = Optional.ofNullable(currentSchema);
>     return Boolean.TRUE.equals(
>             SchemaChangeEventVisitor.visit(
>                     event,
>                     addColumnEvent -> {
>                         // ...
>                     },
>                     alterColumnTypeEvent -> {
>                         // It has not been applied if schema does not even exist
>                         if (!latestSchema.isPresent()) {
>                             return false;
>                         }
>                         Schema schema = latestSchema.get();
>                         // It has been applied only if all column types are set as expected
>                         for (Map.Entry<String, DataType> entry :
>                                 alterColumnTypeEvent.getTypeMapping().entrySet()) {
>                             if (!schema.getColumn(entry.getKey()).isPresent()
>                                     || !schema.getColumn(entry.getKey())
>                                             .get()
>                                             .getType()
>                                             .equals(entry.getValue())) {
>                                 return false;
>                             }
>                         }
>                         // Column position is never compared, so a position-only
>                         // MODIFY COLUMN ends up here and is reported as redundant.
>                         return true;
>                     }
>                     // ... (visitors for the remaining event types omitted)
>             ));
> }
>  {code}
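> The effect of this check on a position-only change can be reproduced with a 
> simplified, self-contained model. In the sketch below, 
> {{typeOnlyRedundant}} mirrors the type comparison above. 
> {{positionAwareRedundant}} is a hypothetical variant that also compares 
> column order against an expected order supplied by the caller. Types are 
> plain strings and {{Column}} is a stand-in record. None of these names exist 
> in Flink CDC.

```java
import java.util.List;
import java.util.Map;

/**
 * Simplified model (not Flink CDC code) of the redundancy check, next to a
 * hypothetical position-aware variant. Types are plain strings.
 */
public class RedundancyCheck {

    public record Column(String name, String type) {}

    /** Mirrors the type-only logic: redundant if every column already has the requested type. */
    public static boolean typeOnlyRedundant(List<Column> schema, Map<String, String> typeMapping) {
        for (Map.Entry<String, String> e : typeMapping.entrySet()) {
            boolean matches = schema.stream()
                    .anyMatch(c -> c.name().equals(e.getKey()) && c.type().equals(e.getValue()));
            if (!matches) {
                return false; // column missing or type differs: not yet applied
            }
        }
        return true;
    }

    /** Hypothetical variant: additionally requires the column order to match the source. */
    public static boolean positionAwareRedundant(
            List<Column> schema, Map<String, String> typeMapping, List<String> expectedOrder) {
        List<String> actualOrder = schema.stream().map(Column::name).toList();
        return typeOnlyRedundant(schema, typeMapping) && actualOrder.equals(expectedOrder);
    }

    public static void main(String[] args) {
        List<Column> sinkSchema = List.of(
                new Column("id", "INT"), new Column("col1", "INT"),
                new Column("col2", "VARCHAR(50)"), new Column("col3", "VARCHAR(50)"));
        // MODIFY COLUMN col2 VARCHAR(50) AFTER col3: the type is unchanged
        Map<String, String> typeMapping = Map.of("col2", "VARCHAR(50)");
        List<String> sourceOrder = List.of("id", "col1", "col3", "col2");

        System.out.println(typeOnlyRedundant(sinkSchema, typeMapping));                   // true  (event dropped)
        System.out.println(positionAwareRedundant(sinkSchema, typeMapping, sourceOrder)); // false (event kept)
    }
}
```

> A position-aware check needs the expected order from somewhere. Because 
> {{AlterColumnTypeEvent}} carries no position information, this sketch passes 
> the order in explicitly.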
>  
>  
> h2. Impact
> h3. Severity: Critical
>  # {*}Data Consistency Issues{*}: Schema inconsistency between source and 
> sink tables
>  # {*}Data Misalignment Risk{*}: Data may be written to the wrong columns 
> after a restart
>  # {*}Production Environment Risk{*}: Affects Pipeline jobs whose source 
> tables receive column-reordering DDL
>  # {*}Silent Failure{*}: No error messages, making the issue hard to detect
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
