This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 919a91032a [Improve][Oracle-CDC] Clean unused code (#6212)
919a91032a is described below

commit 919a91032ac13222079c1f4b7df8fe5d687ef6eb
Author: hailin0 <[email protected]>
AuthorDate: Tue Jan 16 11:10:44 2024 +0800

    [Improve][Oracle-CDC] Clean unused code (#6212)
---
 .../cdc/oracle/source/OracleIncrementalSource.java | 28 +---------------------
 .../cdc/oracle/utils/OracleTypeUtils.java          | 16 -------------
 2 files changed, 1 insertion(+), 43 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 5c942f8b50..933c8cdc37 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -37,7 +37,6 @@ import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchem
 import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
 import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
 import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
-import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffsetFactory;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
@@ -45,24 +44,18 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import org.apache.kafka.connect.data.Struct;
 
 import com.google.auto.service.AutoService;
-import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.jdbc.JdbcConnection;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.ConnectTableChangeSerializer;
 import io.debezium.relational.history.TableChanges;
 import lombok.NoArgsConstructor;
 
-import java.sql.SQLException;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleTypeUtils.convertFromTable;
-
 @NoArgsConstructor
 @AutoService(SeaTunnelSource.class)
 public class OracleIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
@@ -116,26 +109,7 @@ public class OracleIncrementalSource<T> extends IncrementalSource<T, JdbcSourceC
                             config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
         }
 
-        SeaTunnelDataType<SeaTunnelRow> physicalRowType;
-        if (dataType == null) {
-            OracleSourceConfig oracleSourceConfig =
-                    (OracleSourceConfig) this.configFactory.create(0);
-            TableId tableId =
-                    this.dataSourceDialect.discoverDataCollections(oracleSourceConfig).get(0);
-            Table table;
-            try (OracleConnection oracleConnection =
-                    createOracleConnection(oracleSourceConfig.getDbzConfiguration())) {
-                table =
-                        ((OracleDialect) dataSourceDialect)
-                                .queryTableSchema(oracleConnection, tableId)
-                                .getTable();
-            } catch (SQLException e) {
-                throw new SeaTunnelException(e);
-            }
-            physicalRowType = convertFromTable(table);
-        } else {
-            physicalRowType = dataType;
-        }
+        SeaTunnelDataType<SeaTunnelRow> physicalRowType = dataType;
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         return (DebeziumDeserializationSchema<T>)
                 SeaTunnelRowDebeziumDeserializeSchema.builder()
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
index 8a7ddc91d8..7e23fbf3a1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleTypeUtils.java
@@ -22,14 +22,11 @@ import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import oracle.jdbc.OracleTypes;
 
 import java.sql.Types;
-import java.util.List;
 
 /** Utilities for converting from oracle types to SeaTunnel types. */
 public class OracleTypeUtils {
@@ -78,17 +75,4 @@ public class OracleTypeUtils {
                                 column.typeName(), column.jdbcType()));
         }
     }
-
-    public static SeaTunnelRowType convertFromTable(Table table) {
-
-        List<Column> columns = table.columns();
-        String[] fieldNames = columns.stream().map(Column::name).toArray(String[]::new);
-
-        SeaTunnelDataType<?>[] fieldTypes =
-                columns.stream()
-                        .map(OracleTypeUtils::convertFromColumn)
-                        .toArray(SeaTunnelDataType[]::new);
-
-        return new SeaTunnelRowType(fieldNames, fieldTypes);
-    }
 }

Reply via email to