cheng qian created FLINK-38741:
----------------------------------

             Summary: MODIFY COLUMN position change events are ignored causing 
schema evolution failure
                 Key: FLINK-38741
                 URL: https://issues.apache.org/jira/browse/FLINK-38741
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
         Environment: * {*}Flink CDC Version{*}: 3.5.0
 * {*}Flink Version{*}: 1.19.1
 * {*}Source Database{*}: MySQL 8.0
 * {*}Sink{*}: Apache Paimon
 * {*}Deployment Mode{*}: Pipeline
            Reporter: cheng qian


h3. Summary

When using MySQL CDC Pipeline to synchronize data to Paimon, {{ALTER TABLE 
MODIFY COLUMN}} operations that change column order are *ignored* and not 
propagated to the downstream sink, causing schema evolution failure and 
potential data inconsistency.
h3. Problem Statement

When a DDL statement such as {{ALTER TABLE test_column_order MODIFY COLUMN col2 
VARCHAR(50) AFTER col3}} changes only the column order in MySQL, the resulting 
schema change event is either:
 # *Filtered out* by the schema evolution framework, or
 # *Not processed* by the Paimon MetadataApplier

This causes:
 * Schema inconsistency between source and sink
 * Schema evolution failure
 * Potential data corruption after job restart

h2. Steps to Reproduce
h3. 1. Environment Setup

{*}MySQL Table Structure{*}:

 
{code:sql}
CREATE TABLE test_column_order (
    id INT PRIMARY KEY,
    col1 INT,
    col2 VARCHAR(50),
    col3 VARCHAR(50)
);

INSERT INTO test_column_order VALUES
    (1, 1, 'b1', 'c1'),
    (2, 2, 'b2', 'c2'),
    (3, 3, 'b3', 'c3'); {code}
{*}Flink CDC Pipeline Configuration{*} ({{mysql-to-paimon.yaml}}):
{code:yaml}
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: password
  database-name: test_db
  table-name: test_column_order
  server-id: 5400-5404
sink:
  type: paimon
  catalog-type: filesystem
  warehouse: /tmp/paimon
  
pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 1
  
route:
  - source-table: test_db.test_column_order
    sink-table: paimon_db.test_column_order {code}
h3. 2. Start Pipeline
{code:bash}
./bin/flink-cdc.sh mysql-to-paimon.yaml {code}
h3. 3. Verify Initial Synchronization
{code:sql}
-- Query in Paimon
SELECT * FROM paimon_db.test_column_order; {code}
Expected column order: id, col1, col2, col3
h3. 4. Execute Column Order Change
{code:sql}
-- Execute in MySQL
ALTER TABLE test_column_order
MODIFY COLUMN col2 VARCHAR(50) AFTER col3;

-- Verify column order in MySQL
DESC test_column_order; {code}
MySQL column order becomes: id, col1, col3, col2 ✅
h3. 5. Check Paimon Table Structure
{code:sql}
-- Query table structure in Paimon
DESC paimon_db.test_column_order; {code}
{*}Expected Result{*}: Column order should be id, col1, col3, col2
{*}Actual Result{*}: Column order remains id, col1, col2, col3 ❌
h3. 6. Insert New Data for Verification

 
{code:sql}
-- Insert new data in MySQL
INSERT INTO test_column_order VALUES (4, 4, 'c4', 'b4');
-- Note: Column order is now id, col1, col3, col2 {code}
{*}Expected{*}: Data in Paimon should be {{(4, 4, 'c4', 'b4')}}
{*}Actual{*}: Data misalignment or synchronization failure may occur
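
The misalignment risk in step 6 can be illustrated with a small, self-contained 
sketch. It assumes, purely for illustration, that a writer pairs incoming values 
with sink columns by position; whether Flink CDC or Paimon actually does so at 
any given stage is not confirmed here. The class and method names are 
hypothetical and are not part of Flink CDC.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: shows what a purely positional write does with a stale column order. */
class PositionalWriteSketch {

    /** Pairs the i-th value with the i-th column name (hypothetical positional writer). */
    static Map<String, Object> assignByPosition(List<String> columns, List<Object> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        // Values of the row inserted in step 6, in the new MySQL column order.
        List<Object> values = Arrays.asList(4, 4, "c4", "b4");

        List<String> sourceOrder = Arrays.asList("id", "col1", "col3", "col2");
        List<String> staleSinkOrder = Arrays.asList("id", "col1", "col2", "col3");

        // Correct pairing: col3 = c4, col2 = b4.
        System.out.println(assignByPosition(sourceOrder, values));
        // Stale pairing: col2 = c4, col3 = b4 (values swapped between columns).
        System.out.println(assignByPosition(staleSinkOrder, values));
    }
}
```

Because col2 and col3 share the type VARCHAR(50), such a swap would not raise a 
type error, which is consistent with the silent failure described in this issue.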
 
h2. Expected Behavior
 # MySQL CDC should capture {{MODIFY COLUMN}} position change events
 # Schema evolution framework should handle column reordering events
 # Paimon MetadataApplier should execute column reordering operations
 # Sink table column order should remain consistent with source table
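
As a rough sketch of what handling a reordering could involve, the following 
self-contained Java example models a column order as a list of names and applies 
a "move column after another column" operation. Everything here is hypothetical: 
Flink CDC has no {{ColumnReorderSketch}} class, and the issue does not prescribe 
how a position change should be represented or applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: a hypothetical model of "MODIFY COLUMN x ... AFTER y" on a column list. */
class ColumnReorderSketch {

    /** Returns true when {@code column} already sits immediately after {@code afterColumn}. */
    static boolean isAlreadyAfter(List<String> order, String column, String afterColumn) {
        int anchor = order.indexOf(afterColumn);
        return anchor >= 0
                && anchor + 1 < order.size()
                && order.get(anchor + 1).equals(column);
    }

    /** Returns a new list with {@code column} moved to directly follow {@code afterColumn}. */
    static List<String> moveAfter(List<String> order, String column, String afterColumn) {
        if (column.equals(afterColumn)
                || !order.contains(column)
                || !order.contains(afterColumn)) {
            throw new IllegalArgumentException("invalid reorder request");
        }
        List<String> result = new ArrayList<>(order);
        result.remove(column);
        result.add(result.indexOf(afterColumn) + 1, column);
        return result;
    }

    public static void main(String[] args) {
        List<String> sinkOrder = Arrays.asList("id", "col1", "col2", "col3");

        // The sink still has the old order, so the reorder is not redundant.
        System.out.println(isAlreadyAfter(sinkOrder, "col2", "col3")); // false

        List<String> reordered = moveAfter(sinkOrder, "col2", "col3");
        System.out.println(reordered); // [id, col1, col3, col2]

        // After applying it, the same request would be redundant.
        System.out.println(isAlreadyAfter(reordered, "col2", "col3")); // true
    }
}
```

A check of this kind compares positions rather than types, which is the 
information a position-only {{MODIFY COLUMN}} actually changes.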

h2. Actual Behavior
 # {{MODIFY COLUMN}} position change events are ignored
 # Sink table column order remains unchanged
 # No error logs or warning messages
 # Schema evolution fails silently

h2. Root Cause Analysis
h3. Possible Causes
h4. 1. MySQL CDC Connector Not Capturing Column Position Information

At the {{MySqlStreamingChangeEventSource}} or Debezium level, only column type 
changes may be captured while position changes are ignored.
h4. 2. Schema Evolution Framework Filtering "Position-Only" Events

In {{{}SchemaChangeEventVisitor{}}}, "position-only changes" might be 
considered invalid schema changes and filtered out.
h4. 3. Paimon MetadataApplier Not Supporting Column Reordering

{{PaimonMetadataApplier}} may not have implemented column reordering logic.
h3. Related Code Locations
 # When an ALTER TABLE MODIFY/CHANGE COLUMN statement is executed, the CDC job 
emits an {{AlterColumnTypeEvent}}, but this event carries only the modified 
column's type information and no column position information.

 
{code:java}
// flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {

    private static final long serialVersionUID = 1L;

    private final TableId tableId;

    /** key => column name, value => column type after changing. */
    private final Map<String, DataType> typeMapping;

    private final Map<String, DataType> oldTypeMapping;

    ...
}{code}
 # A position-only modification does not change the column type, so the 
resulting {{AlterColumnTypeEvent}} is judged redundant by the validation below 
and discarded. The event is therefore never propagated downstream.

 
{code:java}
// flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
public static boolean isSchemaChangeEventRedundant(
        @Nullable Schema currentSchema, SchemaChangeEvent event) {
    Optional<Schema> latestSchema = Optional.ofNullable(currentSchema);
    return Boolean.TRUE.equals(
            SchemaChangeEventVisitor.visit(
                    event,
                    addColumnEvent -> {
                        // ...
                    },
                    alterColumnTypeEvent -> {
                        // It has not been applied if schema does not even exist
                        if (!latestSchema.isPresent()) {
                            return false;
                        }
                        Schema schema = latestSchema.get();

                        // It has been applied only if all column types are set as expected
                        for (Map.Entry<String, DataType> entry :
                                alterColumnTypeEvent.getTypeMapping().entrySet()) {
                            if (!schema.getColumn(entry.getKey()).isPresent()
                                    || !schema.getColumn(entry.getKey())
                                            .get()
                                            .getType()
                                            .equals(entry.getValue())) {
                                return false;
                            }
                        }
                        return true;
                    }
                    // ...
}
{code}
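
To make the effect of the check above concrete, here is a minimal, 
self-contained sketch that mirrors its logic with plain Java collections. It is 
not Flink CDC code: a {{Map<String, String>}} of column name to type name stands 
in for {{Schema}} and {{DataType}}, and the class and method names are 
hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: plain collections stand in for Flink CDC's Schema and DataType. */
class RedundancyCheckSketch {

    /**
     * Mirrors the shape of the alterColumnTypeEvent branch: the event counts as
     * already applied when every named column exists with the expected type.
     * Column positions are never consulted.
     */
    static boolean isAlterColumnTypeRedundant(
            Map<String, String> currentSchema, Map<String, String> typeMapping) {
        if (currentSchema == null) {
            return false; // schema does not exist yet, so nothing was applied
        }
        for (Map.Entry<String, String> entry : typeMapping.entrySet()) {
            String currentType = currentSchema.get(entry.getKey());
            if (currentType == null || !currentType.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    /** Schema of test_column_order before the ALTER TABLE, in declaration order. */
    static Map<String, String> initialSchema() {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("id", "INT");
        schema.put("col1", "INT");
        schema.put("col2", "VARCHAR(50)");
        schema.put("col3", "VARCHAR(50)");
        return schema;
    }

    public static void main(String[] args) {
        Map<String, String> schema = initialSchema();

        // MODIFY COLUMN col2 VARCHAR(50) AFTER col3: same type, new position.
        System.out.println(isAlterColumnTypeRedundant(
                schema, Collections.singletonMap("col2", "VARCHAR(50)"))); // true

        // A genuine type change is still detected as not redundant.
        System.out.println(isAlterColumnTypeRedundant(
                schema, Collections.singletonMap("col2", "VARCHAR(100)"))); // false
    }
}
```

In this model the position-only change is reported as redundant because the 
comparison is keyed by column name and type alone, which matches the behavior 
described in item 2 above.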
 

 
h2. Impact
h3. Severity: Critical
 # {*}Data Consistency Issues{*}: Schema inconsistency between source and sink 
tables
 # {*}Data Misalignment Risk{*}: May cause data to be written to wrong columns 
after restart
 # {*}Production Environment Risk{*}: Affects any Pipeline-mode job whose source table has its columns reordered
 # {*}Silent Failure{*}: No error messages, making the issue hard to detect

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
