This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e4602648ae [INLONG-7882][Sort] Oracle CDC reduces the number of
session connections (#8406)
e4602648ae is described below
commit e4602648aec64de4c07c2e67da4b942f0076f169
Author: emhui <[email protected]>
AuthorDate: Wed Jul 5 10:52:33 2023 +0800
[INLONG-7882][Sort] Oracle CDC reduces the number of session connections
(#8406)
---
.../sort/cdc/oracle/source/OracleDialect.java | 4 +---
.../reader/fetch/OracleSourceFetchTaskContext.java | 10 ++++----
.../oracle/source/utils/OracleConnectionUtils.java | 5 ++--
.../sort/cdc/oracle/source/utils/OracleUtils.java | 28 +---------------------
4 files changed, 9 insertions(+), 38 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
index 4f06b74fb8..143311ec98 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/OracleDialect.java
@@ -144,9 +144,7 @@ public class OracleDialect implements JdbcDataSourceDialect
{
@Override
public OracleSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig) {
- final OracleConnection jdbcConnection =
- createOracleConnection(taskSourceConfig.getDbzConfiguration());
- return new OracleSourceFetchTaskContext(taskSourceConfig, this,
jdbcConnection);
+ return new OracleSourceFetchTaskContext(taskSourceConfig, this);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 74be0e7c6a..ffe106b488 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.Map;
+import static
org.apache.inlong.sort.cdc.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
+
/** The context for fetch task that fetching data of snapshot split from
Oracle data source.
* Copy from com.ververica:flink-connector-oracle-cdc:2.3.0
*/
@@ -83,11 +85,9 @@ public class OracleSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
private OracleErrorHandler errorHandler;
public OracleSourceFetchTaskContext(
- JdbcSourceConfig sourceConfig,
- JdbcDataSourceDialect dataSourceDialect,
- OracleConnection connection) {
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect
dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
- this.connection = connection;
+ this.connection =
createOracleConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new OracleEventMetadataProvider();
}
@@ -101,7 +101,7 @@ public class OracleSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
.getDbzConfiguration()
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
sourceSplitBase.getTableSchemas().values());
- this.databaseSchema =
OracleUtils.createOracleDatabaseSchema(connectorConfig);
+ this.databaseSchema =
OracleUtils.createOracleDatabaseSchema(connectorConfig, connection);
// todo logMiner or xStream
this.offsetContext =
loadStartingOffsetState(
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
index 41b21cb88b..860676396b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleConnectionUtils.java
@@ -42,8 +42,7 @@ import static
io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
*/
public class OracleConnectionUtils {
- private static final Logger LOG = LoggerFactory.getLogger(
-
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(OracleConnectionUtils.class);
/** Returned by column metadata in Oracle if no scale is set. */
private static final int ORACLE_UNSET_SCALE = -127;
@@ -56,7 +55,7 @@ public class OracleConnectionUtils {
Configuration configuration =
dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true);
return new OracleConnection(
configuration.isEmpty() ? dbzConfiguration : configuration,
-
com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.class::getClassLoader);
+ OracleConnectionUtils.class::getClassLoader);
}
/** Fetch current redoLog offsets in Oracle Server. */
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
index 2844e58e6a..7c268088ac 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/utils/OracleUtils.java
@@ -19,9 +19,7 @@ package org.apache.inlong.sort.cdc.oracle.source.utils;
import org.apache.inlong.sort.cdc.oracle.source.meta.offset.RedoLogOffset;
-import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import com.ververica.cdc.connectors.oracle.source.utils.OracleTypeUtils;
-import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@@ -267,12 +265,9 @@ public class OracleUtils {
/** Creates a new {@link OracleDatabaseSchema} to monitor the latest
oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
- OracleConnectorConfig dbzOracleConfig) {
+ OracleConnectorConfig dbzOracleConfig, OracleConnection
oracleConnection) {
TopicSelector<TableId> topicSelector =
OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
- OracleConnection oracleConnection =
-
OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
- // OracleConnectionUtils.createOracleConnection((Configuration)
dbzOracleConfig);
OracleValueConverters oracleValueConverters =
new OracleValueConverters(dbzOracleConfig, oracleConnection);
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
@@ -285,27 +280,6 @@ public class OracleUtils {
tableNameCaseSensitivity);
}
- /** Creates a new {@link OracleDatabaseSchema} to monitor the latest
oracle database schemas. */
- public static OracleDatabaseSchema createOracleDatabaseSchema(
- OracleConnectorConfig dbzOracleConfig, boolean
tableIdCaseInsensitive) {
- TopicSelector<TableId> topicSelector =
OracleTopicSelector.defaultSelector(dbzOracleConfig);
- SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
- OracleConnection oracleConnection =
- OracleConnectionUtils.createOracleConnection((Configuration)
dbzOracleConfig);
- OracleValueConverters oracleValueConverters =
- new OracleValueConverters(dbzOracleConfig, oracleConnection);
- StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
- tableIdCaseInsensitive
- ? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
- :
StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
- return new OracleDatabaseSchema(
- dbzOracleConfig,
- oracleValueConverters,
- schemaNameAdjuster,
- topicSelector,
- tableNameCaseSensitivity);
- }
-
public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
return getRedoLogPosition(dataRecord.sourceOffset());
}