lllmao created FLINK-39354:
------------------------------
Summary: SchemaUtils.applyAddColumnEvent fails when referenced
existing column name differs only by case
Key: FLINK-39354
URL: https://issues.apache.org/jira/browse/FLINK-39354
Project: Flink
Issue Type: Bug
Reporter: lllmao
* Module: [flink-cdc-common]
* Problem: {{SchemaUtils.applyAddColumnEvent}} throws
{{IllegalArgumentException}} when {{AddColumnEvent.ColumnWithPosition}}
references an existing column by a name that differs from the schema's
column name only in letter case.
* Reproduction:
** Test class: {{SchemaUtilsTest.java}}
** Test code:
{code:java}
void testApplyAddColumnEventWithCaseInsensitiveExistingColumnName() {
    TableId tableId = TableId.parse("gc_fps_receivable.billstat");
    Schema schema =
            Schema.newBuilder().physicalColumn("changetype", DataTypes.STRING()).build();

    List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
    addedColumns.add(
            new AddColumnEvent.ColumnWithPosition(
                    Column.physicalColumn("origintypecode", DataTypes.STRING()),
                    AddColumnEvent.ColumnPosition.AFTER,
                    "ChangeType"));
    AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns);

    schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);

    Assertions.assertThat(schema)
            .isEqualTo(
                    Schema.newBuilder()
                            .physicalColumn("changetype", DataTypes.STRING())
                            .physicalColumn("origintypecode", DataTypes.STRING())
                            .build());
}
{code}
* Actual behavior:
** {{SchemaUtils.applyAddColumnEvent}} fails to find the referenced column
** Exception message: {{"ChangeType of AddColumnEvent is not existed"}}
* Root cause:
** The existing-column lookup is strictly case-sensitive
* Expected behavior:
** {{AFTER}} / {{BEFORE}} references should match existing schema column names
case-insensitively when exact match fails
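* Illustration of the expected lookup (a minimal sketch, not the actual {{SchemaUtils}} code): the class {{ColumnNameResolver}} and its {{resolve}} method are hypothetical names introduced here, and the sketch works on plain column-name strings rather than Flink CDC types. It tries an exact match first and falls back to a case-insensitive match only when the exact match fails. Treating multiple case-insensitive candidates as "not found" is an assumption of this sketch; the issue does not say how such ambiguity should be handled.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper, for illustration only; not part of flink-cdc-common. */
final class ColumnNameResolver {

    private ColumnNameResolver() {}

    /**
     * Resolves a referenced column name against the existing column names.
     * An exact match wins; otherwise a single case-insensitive match is used.
     * Returns an empty Optional when nothing matches or when more than one
     * column matches case-insensitively (assumption: ambiguity is rejected).
     */
    static Optional<String> resolve(List<String> existingNames, String referenced) {
        // Step 1: strict, case-sensitive match.
        if (existingNames.contains(referenced)) {
            return Optional.of(referenced);
        }
        // Step 2: case-insensitive fallback, accepted only if it is unique.
        String match = null;
        for (String name : existingNames) {
            if (name.equalsIgnoreCase(referenced)) {
                if (match != null) {
                    return Optional.empty(); // more than one candidate
                }
                match = name;
            }
        }
        return Optional.ofNullable(match);
    }
}
```

With the schema from the reproduction, resolving {{"ChangeType"}} against the single existing column {{changetype}} would yield {{changetype}}, so the {{AFTER}} position could be applied to the actual column name.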