VinaySagarGonabavi commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2921457044
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +126,35 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
+            // (e.g., from gh-ost online schema migrations)
+            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                Column incomingColumn = columnWithPosition.getAddColumn();
+                columns.stream()
+                        .filter(c -> c.getName().equals(incomingColumn.getName()))
+                        .findFirst()
+                        .ifPresent(
+                                existingColumn -> {
+                                    if (!existingColumn
+                                            .getType()
+                                            .equals(incomingColumn.getType())) {
Review Comment:
No coercion or implicit casting is performed. When a duplicate column is
detected by name, the existing column definition is preserved as-is and the
incoming duplicate is skipped. If the types differ, a WARN log is emitted:
"Skipping duplicate column '{}' for table {} but types differ: existing={},
incoming={}". This is intentional for the gh-ost use case: duplicate
AddColumnEvents from online schema migration tools should have matching types,
so a type mismatch indicates a potential upstream inconsistency that should be
investigated, but we don't want to crash the pipeline over it.
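
For illustration, here is a minimal standalone sketch of the skip-and-warn behaviour described above. It is not the code from this PR: `Col`, `addColumns`, and the `WARNINGS` list are hypothetical stand-ins so the example compiles without Flink CDC on the classpath, types are plain strings, and new columns are simply appended rather than positioned.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DuplicateColumnSketch {

    /** Hypothetical stand-in for a schema column: only a name and a type string. */
    static final class Col {
        final String name;
        final String type;

        Col(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Collects warnings in place of a real logger (assumption made for this sketch). */
    static final List<String> WARNINGS = new ArrayList<>();

    /**
     * Returns a new column list where incoming columns whose names already exist
     * are skipped. The existing definition is kept unchanged; a type mismatch only
     * records a warning and never throws.
     */
    static List<Col> addColumns(String tableId, List<Col> existing, List<Col> incoming) {
        List<Col> columns = new ArrayList<>(existing);
        Set<String> existingNames = new HashSet<>();
        for (Col c : columns) {
            existingNames.add(c.name);
        }

        for (Col in : incoming) {
            if (existingNames.contains(in.name)) {
                // Duplicate by name: compare types for diagnostics only, no coercion.
                for (Col ex : columns) {
                    if (ex.name.equals(in.name)) {
                        if (!ex.type.equals(in.type)) {
                            WARNINGS.add(
                                    String.format(
                                            "Skipping duplicate column '%s' for table %s "
                                                    + "but types differ: existing=%s, incoming=%s",
                                            in.name, tableId, ex.type, in.type));
                        }
                        break;
                    }
                }
                continue; // skip the incoming duplicate
            }
            columns.add(in);
            existingNames.add(in.name);
        }
        return columns;
    }
}
```

Calling `addColumns` with an existing `id` column of type `INT` and an incoming `id` column of type `BIGINT` leaves the `INT` definition in place and records one warning. A genuinely new column is added as usual.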