This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e4602648ae [INLONG-7882][Sort] Oracle CDC reduces the number of 
session connections (#8406)
e4602648ae is described below

commit e4602648aec64de4c07c2e67da4b942f0076f169
Author: emhui <[email protected]>
AuthorDate: Wed Jul 5 10:52:33 2023 +0800

    [INLONG-7882][Sort] Oracle CDC reduces the number of session connections 
(#8406)
---
 .../sort/cdc/oracle/source/OracleDialect.java      |  4 +---
 .../reader/fetch/OracleSourceFetchTaskContext.java | 10 ++++----
 .../oracle/source/utils/OracleConnectionUtils.java |  5 ++--
 .../sort/cdc/oracle/source/utils/OracleUtils.java  | 28 +---------------------
 4 files changed, 9 insertions(+), 38 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
index 4f06b74fb8..143311ec98 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
@@ -144,9 +144,7 @@ public class OracleDialect implements JdbcDataSourceDialect 
{
     @Override
     public OracleSourceFetchTaskContext createFetchTaskContext(
             SourceSplitBase sourceSplitBase, JdbcSourceConfig 
taskSourceConfig) {
-        final OracleConnection jdbcConnection =
-                createOracleConnection(taskSourceConfig.getDbzConfiguration());
-        return new OracleSourceFetchTaskContext(taskSourceConfig, this, 
jdbcConnection);
+        return new OracleSourceFetchTaskContext(taskSourceConfig, this);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 74be0e7c6a..ffe106b488 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
 import java.time.Instant;
 import java.util.Map;
 
+import static 
org.apache.inlong.sort.cdc.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
+
 /** The context for fetch task that fetching data of snapshot split from 
Oracle data source.
  *  Copy from com.ververica:flink-connector-oracle-cdc:2.3.0
  */
@@ -83,11 +85,9 @@ public class OracleSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
     private OracleErrorHandler errorHandler;
 
     public OracleSourceFetchTaskContext(
-            JdbcSourceConfig sourceConfig,
-            JdbcDataSourceDialect dataSourceDialect,
-            OracleConnection connection) {
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect 
dataSourceDialect) {
         super(sourceConfig, dataSourceDialect);
-        this.connection = connection;
+        this.connection = 
createOracleConnection(sourceConfig.getDbzConfiguration());
         this.metadataProvider = new OracleEventMetadataProvider();
     }
 
@@ -101,7 +101,7 @@ public class OracleSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                         .getDbzConfiguration()
                         
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
                 sourceSplitBase.getTableSchemas().values());
-        this.databaseSchema = 
OracleUtils.createOracleDatabaseSchema(connectorConfig);
+        this.databaseSchema = 
OracleUtils.createOracleDatabaseSchema(connectorConfig, connection);
         // todo logMiner or xStream
         this.offsetContext =
                 loadStartingOffsetState(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
index 41b21cb88b..860676396b 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
@@ -42,8 +42,7 @@ import static 
io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
  */
 public class OracleConnectionUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(OracleConnectionUtils.class);
 
     /** Returned by column metadata in Oracle if no scale is set. */
     private static final int ORACLE_UNSET_SCALE = -127;
@@ -56,7 +55,7 @@ public class OracleConnectionUtils {
         Configuration configuration = 
dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true);
         return new OracleConnection(
                 configuration.isEmpty() ? dbzConfiguration : configuration,
-                
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class::getClassLoader);
+                OracleConnectionUtils.class::getClassLoader);
     }
 
     /** Fetch current redoLog offsets in Oracle Server. */
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
index 2844e58e6a..7c268088ac 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
@@ -19,9 +19,7 @@ package org.apache.inlong.sort.cdc.oracle.source.utils;
 
 import org.apache.inlong.sort.cdc.oracle.source.meta.offset.RedoLogOffset;
 
-import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
 import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
-import io.debezium.config.Configuration;
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.connector.oracle.OracleConnectorConfig;
 import io.debezium.connector.oracle.OracleDatabaseSchema;
@@ -267,12 +265,9 @@ public class OracleUtils {
 
     /** Creates a new {@link OracleDatabaseSchema} to monitor the latest 
oracle database schemas. */
     public static OracleDatabaseSchema createOracleDatabaseSchema(
-            OracleConnectorConfig dbzOracleConfig) {
+            OracleConnectorConfig dbzOracleConfig, OracleConnection 
oracleConnection) {
         TopicSelector<TableId> topicSelector = 
OracleTopicSelector.defaultSelector(dbzOracleConfig);
         SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
-        OracleConnection oracleConnection =
-                
OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
-        // OracleConnectionUtils.createOracleConnection((Configuration) 
dbzOracleConfig);
         OracleValueConverters oracleValueConverters =
                 new OracleValueConverters(dbzOracleConfig, oracleConnection);
         StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
@@ -285,27 +280,6 @@ public class OracleUtils {
                 tableNameCaseSensitivity);
     }
 
-    /** Creates a new {@link OracleDatabaseSchema} to monitor the latest 
oracle database schemas. */
-    public static OracleDatabaseSchema createOracleDatabaseSchema(
-            OracleConnectorConfig dbzOracleConfig, boolean 
tableIdCaseInsensitive) {
-        TopicSelector<TableId> topicSelector = 
OracleTopicSelector.defaultSelector(dbzOracleConfig);
-        SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
-        OracleConnection oracleConnection =
-                OracleConnectionUtils.createOracleConnection((Configuration) 
dbzOracleConfig);
-        OracleValueConverters oracleValueConverters =
-                new OracleValueConverters(dbzOracleConfig, oracleConnection);
-        StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
-                tableIdCaseInsensitive
-                        ? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
-                        : 
StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
-        return new OracleDatabaseSchema(
-                dbzOracleConfig,
-                oracleValueConverters,
-                schemaNameAdjuster,
-                topicSelector,
-                tableNameCaseSensitivity);
-    }
-
     public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
         return getRedoLogPosition(dataRecord.sourceOffset());
     }

Reply via email to