This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 11a23af64c [Improve][Oracle-CDC] Fix oracle rename ddl event missing column type (#9314)
11a23af64c is described below
commit 11a23af64ca224e071211e59c7bc32cbc6539603
Author: hailin0 <[email protected]>
AuthorDate: Sat May 17 09:36:21 2025 +0800
[Improve][Oracle-CDC] Fix oracle rename ddl event missing column type (#9314)
---
.../base/schema/AbstractSchemaChangeResolver.java | 63 +++++++++++++-
.../cdc/base/schema/SchemaChangeResolver.java | 5 +-
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 2 +-
.../schema/AbstractSchemaChangeResolverTest.java | 95 ++++++++++++++++++++++
4 files changed, 160 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
index 8401f86236..e3daee5a0f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.connectors.cdc.base.schema;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
@@ -40,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {
@@ -71,7 +74,7 @@ public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolv
}
@Override
- public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
+ public SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> catalogTables) {
TablePath tablePath = SourceRecordUtils.getTablePath(record);
String ddl = SourceRecordUtils.getDdl(record);
if (Objects.isNull(ddlParser)) {
@@ -85,6 +88,7 @@ public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolv
// Parse DDL statement using Debezium's Antlr parser
ddlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents = getAndClearParsedEvents();
+ parsedEvents = completionEvent(parsedEvents, catalogTables);
parsedEvents.forEach(e -> e.setSourceDialectName(getSourceDialectName()));
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
@@ -99,6 +103,61 @@ public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolv
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
}
+ List<AlterTableColumnEvent> completionEvent(
+ List<AlterTableColumnEvent> events, List<CatalogTable> catalogTables) {
+ return events.stream()
+ .map(
+ columnEvent -> {
+ columnEvent.setSourceDialectName(getSourceDialectName());
+ if (catalogTables == null || catalogTables.isEmpty()) {
+ return columnEvent;
+ }
+ if (!(columnEvent instanceof AlterTableChangeColumnEvent)) {
+ return columnEvent;
+ }
+
+ AlterTableChangeColumnEvent changeColumnEvent =
+ (AlterTableChangeColumnEvent) columnEvent;
+ if (changeColumnEvent.getColumn().getDataType() != null) {
+ return columnEvent;
+ }
+ CatalogTable table =
+ catalogTables.stream()
+ .filter(
+ catalogTable ->
+ catalogTable
+ .getTablePath()
+ .equals(
+ columnEvent
+ .getTablePath()))
+ .findFirst()
+ .orElse(null);
+ if (table != null) {
+ Column oldColumn =
+ table.getTableSchema()
+ .getColumn(changeColumnEvent.getOldColumn());
+ Column newColumn =
+ oldColumn.rename(changeColumnEvent.getColumn().getName());
+ AlterTableChangeColumnEvent newEvent =
+ new AlterTableChangeColumnEvent(
+ changeColumnEvent.getTableIdentifier(),
+ changeColumnEvent.getOldColumn(),
+ newColumn,
+ changeColumnEvent.isFirst(),
+ changeColumnEvent.getAfterColumn());
+ newEvent.setSourceDialectName(getSourceDialectName());
+ return newEvent;
+ } else {
+ log.warn(
+ "Ignoring rename column {} type completion for table {}",
+ changeColumnEvent.getOldColumn(),
+ changeColumnEvent.getTablePath());
+ }
+ return columnEvent;
+ })
+ .collect(Collectors.toList());
+ }
+
protected abstract DdlParser createDdlParser(TablePath tablePath);
protected abstract List<AlterTableColumnEvent> getAndClearParsedEvents();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
index c4d403b8d2..a391358fd9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
@@ -17,16 +17,17 @@
package org.apache.seatunnel.connectors.cdc.base.schema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.Serializable;
+import java.util.List;
public interface SchemaChangeResolver extends Serializable {
boolean support(SourceRecord record);
- SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType);
+ SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> catalogTables);
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index e568154ba6..920731d489 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -126,7 +126,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
SchemaChangeEvent schemaChangeEvent = null;
try {
if (schemaChangeResolver != null) {
- schemaChangeEvent = schemaChangeResolver.resolve(record, null);
+ schemaChangeEvent = schemaChangeResolver.resolve(record, tables);
}
} catch (Exception e) {
log.warn("Failed to resolve schemaChangeEvent, just skip.", e);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java
new file mode 100644
index 0000000000..1565a25eb9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.schema;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.ddl.DdlParser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class AbstractSchemaChangeResolverTest {
+
+ @Test
+ void testCompletionEvent() {
+ JdbcSourceConfig config = mock(JdbcSourceConfig.class);
+ AbstractSchemaChangeResolver resolver =
+ new AbstractSchemaChangeResolver(config) {
+ @Override
+ protected DdlParser createDdlParser(TablePath tablePath) {
+ return null;
+ }
+
+ @Override
+ protected List<AlterTableColumnEvent> getAndClearParsedEvents() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected String getSourceDialectName() {
+ return "mysql";
+ }
+ };
+
+ AlterTableChangeColumnEvent changeColumnEvent =
+ AlterTableChangeColumnEvent.change(
+ TableIdentifier.of(null, "test_db", "test_table"),
+ "old_column",
+ PhysicalColumn.builder().name("new_column").build());
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(null, "test_db", "test_table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.builder()
+ .name("old_column")
+ .dataType(BasicType.STRING_TYPE)
+ .columnLength(1L)
+ .comment("column comment")
+ .build())
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null,
+ null);
+
+ List<AlterTableColumnEvent> events =
+ resolver.completionEvent(
+ Arrays.asList(changeColumnEvent), Arrays.asList(catalogTable));
+ changeColumnEvent = (AlterTableChangeColumnEvent) events.get(0);
+ Assertions.assertEquals("mysql", changeColumnEvent.getSourceDialectName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, changeColumnEvent.getColumn().getDataType());
+ Assertions.assertEquals(1L, changeColumnEvent.getColumn().getColumnLength());
+ Assertions.assertEquals("column comment", changeColumnEvent.getColumn().getComment());
+ }
+}