Copilot commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2867220304
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -215,6 +220,24 @@ private CreateTableEvent cacheCreateTable(CreateTableEvent event) {
private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
TableId tableId = event.tableId();
PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+
+ // Filter out redundant AddColumnEvent columns that already exist in the schema
+ // to handle duplicate events from tools like gh-ost online schema migrations
+ if (event instanceof AddColumnEvent) {
+ AddColumnEvent addColumnEvent = (AddColumnEvent) event;
+ Schema currentSchema = tableChangeInfo.getSourceSchema();
+ Optional<AddColumnEvent> filtered =
+ SchemaUtils.filterRedundantAddColumns(currentSchema, addColumnEvent);
+ if (!filtered.isPresent()) {
+ LOG.debug(
+ "Skipping fully redundant AddColumnEvent for table {} "
+ + "- all columns already exist",
+ tableId);
+ return Optional.empty();
+ }
+ event = filtered.get();
+ }
Review Comment:
In `processEvent`, `preTransformProcessorMap.remove(tableId)` is executed
for every `SchemaChangeEvent`. With the new early-return here, a
fully-redundant `AddColumnEvent` will return `Optional.empty()` before
`cachePreTransformProcessor(...)` is called, so the processor map entry is
never rebuilt. This can cause subsequent `DataChangeEvent`s for the table to
fail the `processor != null` check and crash the pipeline. Ensure the processor
is re-cached even when the AddColumnEvent is filtered to a no-op (or avoid
removing the processor map entry in the redundant case).
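
The failure mode described above can be reproduced in isolation. The following is only a minimal sketch, assuming a heavily simplified operator: `ProcessorCacheSketch`, its string-valued processor map, and the `redundant` flag are hypothetical stand-ins and not the real `PreTransformOperator` API. It contrasts an early return that leaves the map entry missing with one that re-caches before returning.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of the operator; not the real PreTransformOperator. */
public class ProcessorCacheSketch {
    // Stand-in for preTransformProcessorMap: table id -> processor (a plain String here).
    private final Map<String, String> processorMap = new HashMap<>();

    /** Stands in for handling a CreateTableEvent: caches a processor for the table. */
    public void createTable(String tableId) {
        processorMap.put(tableId, "processor-for-" + tableId);
    }

    /** Buggy shape: the entry is removed first, and the early return skips the rebuild. */
    public Optional<String> onSchemaChangeBuggy(String tableId, boolean redundant) {
        processorMap.remove(tableId);
        if (redundant) {
            return Optional.empty(); // processor is never re-cached on this path
        }
        processorMap.put(tableId, "processor-for-" + tableId);
        return Optional.of("forwarded");
    }

    /** One possible fix: rebuild the processor before returning on the redundant path. */
    public Optional<String> onSchemaChangeFixed(String tableId, boolean redundant) {
        processorMap.remove(tableId);
        // Re-cache unconditionally so later data change events still find a processor.
        processorMap.put(tableId, "processor-for-" + tableId);
        return redundant ? Optional.empty() : Optional.of("forwarded");
    }

    /** Mirrors the `processor != null` check performed for data change events. */
    public boolean hasProcessor(String tableId) {
        return processorMap.containsKey(tableId);
    }
}
```

In this model, `hasProcessor` returns false after a redundant event goes through `onSchemaChangeBuggy` and true after it goes through `onSchemaChangeFixed`. Skipping the removal entirely in the redundant case would work equally well here.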
##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java:
##########
@@ -840,6 +840,245 @@ void testSchemaChangeWithPostWildcard() throws Exception {
.runTests("inserting columns at last");
}
+ /**
+ * This case tests that duplicate AddColumnEvents are handled gracefully by both
+ * PreTransformOperator and PostTransformOperator. When the same column is added twice (e.g.,
+ * from gh-ost online schema migrations), the second event should be filtered out.
+ */
+ @Test
+ void testDuplicateAddColumnEventPreTransform() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes");
+ TransformWithSchemaEvolveTestCase.of(
+ tableId,
+ "*, id + age as computed",
+ "name <> 'Alice'",
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("computed", DataTypes.INT())
+ .primaryKey("id")
+ .build())
+ .initializeHarness()
+ .runTests("initializing table")
+ // First AddColumnEvent: add "extras" column
+ .evolveFromSource(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("extras", DataTypes.FLOAT()),
+ AddColumnEvent.ColumnPosition.LAST,
+ null))))
+ .expectInPreTransformed(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("extras", DataTypes.FLOAT()),
+ AddColumnEvent.ColumnPosition.AFTER,
+ "age"))))
+ .expectInPostTransformed(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("extras", DataTypes.FLOAT()),
+ AddColumnEvent.ColumnPosition.AFTER,
+ "age"))))
+ .runTests("adding extras column first time")
+ // Duplicate AddColumnEvent: add "extras" column again (should be filtered)
+ .evolveFromSource(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("extras", DataTypes.FLOAT()),
+ AddColumnEvent.ColumnPosition.LAST,
+ null))))
+ .expectNothingInPreTransformed()
+ .expectNothingInPostTransformed()
+ .runTests("duplicate add extras column should be filtered")
Review Comment:
These duplicate-AddColumnEvent tests stop immediately after asserting the
redundant event is filtered. To prevent regressions like losing internal
transform processors/state, it would be useful to also process at least one
subsequent `DataChangeEvent` after the duplicate schema event and assert the
pipeline continues to transform records correctly (i.e., no operator crash /
missing schema view).
```suggestion
.runTests("duplicate add extras column should be filtered")
// After the duplicate AddColumnEvent has been filtered, process a subsequent
// DataChangeEvent to verify that the pipeline continues to transform records
// correctly (i.e., no operator crash or missing schema view).
// The concrete DataChangeEvent and its expected transformed output should follow
// the same construction patterns as other tests in this class.
// For example (pseudocode, to be aligned with existing helpers in this test):
// .evolveFromSource(
// someInsertOrUpdateDataChangeEventFor(tableId, id, name, age, extras))
// .expectInPreTransformed(expectedPreTransformEvent)
// .expectInPostTransformed(expectedPostTransformEvent)
// .runTests("data change after duplicate add extras column is still processed")
```
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +120,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+ Set<String> existingColumnNames =
+ columns.stream()
+ .map(Column::getName)
+ .collect(Collectors.toCollection(HashSet::new));
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+ // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
+ // (e.g., from gh-ost online schema migrations)
+ if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+ continue;
+ }
Review Comment:
`applyAddColumnEvent` now silently skips adding a column when the name
already exists. This can mask upstream inconsistencies (e.g., same column name
but different type/comment/default), leaving the schema potentially out of sync
with the source without any signal. Consider validating that the existing
column definition matches the incoming `addColumn` (and throw or at least
log/warn when it differs) so only true duplicates are treated as idempotent.
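
One way to treat only true duplicates as idempotent is sketched below. This is an illustration under stated assumptions, not the project's code: `IdempotentAddColumnSketch`, its `ColumnDef` class (name plus a type string), and the map-based schema are hypothetical stand-ins for the real `Column` and `Schema` types, and throwing on a mismatch is only one of the options mentioned above (logging a warning is the other).

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of an add-column step that only skips exact duplicates. */
public class IdempotentAddColumnSketch {

    /** Hypothetical stand-in for a column definition: a name and a type string. */
    public static final class ColumnDef {
        public final String name;
        public final String type;

        public ColumnDef(String name, String type) {
            this.name = name;
            this.type = type;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ColumnDef)) {
                return false;
            }
            ColumnDef other = (ColumnDef) o;
            return Objects.equals(name, other.name) && Objects.equals(type, other.type);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, type);
        }
    }

    /**
     * Adds the column if it is absent. Returns true when the column was added and false when
     * an identical definition already exists. Throws when the name exists with a different
     * definition, so a real inconsistency is surfaced instead of being skipped silently.
     */
    public static boolean addColumn(Map<String, ColumnDef> schema, ColumnDef incoming) {
        ColumnDef existing = schema.get(incoming.name);
        if (existing == null) {
            schema.put(incoming.name, incoming);
            return true;
        }
        if (existing.equals(incoming)) {
            return false; // exact duplicate: safe to treat as a no-op
        }
        throw new IllegalStateException(
                "Column " + incoming.name + " already exists with type " + existing.type
                        + " but the incoming event declares " + incoming.type);
    }
}
```

With this shape, adding `extras FLOAT` twice leaves the schema unchanged on the second call, while a later `extras DOUBLE` fails loudly rather than leaving the cached schema out of sync with the source.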