This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d7b20a4e [Fix] Fix Oracle cdb and pdb model unable to create tables
(#423)
d7b20a4e is described below
commit d7b20a4e0102983e7c2cca6e1a2ed5fb1ba9b0bc
Author: wudi <[email protected]>
AuthorDate: Thu Jul 11 17:26:09 2024 +0800
[Fix] Fix Oracle cdb and pdb model unable to create tables (#423)
---
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 360351e4..beb6a677 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -44,6 +44,7 @@ import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -63,6 +64,7 @@ public class OracleDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(OracleDatabaseSync.class);
private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+ private static final String PDB_KEY = "debezium.database.pdb.name";
public OracleDatabaseSync() throws SQLException {
super();
@@ -108,9 +110,11 @@ public class OracleDatabaseSync extends DatabaseSync {
public List<SourceSchema> getSchemaList() throws Exception {
String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+
List<SourceSchema> schemaList = new ArrayList<>();
LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
try (Connection conn = getConnection()) {
+ setSessionToPdb(conn);
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet tables =
metaData.getTables(databaseName, schemaName, "%", new
String[] {"TABLE"})) {
@@ -134,6 +138,23 @@ public class OracleDatabaseSync extends DatabaseSync {
return schemaList;
}
+    /** Switches the session to the PDB configured under {@code PDB_KEY}; no-op when unset. */
+    private void setSessionToPdb(Connection conn) throws SQLException {
+        String pdbName = config.toMap().get(PDB_KEY);
+        if (StringUtils.isNullOrWhitespaceOnly(pdbName)) {
+            return; // no PDB configured: leave the session unchanged
+        }
+        // The name is concatenated into the statement, so accept only a plain identifier.
+        String container = pdbName.trim();
+        if (!container.matches("[A-Za-z][A-Za-z0-9_$#]*")) {
+            throw new SQLException("Invalid value for " + PDB_KEY + ": " + pdbName);
+        }
+        LOG.info("Found pdb name in config, set session to pdb to {}", container);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute("alter session set container=" + container);
+        }
+    }
+
@Override
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment
env) {
Properties debeziumProperties = new Properties();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]