[ https://issues.apache.org/jira/browse/FLINK-38197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
JunboWang updated FLINK-38197:
------------------------------
    Description: 
*Problem Description*:
When a column is added through a schema change, Flink CDC places it at the end of the sink table by default, so the column order of the source table (e.g., MySQL) and the sink table (e.g., Paimon) diverges. For example, if a new column {{schema_test_1}} is added after {{schema_test}} in the source table, Flink CDC appends it to the end of the sink table instead. After the task restarts with cleared state, the mismatched column positions cause data misalignment (e.g., values mapped to the wrong fields), leading to data loss or task failure.

*Root Cause*:
The schema change handling logic is flawed. When processing add-column events, the {{lenientizeAddColumnEvent}} method does not retain the column position specified in the source table (e.g., {{AFTER}} a certain column). Instead, it builds each column with the single-argument constructor of {{AddColumnEvent.ColumnWithPosition}}, which forces the position to {{LAST}} (end) and prevents the source and sink column structures from staying in sync.

{code:java}
private static Stream<SchemaChangeEvent> lenientizeAddColumnEvent(
        AddColumnEvent schemaChangeEvent, TableId tableId) {
    return Stream.of(
            new AddColumnEvent(
                    tableId,
                    schemaChangeEvent.getAddedColumns().stream()
                            .map(
                                    col ->
                                            // Single-argument constructor: the original
                                            // position and anchor column are dropped here.
                                            new AddColumnEvent.ColumnWithPosition(
                                                    Column.physicalColumn(
                                                            col.getAddColumn().getName(),
                                                            col.getAddColumn()
                                                                    .getType()
                                                                    .nullable(),
                                                            col.getAddColumn().getComment(),
                                                            col.getAddColumn()
                                                                    .getDefaultValueExpression())))
                            .collect(Collectors.toList())));
}

// In AddColumnEvent.ColumnWithPosition: always defaults to LAST with no anchor column.
public ColumnWithPosition(Column addColumn) {
    this.addColumn = addColumn;
    position = ColumnPosition.LAST;
    existedColumnName = null;
}
{code}

h3. Reproduction:

*1. Initial Table Structure and Data (Source Table)*
The source table (e.g., MySQL) contains columns: {{id}}, {{price}}, {{schema_test}}, {{schema_test2}}, {{schema_test3}}, with the following data:
||id||price||schema_test||schema_test3||
|0|300.00|NULL|NULL|
|1|4.00|NULL|NULL|
|2|100.00|NULL|NULL|
|3|1212.00|NULL|NULL|

*2. Execute Add Column Operation And Update Data*
In the source table, add the new column {{schema_test_order_change}} after {{schema_test}}, then update the data in {{schema_test_order_change}}:
||id||price||schema_test||schema_test3||schema_test_order_change||
|0|300.00|NULL|NULL|1|
|1|4.00|NULL|NULL|2|
|2|100.00|NULL|NULL|3|
|3|1212.00|NULL|NULL|4|

*3. Abnormal Result in Synchronized Sink Table (Data Loss)*
After synchronization via Flink CDC, the sink table (e.g., Paimon) does not add the new column at the specified position. Instead, the column is placed at the end by default, resulting in the column order {{id}}, {{price}}, {{schema_test}}, {{schema_test3}}, {{schema_test_order_change}}, and the values written to the new column are lost:
||id||price||schema_test||schema_test3||schema_test_order_change||
|0|300.00|NULL|NULL|NULL|
|1|4.00|NULL|NULL|NULL|
|2|100.00|NULL|NULL|NULL|
|3|1212.00|NULL|NULL|NULL|

*4. Data Inconsistency (Misalignment) After Restart*
After a restart, the synchronized data is visibly misaligned: the values written to {{schema_test_order_change}} appear under {{schema_test3}}. The sink table's column order and data are as follows:
||id||price||schema_test||schema_test3||schema_test_order_change||
|0|300.00|NULL|1|NULL|
|1|4.00|NULL|2|NULL|
|2|100.00|NULL|3|NULL|
|3|1212.00|NULL|4|NULL|


> Data Errors Caused by Default Added Columns at End Position
> -----------------------------------------------------------
>
>                 Key: FLINK-38197
>                 URL: https://issues.apache.org/jira/browse/FLINK-38197
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: JunboWang
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
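
To illustrate the root cause described in this issue, the following is a minimal, self-contained sketch of a position-preserving variant of the lenientize step. It is not the Flink CDC implementation and not a proposed patch: {{LenientizeSketch}}, its nested {{Position}} and {{ColumnWithPosition}} types, and the {{lenientize}} and {{apply}} helpers are hypothetical stand-ins that model a column as just a name and a nullability flag. It assumes that the position and the anchor column of the incoming event can be read and passed on; whether the real classes expose them in this way should be checked against the Flink CDC source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Simplified stand-ins for the Flink CDC types; hypothetical, NOT the real API. */
final class LenientizeSketch {

    enum Position { FIRST, LAST, BEFORE, AFTER }

    /** Hypothetical, reduced model of an added column together with its position. */
    static final class ColumnWithPosition {
        final String name;
        final boolean nullable;
        final Position position;
        final String existedColumnName; // anchor column for BEFORE/AFTER, otherwise null

        ColumnWithPosition(
                String name, boolean nullable, Position position, String existedColumnName) {
            this.name = name;
            this.nullable = nullable;
            this.position = position;
            this.existedColumnName = existedColumnName;
        }
    }

    /** Makes every added column nullable but carries position and anchor over unchanged. */
    static List<ColumnWithPosition> lenientize(List<ColumnWithPosition> added) {
        return added.stream()
                .map(c -> new ColumnWithPosition(c.name, true, c.position, c.existedColumnName))
                .collect(Collectors.toList());
    }

    /** Applies the added columns to an ordered list of column names, honoring the position. */
    static List<String> apply(List<String> schema, List<ColumnWithPosition> added) {
        List<String> result = new ArrayList<>(schema);
        for (ColumnWithPosition c : added) {
            switch (c.position) {
                case FIRST:
                    result.add(0, c.name);
                    break;
                case LAST:
                    result.add(c.name);
                    break;
                case BEFORE:
                case AFTER: {
                    int idx = result.indexOf(c.existedColumnName);
                    if (idx < 0) {
                        throw new IllegalArgumentException(
                                "Unknown anchor column: " + c.existedColumnName);
                    }
                    result.add(c.position == Position.AFTER ? idx + 1 : idx, c.name);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sink = Arrays.asList("id", "price", "schema_test", "schema_test3");
        ColumnWithPosition added =
                new ColumnWithPosition(
                        "schema_test_order_change", false, Position.AFTER, "schema_test");
        // prints [id, price, schema_test, schema_test_order_change, schema_test3]
        System.out.println(apply(sink, lenientize(Arrays.asList(added))));
    }
}
```

With the position carried through, the new column lands directly after {{schema_test}} in this toy model rather than at the end, which is the ordering the source table in the reproduction would have.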