Vinay Sagar Gonabavi created FLINK-38830:
--------------------------------------------
Summary: PreTransformOperator needs to avoid crashing with "Field
names must be unique" error and handle processing a duplicate AddColumnEvent
better
Key: FLINK-38830
URL: https://issues.apache.org/jira/browse/FLINK-38830
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Affects Versions: cdc-3.4.0
Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present
on master f5204243)
* Flink Version: 1.20+
* Migration Tool: gh-ost
* Configuration: Shadow tables properly excluded via `tables.exclude`
Reporter: Vinay Sagar Gonabavi
Flink CDC pipelines can crash on duplicate ADD COLUMN events (in our case,
during gh-ost online schema migrations on a MySQL source) with
{code:java}
Caused by: java.lang.IllegalArgumentException: Field names must be unique.
Found duplicates: [new_column_name]{code}
Even with gh-ost shadow tables properly excluded, we observed duplicate
`AddColumnEvent`s for the same column, which caused operators to fail.
*Steps to Reproduce*
1. Configure Flink CDC pipeline with gh-ost shadow table exclusion:
{code:java}
source:
  type: mysql
  tables.exclude: '^\.+.\.+(\_del$|\_ghc$|\_gho)$'
{code}
2. Run a gh-ost migration (as in our case); any other source of duplicate
ADD COLUMN events triggers the same failure:
{code:java}
gh-ost --alter="ADD COLUMN new_column INT" --table=users --force-table-names
{table_prefix}{code}
3. Operators crash: "Field names must be unique"
*Expected Behavior*
* Operators detect that the column already exists in the schema
* Skip duplicate `AddColumnEvent` gracefully
* Continue processing without crashes
*Actual Behavior*
* Operators crash and the pipeline fails with a duplicate column error
* Requires manual intervention and job restart
* Blocks zero-downtime schema migrations
*Proposed Solution*
Add idempotent `AddColumnEvent` handling: operators should check whether the
column already exists before applying the event:
* *Core Logic*
{code:java}
if (event instanceof AddColumnEvent) {
    AddColumnEvent addEvent = (AddColumnEvent) event;
    Schema currentSchema = getCurrentSchema(addEvent.tableId());
    Set<String> existingColumns = new HashSet<>(currentSchema.getColumnNames());

    // Filter columns that already exist
    List<ColumnWithPosition> columnsToAdd =
            addEvent.getAddedColumns().stream()
                    .filter(col -> !existingColumns.contains(col.getAddColumn().getName()))
                    .collect(Collectors.toList());

    if (columnsToAdd.isEmpty()) {
        LOG.info("Skipping duplicate AddColumnEvent for table {} "
                + "(likely from gh-ost cutover)", addEvent.tableId());
        return Optional.empty(); // Skip event
    }

    // Create new event with only non-duplicate columns
    event = new AddColumnEvent(addEvent.tableId(), columnsToAdd);
}
{code}
* *Apply to Multiple Operators*
1. **PreTransformOperator**: Filter duplicates before applying to source schema
2. **PostTransformOperator**: Filter duplicates before applying to
post-transform schema
3. **Sink Operators** (e.g., Paimon BucketAssignOperator): Filter before
broadcasting
* *Handle Position References*
When filtering columns, adjust position references:
- If position references a filtered column → change to `LAST`
- Prevents dangling references to non-existent columns
* *Includes thread-safety fixes for serializers*
* *Preserves state consistency*
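
To make the position-reference rule above concrete, here is a minimal, self-contained sketch. It is not the patch itself and does not use the Flink CDC classes: `ColumnToAdd`, `Position`, and `filterDuplicates` are hypothetical stand-ins introduced only for illustration, and the fallback follows a literal reading of the rule (a `BEFORE`/`AFTER` reference to a filtered column becomes `LAST`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: ColumnToAdd and Position are simplified, hypothetical stand-ins,
// not the Flink CDC AddColumnEvent classes.
class AddColumnDedupSketch {

    enum Position { FIRST, LAST, BEFORE, AFTER }

    static final class ColumnToAdd {
        final String name;
        final Position position;
        final String referenceColumn; // null unless position is BEFORE or AFTER

        ColumnToAdd(String name, Position position, String referenceColumn) {
            this.name = name;
            this.position = position;
            this.referenceColumn = referenceColumn;
        }
    }

    /** Drops columns already in the schema and re-anchors columns that pointed at a dropped one. */
    static List<ColumnToAdd> filterDuplicates(
            List<String> schemaColumns, List<ColumnToAdd> requested) {
        Set<String> existing = new HashSet<>(schemaColumns);

        // First pass: collect the names that will be filtered out as duplicates.
        Set<String> filtered = new HashSet<>();
        for (ColumnToAdd column : requested) {
            if (existing.contains(column.name)) {
                filtered.add(column.name);
            }
        }

        // Second pass: keep new columns; fall back to LAST where the reference was filtered.
        List<ColumnToAdd> kept = new ArrayList<>();
        for (ColumnToAdd column : requested) {
            if (filtered.contains(column.name)) {
                continue; // duplicate, skip it
            }
            boolean relative =
                    column.position == Position.BEFORE || column.position == Position.AFTER;
            if (relative && filtered.contains(column.referenceColumn)) {
                kept.add(new ColumnToAdd(column.name, Position.LAST, null));
            } else {
                kept.add(column);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("id", "name", "new_column");
        List<ColumnToAdd> requested = Arrays.asList(
                new ColumnToAdd("new_column", Position.AFTER, "name"),   // duplicate
                new ColumnToAdd("extra", Position.AFTER, "new_column")); // references the duplicate
        for (ColumnToAdd column : filterDuplicates(schema, requested)) {
            System.out.println(column.name + " -> " + column.position); // prints "extra -> LAST"
        }
    }
}
```

An empty result from `filterDuplicates` corresponds to the "skip event" branch in the core logic; a non-empty result would be wrapped in a new event containing only the surviving columns.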
I have a working patch validated in production that I can contribute after
community review.