This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4231798 [fix](cdc)fix the issue caused by Oracle table names
containing slash (#355)
4231798 is described below
commit 4231798656f9dc3013d41ff9cc5c4c8fe88805c4
Author: Petrichor <[email protected]>
AuthorDate: Mon Apr 8 14:11:10 2024 +0800
[fix](cdc)fix the issue caused by Oracle table names containing slash (#355)
---
.../doris/flink/tools/cdc/JdbcSourceSchema.java | 21 +++++++++++++++++++--
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 11 -----------
.../doris/flink/tools/cdc/oracle/OracleSchema.java | 15 +++++++++++++++
3 files changed, 34 insertions(+), 13 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 86d6336..b421aff 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -21,8 +21,10 @@ import org.apache.doris.flink.catalog.doris.FieldSchema;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
/**
* JdbcSourceSchema is a subclass of SourceSchema, used to build metadata
about jdbc-related
@@ -38,7 +40,15 @@ public abstract class JdbcSourceSchema extends SourceSchema {
String tableComment)
throws Exception {
super(databaseName, schemaName, tableName, tableComment);
- fields = new LinkedHashMap<>();
+ fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
+ primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName,
tableName);
+ }
+
+ public LinkedHashMap<String, FieldSchema> getColumnInfo(
+ DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName)
+ throws SQLException {
+ LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
+ //
try (ResultSet rs = metaData.getColumns(databaseName, schemaName,
tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
@@ -57,14 +67,21 @@ public abstract class JdbcSourceSchema extends SourceSchema
{
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr,
comment));
}
}
+ return fields;
+ }
- primaryKeys = new ArrayList<>();
+ public List<String> getPrimaryKeys(
+ DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName)
+ throws SQLException {
+ List<String> primaryKeys = new ArrayList<>();
try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName,
tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
primaryKeys.add(fieldName);
}
}
+
+ return primaryKeys;
}
public abstract String convertToDorisType(String fieldType, Integer
precision, Integer scale);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index f3a8c8b..945c839 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -32,8 +32,6 @@ import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
-import org.apache.doris.flink.catalog.doris.TableSchema;
-import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -120,15 +118,6 @@ public class OracleDatabaseSync extends DatabaseSync {
if (!isSyncNeeded(tableName)) {
continue;
}
- // Oracle allows table names to contain special characters
such as /, #, $,
- // etc., as in 'A/B'.
- // However, Doris does not support tables with these
characters.
- if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) {
- throw new CreateTableException(
- String.format(
- "The table name %s is invalid. Table
names in Doris must match the regex pattern %s. Please consider renaming the
table or use the 'excluding-tables' option to filter it out.",
- tableName,
TableSchema.DORIS_TABLE_REGEX));
- }
SourceSchema sourceSchema =
new OracleSchema(
metaData, databaseName, schemaName,
tableName, tableComment);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index e059181..71e4477 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.tools.cdc.oracle;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
public class OracleSchema extends JdbcSourceSchema {
@@ -42,4 +45,16 @@ public class OracleSchema extends JdbcSourceSchema {
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
+
+ @Override
+ public LinkedHashMap<String, FieldSchema> getColumnInfo(
+ DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName)
+ throws SQLException {
+ // Oracle permits table names to include special characters such as /,
+ // etc., as in 'A/B'.
+ // When attempting to fetch column information for `A/B` via JDBC,
+ // it may throw an ORA-01424 error.
+ // Hence, we substitute `/` with '_' to address the issue.
+ return super.getColumnInfo(metaData, databaseName, schemaName,
tableName.replace("/", "_"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]