[
https://issues.apache.org/jira/browse/FLINK-38830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38830:
-----------------------------------
Labels: Flink-CDC pull-request-available (was: Flink-CDC)
> PreTransformOperator needs to avoid crashing with "Field names must be
> unique" error and handle processing a duplicate AddColumnEvent better
> --------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38830
> URL: https://issues.apache.org/jira/browse/FLINK-38830
> Project: Flink
> Issue Type: New Feature
> Components: Flink CDC
> Affects Versions: cdc-3.4.0
> Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present
> on master f5204243)
> * Flink Version: 1.20+
> * Migration Tool: gh-ost
> * Configuration: Shadow tables properly excluded via `tables.exclude`
> Reporter: Vinay Sagar Gonabavi
> Priority: Major
> Labels: Flink-CDC, pull-request-available
>
> Flink CDC pipelines can crash on duplicate ADD COLUMN events (in our case,
> during gh-ost online schema migrations on a MySQL source) with:
>
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Field names must be unique.
> Found duplicates: [new_column_name]{code}
>
> Even with gh-ost shadow tables properly excluded, we observed duplicate
> `AddColumnEvent`s for the same column, which caused operators to fail.
>
> *Steps to Reproduce*
> 1. Configure Flink CDC pipeline with gh-ost shadow table exclusion:
> {code:java}
> source:
>   type: mysql
>   tables.exclude: '^\.+.\.+(\_del$|\_ghc$|\_gho)$'
> {code}
> 2. Run a gh-ost migration (our case); any other source of duplicate
> ADD COLUMN events triggers the same failure:
> {code:java}
> gh-ost --alter="ADD COLUMN new_column INT" --table=users --force-table-names
> {table_prefix}{code}
>
> 3. Operators crash: "Field names must be unique"
> *Expected Behavior*
> * Operators detect column already exists in schema
> * Skip duplicate `AddColumnEvent` gracefully
> * Continue processing without crashes
> *Actual Behavior*
> * Operators crash and the pipeline fails with a duplicate column error
> * Requires manual intervention and job restart
> * Blocks zero-downtime schema migrations
> *Proposed Solution*
> Make `AddColumnEvent` handling idempotent. Operators should check whether
> each column already exists before applying the event:
> * *Core Logic*
>
> {code:java}
> if (event instanceof AddColumnEvent) {
>     AddColumnEvent addEvent = (AddColumnEvent) event;
>     Schema currentSchema = getCurrentSchema(addEvent.tableId());
>     Set<String> existingColumns = new HashSet<>(currentSchema.getColumnNames());
>     // Filter columns that already exist
>     List<ColumnWithPosition> columnsToAdd =
>             addEvent.getAddedColumns().stream()
>                     .filter(col -> !existingColumns.contains(col.getAddColumn().getName()))
>                     .collect(Collectors.toList());
>     if (columnsToAdd.isEmpty()) {
>         LOG.info("Skipping duplicate AddColumnEvent for table {} " +
>                 "(likely from gh-ost cutover)", addEvent.tableId());
>         return Optional.empty(); // Skip event
>     }
>     // Create new event with only non-duplicate columns
>     event = new AddColumnEvent(addEvent.tableId(), columnsToAdd);
> }
> {code}
> * *Apply to Multiple Operators*
> 1. *PreTransformOperator*: Filter duplicates before applying to the source
> schema
> 2. *PostTransformOperator*: Filter duplicates before applying to the
> post-transform schema
> 3. *Sink Operators* (e.g., Paimon BucketAssignOperator): Filter before
> broadcasting
> * *Handle Position References*
> When filtering columns, adjust position references:
> - If position references a filtered column → change to `LAST`
> - Prevents dangling references to non-existent columns
> * *Includes thread-safety fixes for serializers*
> * *Preserves state consistency*
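
The position-reference rule above can be illustrated with a minimal, self-contained sketch. Everything in it (`AddColumnDedupSketch`, `AddedColumn`, `Position`, `dedupe`) is a hypothetical stand-in written for this example; it does not use the Flink CDC classes and is not the patch mentioned in the issue. It only shows the two steps as described: drop columns that already exist, then re-anchor to `LAST` any surviving column whose BEFORE/AFTER reference names a dropped column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-ins for illustration only; these are NOT the Flink CDC classes. */
final class AddColumnDedupSketch {

    enum Position { FIRST, LAST, BEFORE, AFTER }

    /** One requested column: its name, where to put it, and the column that position refers to. */
    static final class AddedColumn {
        final String name;
        final Position position;
        final String reference; // only meaningful for BEFORE / AFTER, otherwise null

        AddedColumn(String name, Position position, String reference) {
            this.name = name;
            this.position = position;
            this.reference = reference;
        }
    }

    /**
     * Drops requested columns that already exist in the schema. A surviving column
     * whose reference names a column dropped from this same event is re-anchored
     * to LAST, following the rule stated in the issue text.
     */
    static List<AddedColumn> dedupe(List<String> schemaColumns, List<AddedColumn> requested) {
        Set<String> existing = new HashSet<>(schemaColumns);
        Set<String> filtered = new HashSet<>();
        for (AddedColumn col : requested) {
            if (existing.contains(col.name)) {
                filtered.add(col.name);
            }
        }
        List<AddedColumn> result = new ArrayList<>();
        for (AddedColumn col : requested) {
            if (filtered.contains(col.name)) {
                continue; // duplicate: the schema already has this column
            }
            boolean refFiltered = col.reference != null && filtered.contains(col.reference);
            result.add(refFiltered ? new AddedColumn(col.name, Position.LAST, null) : col);
        }
        return result;
    }

    public static void main(String[] args) {
        List<AddedColumn> out = dedupe(
                Arrays.asList("id", "new_column"),
                Arrays.asList(
                        new AddedColumn("new_column", Position.LAST, null),
                        new AddedColumn("other", Position.AFTER, "new_column")));
        for (AddedColumn col : out) {
            System.out.println(col.name + " -> " + col.position);
        }
    }
}
```

An empty result from `dedupe` corresponds to the case where the whole event is a duplicate and can be skipped. The sketch does not handle the same column name appearing twice within a single event; whether that case needs handling is left open here.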
>
> I have a working patch validated in production that I can contribute after
> community review.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)