This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 11a23af64c [Improve][Oracle-CDC] Fix oracle rename ddl event missing 
column type (#9314)
11a23af64c is described below

commit 11a23af64ca224e071211e59c7bc32cbc6539603
Author: hailin0 <[email protected]>
AuthorDate: Sat May 17 09:36:21 2025 +0800

    [Improve][Oracle-CDC] Fix oracle rename ddl event missing column type 
(#9314)
---
 .../base/schema/AbstractSchemaChangeResolver.java  | 63 +++++++++++++-
 .../cdc/base/schema/SchemaChangeResolver.java      |  5 +-
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java |  2 +-
 .../schema/AbstractSchemaChangeResolverTest.java   | 95 ++++++++++++++++++++++
 4 files changed, 160 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
index 8401f86236..e3daee5a0f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolver.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.connectors.cdc.base.schema;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 
@@ -40,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 @Slf4j
 public abstract class AbstractSchemaChangeResolver implements 
SchemaChangeResolver {
@@ -71,7 +74,7 @@ public abstract class AbstractSchemaChangeResolver implements 
SchemaChangeResolv
     }
 
     @Override
-    public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType 
dataType) {
+    public SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> 
catalogTables) {
         TablePath tablePath = SourceRecordUtils.getTablePath(record);
         String ddl = SourceRecordUtils.getDdl(record);
         if (Objects.isNull(ddlParser)) {
@@ -85,6 +88,7 @@ public abstract class AbstractSchemaChangeResolver implements 
SchemaChangeResolv
         // Parse DDL statement using Debezium's Antlr parser
         ddlParser.parse(ddl, tables);
         List<AlterTableColumnEvent> parsedEvents = getAndClearParsedEvents();
+        parsedEvents = completionEvent(parsedEvents, catalogTables);
         parsedEvents.forEach(e -> 
e.setSourceDialectName(getSourceDialectName()));
         AlterTableColumnsEvent alterTableColumnsEvent =
                 new AlterTableColumnsEvent(
@@ -99,6 +103,61 @@ public abstract class AbstractSchemaChangeResolver 
implements SchemaChangeResolv
         return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
     }
 
+    List<AlterTableColumnEvent> completionEvent(
+            List<AlterTableColumnEvent> events, List<CatalogTable> 
catalogTables) {
+        return events.stream()
+                .map(
+                        columnEvent -> {
+                            
columnEvent.setSourceDialectName(getSourceDialectName());
+                            if (catalogTables == null || 
catalogTables.isEmpty()) {
+                                return columnEvent;
+                            }
+                            if (!(columnEvent instanceof 
AlterTableChangeColumnEvent)) {
+                                return columnEvent;
+                            }
+
+                            AlterTableChangeColumnEvent changeColumnEvent =
+                                    (AlterTableChangeColumnEvent) columnEvent;
+                            if (changeColumnEvent.getColumn().getDataType() != 
null) {
+                                return columnEvent;
+                            }
+                            CatalogTable table =
+                                    catalogTables.stream()
+                                            .filter(
+                                                    catalogTable ->
+                                                            catalogTable
+                                                                    
.getTablePath()
+                                                                    .equals(
+                                                                            
columnEvent
+                                                                               
     .getTablePath()))
+                                            .findFirst()
+                                            .orElse(null);
+                            if (table != null) {
+                                Column oldColumn =
+                                        table.getTableSchema()
+                                                
.getColumn(changeColumnEvent.getOldColumn());
+                                Column newColumn =
+                                        
oldColumn.rename(changeColumnEvent.getColumn().getName());
+                                AlterTableChangeColumnEvent newEvent =
+                                        new AlterTableChangeColumnEvent(
+                                                
changeColumnEvent.getTableIdentifier(),
+                                                
changeColumnEvent.getOldColumn(),
+                                                newColumn,
+                                                changeColumnEvent.isFirst(),
+                                                
changeColumnEvent.getAfterColumn());
+                                
newEvent.setSourceDialectName(getSourceDialectName());
+                                return newEvent;
+                            } else {
+                                log.warn(
+                                        "Ignoring rename column {} type 
completion for table {}",
+                                        changeColumnEvent.getOldColumn(),
+                                        changeColumnEvent.getTablePath());
+                            }
+                            return columnEvent;
+                        })
+                .collect(Collectors.toList());
+    }
+
     protected abstract DdlParser createDdlParser(TablePath tablePath);
 
     protected abstract List<AlterTableColumnEvent> getAndClearParsedEvents();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
index c4d403b8d2..a391358fd9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/schema/SchemaChangeResolver.java
@@ -17,16 +17,17 @@
 
 package org.apache.seatunnel.connectors.cdc.base.schema;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.Serializable;
+import java.util.List;
 
 public interface SchemaChangeResolver extends Serializable {
 
     boolean support(SourceRecord record);
 
-    SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType);
+    SchemaChangeEvent resolve(SourceRecord record, List<CatalogTable> 
catalogTables);
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index e568154ba6..920731d489 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -126,7 +126,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         SchemaChangeEvent schemaChangeEvent = null;
         try {
             if (schemaChangeResolver != null) {
-                schemaChangeEvent = schemaChangeResolver.resolve(record, null);
+                schemaChangeEvent = schemaChangeResolver.resolve(record, 
tables);
             }
         } catch (Exception e) {
             log.warn("Failed to resolve schemaChangeEvent, just skip.", e);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java
new file mode 100644
index 0000000000..1565a25eb9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/schema/AbstractSchemaChangeResolverTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.schema;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.ddl.DdlParser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class AbstractSchemaChangeResolverTest {
+
+    @Test
+    void testCompletionEvent() {
+        JdbcSourceConfig config = mock(JdbcSourceConfig.class);
+        AbstractSchemaChangeResolver resolver =
+                new AbstractSchemaChangeResolver(config) {
+                    @Override
+                    protected DdlParser createDdlParser(TablePath tablePath) {
+                        return null;
+                    }
+
+                    @Override
+                    protected List<AlterTableColumnEvent> 
getAndClearParsedEvents() {
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    protected String getSourceDialectName() {
+                        return "mysql";
+                    }
+                };
+
+        AlterTableChangeColumnEvent changeColumnEvent =
+                AlterTableChangeColumnEvent.change(
+                        TableIdentifier.of(null, "test_db", "test_table"),
+                        "old_column",
+                        PhysicalColumn.builder().name("new_column").build());
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of(null, "test_db", "test_table"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.builder()
+                                                .name("old_column")
+                                                
.dataType(BasicType.STRING_TYPE)
+                                                .columnLength(1L)
+                                                .comment("column comment")
+                                                .build())
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        null,
+                        null);
+
+        List<AlterTableColumnEvent> events =
+                resolver.completionEvent(
+                        Arrays.asList(changeColumnEvent), 
Arrays.asList(catalogTable));
+        changeColumnEvent = (AlterTableChangeColumnEvent) events.get(0);
+        Assertions.assertEquals("mysql", 
changeColumnEvent.getSourceDialectName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
changeColumnEvent.getColumn().getDataType());
+        Assertions.assertEquals(1L, 
changeColumnEvent.getColumn().getColumnLength());
+        Assertions.assertEquals("column comment", 
changeColumnEvent.getColumn().getComment());
+    }
+}

Reply via email to