Vinay Sagar Gonabavi created FLINK-38830:
--------------------------------------------

             Summary: PreTransformOperator needs to avoid crashing with "Field 
names must be unique" error and handle processing a duplicate AddColumnEvent 
better
                 Key: FLINK-38830
                 URL: https://issues.apache.org/jira/browse/FLINK-38830
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
         Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present 
on master f5204243)
 * Flink Version: 1.20+
 * Migration Tool: gh-ost
 * Configuration: Shadow tables properly excluded via `tables.exclude`
            Reporter: Vinay Sagar Gonabavi


Flink CDC pipelines can crash when they receive duplicate ADD COLUMN events (in our case, during gh-ost online schema migrations on a MySQL source) with:

{code:java}
Caused by: java.lang.IllegalArgumentException: Field names must be unique. Found duplicates: [new_column_name]
{code}

Even with the gh-ost shadow tables properly excluded, we still observed duplicate `AddColumnEvent`s for the same column, which caused the operators to fail.

*Steps to Reproduce*

1. Configure Flink CDC pipeline with gh-ost shadow table exclusion:
{code:java}
source:
   type: mysql
   tables.exclude: '^\.+.\.+(\_del$|\_ghc$|\_gho)$'
{code}
2. Run a gh-ost migration (our case), or anything else that produces duplicate ADD COLUMN events:
{code:bash}
gh-ost --alter="ADD COLUMN new_column INT" --table=users --force-table-names {table_prefix}
{code}

3. Operators crash: "Field names must be unique"
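To make the failure mode concrete, here is a minimal sketch of what happens when an operator applies the same ADD COLUMN twice without checking the schema it already holds. It is a simplified model, not Flink CDC code: the schema is reduced to a list of column names, `NaiveSchemaApply`, `addColumn` and `validateUniqueNames` are hypothetical names, and the uniqueness check only mirrors the message from the stack trace above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: a table schema is reduced to its ordered column names.
final class NaiveSchemaApply {

    // Applies an ADD COLUMN blindly, as a non-idempotent operator would.
    static List<String> addColumn(List<String> columns, String newColumn) {
        List<String> result = new ArrayList<>(columns);
        result.add(newColumn);
        validateUniqueNames(result);
        return result;
    }

    // Simplified stand-in for the schema validation behind the reported error.
    static void validateUniqueNames(List<String> names) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : names) {
            // Set.add returns false when the name was already seen
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        if (!duplicates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Field names must be unique. Found duplicates: " + duplicates);
        }
    }

    public static void main(String[] args) {
        // First AddColumnEvent for new_column: applied normally
        List<String> schema = addColumn(List.of("id", "name"), "new_column");
        try {
            // Duplicate AddColumnEvent for the same column
            addColumn(schema, "new_column");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Field names must be unique. Found duplicates: [new_column]`, the same shape of message as the reported exception.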

*Expected Behavior*
 * Operators detect that the column already exists in the schema
 * Operators skip the duplicate `AddColumnEvent` gracefully
 * Processing continues without crashing
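The expected behavior amounts to making ADD COLUMN idempotent. A minimal sketch under the same simplification (schema reduced to column names, event reduced to the names it adds); `IdempotentAddColumn`, `filterExisting` and `apply` are hypothetical names, not Flink CDC API. Columns already in the schema are dropped, and if nothing is left the event is skipped, so applying the same event twice yields the same schema.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model: schema = ordered column names, event = names the event adds.
final class IdempotentAddColumn {

    // Returns the columns that still need to be added, or Optional.empty() when
    // every column in the event already exists (the whole event should be skipped).
    static Optional<List<String>> filterExisting(List<String> schema, List<String> added) {
        Set<String> known = new HashSet<>(schema);
        List<String> toAdd = new ArrayList<>();
        for (String column : added) {
            // Set.add returns false for names already known, which also drops
            // a name repeated within the same event
            if (known.add(column)) {
                toAdd.add(column);
            }
        }
        if (toAdd.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(toAdd);
    }

    // Applies the event if anything is left after filtering; otherwise keeps the schema.
    static List<String> apply(List<String> schema, List<String> added) {
        Optional<List<String>> toAdd = filterExisting(schema, added);
        if (!toAdd.isPresent()) {
            return schema; // duplicate event: skip instead of failing
        }
        List<String> result = new ArrayList<>(schema);
        result.addAll(toAdd.get());
        return result;
    }

    public static void main(String[] args) {
        List<String> once = apply(List.of("id", "name"), List.of("new_column"));
        List<String> twice = apply(once, List.of("new_column"));
        System.out.println(once + " " + twice);
    }
}
```

Returning the schema unchanged instead of throwing is what lets the pipeline keep running; an actual operator would also need to drop or rewrite the event it forwards downstream.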

*Actual Behavior*
 * The operators crash and the pipeline fails with the duplicate-column error
 * Recovery requires manual intervention and a job restart
 * Zero-downtime schema migrations are blocked

*Proposed Solution*

Make `AddColumnEvent` handling idempotent: operators should check whether each column already exists before applying the event.
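 * *Core Logic*

{code:java}
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Inside the operator: LOG and getCurrentSchema(TableId) are the operator's own logger
// and schema lookup. Returns the event to apply, or Optional.empty() to skip it.
private Optional<SchemaChangeEvent> filterDuplicateColumns(SchemaChangeEvent event) {
    if (!(event instanceof AddColumnEvent)) {
        return Optional.of(event);
    }
    AddColumnEvent addEvent = (AddColumnEvent) event;
    Schema currentSchema = getCurrentSchema(addEvent.tableId());
    Set<String> existingColumns = new HashSet<>(currentSchema.getColumnNames());
    // Filter out columns that already exist in the current schema
    List<AddColumnEvent.ColumnWithPosition> columnsToAdd =
            addEvent.getAddedColumns().stream()
                    .filter(col -> !existingColumns.contains(col.getAddColumn().getName()))
                    .collect(Collectors.toList());
    if (columnsToAdd.isEmpty()) {
        LOG.info("Skipping duplicate AddColumnEvent for table {} "
                + "(likely from gh-ost cutover)", addEvent.tableId());
        return Optional.empty(); // Skip event
    }
    // Create a new event with only the non-duplicate columns
    return Optional.of(new AddColumnEvent(addEvent.tableId(), columnsToAdd));
}
{code}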
 * *Apply to Multiple Operators*

1. *PreTransformOperator*: filter duplicates before applying the event to the source schema
2. *PostTransformOperator*: filter duplicates before applying the event to the post-transform schema
3. *Sink operators* (e.g., Paimon BucketAssignOperator): filter duplicates before broadcasting the event
 * *Handle Position References*

When filtering columns, adjust position references:
- If a kept column's position (BEFORE/AFTER) references a filtered-out column, change it to `LAST`
- This prevents dangling references to non-existent columns
 * *Includes thread-safety fixes for serializers*
 * *Preserves state consistency*
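The position-reference rule from "Handle Position References" can be sketched in the same simplified style. `Position`, `ColumnAdd` and `rewrite` are hypothetical stand-ins (a column added FIRST, LAST, or BEFORE/AFTER another column), not the Flink CDC classes: entries for filtered-out columns are dropped, and a kept entry positioned BEFORE/AFTER a filtered-out column falls back to `LAST`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class PositionRewrite {

    enum Position { FIRST, LAST, BEFORE, AFTER }

    // Hypothetical stand-in for one added column and where it should be placed.
    static final class ColumnAdd {
        final String name;
        final Position position;
        final String reference; // column named by BEFORE/AFTER, otherwise null

        ColumnAdd(String name, Position position, String reference) {
            this.name = name;
            this.position = position;
            this.reference = reference;
        }

        @Override
        public String toString() {
            return reference == null ? name + " " + position : name + " " + position + " " + reference;
        }
    }

    // Drops columns listed in `filtered` and rewrites any BEFORE/AFTER that points
    // at a filtered column to LAST, so no kept column references a dropped one.
    static List<ColumnAdd> rewrite(List<ColumnAdd> columns, Set<String> filtered) {
        List<ColumnAdd> kept = new ArrayList<>();
        for (ColumnAdd column : columns) {
            if (filtered.contains(column.name)) {
                continue; // duplicate column: not part of the rewritten event
            }
            boolean relative = column.position == Position.BEFORE || column.position == Position.AFTER;
            if (relative && filtered.contains(column.reference)) {
                kept.add(new ColumnAdd(column.name, Position.LAST, null));
            } else {
                kept.add(column);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<ColumnAdd> event = List.of(
                new ColumnAdd("a", Position.AFTER, "id"),
                new ColumnAdd("b", Position.AFTER, "a"),
                new ColumnAdd("c", Position.FIRST, null));
        System.out.println(rewrite(event, Set.of("a"))); // [b LAST, c FIRST]
    }
}
```

Falling back to `LAST` is the simplest choice that always yields a valid position, at the cost of possibly placing the column differently from the upstream table.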

 

I have a working patch validated in production that I can contribute after 
community review.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
