This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 994fcb19 [cdc] fix create doris table failed when load data from DB2 (#451)
994fcb19 is described below
commit 994fcb19acc8c940441fe99c2ef4c17168a8d36e
Author: Petrichor <[email protected]>
AuthorDate: Tue Aug 6 19:18:51 2024 +0800
[cdc] fix create doris table failed when load data from DB2 (#451)
---
.../doris/flink/tools/cdc/db2/Db2Schema.java | 10 ++++++
.../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 42 +++++++++-------------
2 files changed, 27 insertions(+), 25 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
index 5aaf8cea..c36777f3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.tools.cdc.db2;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
public class Db2Schema extends JdbcSourceSchema {
public Db2Schema(
@@ -41,4 +44,11 @@ public class Db2Schema extends JdbcSourceSchema {
public String getCdcTableName() {
return schemaName + "\\." + tableName;
}
+
+ @Override
+ public LinkedHashMap<String, FieldSchema> getColumnInfo(
+ DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+ throws SQLException {
+ return super.getColumnInfo(metaData, null, schemaName, tableName);
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 77b8931d..0327079a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -17,9 +17,12 @@
package org.apache.doris.flink.tools.cdc;
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
import java.util.HashMap;
@@ -35,42 +38,31 @@ public class CdcDb2SyncDatabaseCase {
env.disableOperatorChaining();
env.enableCheckpointing(10000);
- // Map<String,String> flinkMap = new HashMap<>();
- // flinkMap.put("execution.checkpointing.interval","10s");
- // flinkMap.put("pipeline.operator-chaining","false");
- // flinkMap.put("parallelism.default","1");
-
- // Configuration configuration = Configuration.fromMap(flinkMap);
- // env.configure(configuration);
-
String database = "db2_test";
String tablePrefix = "";
String tableSuffix = "";
Map<String, String> sourceConfig = new HashMap<>();
- sourceConfig.put("database-name", "testdb");
- sourceConfig.put("schema-name", "DB2INST1");
- sourceConfig.put("hostname", "127.0.0.1");
- sourceConfig.put("port", "50000");
- sourceConfig.put("username", "db2inst1");
- sourceConfig.put("password", "=doris123456");
- // sourceConfig.put("debezium.database.tablename.case.insensitive","false");
- sourceConfig.put("scan.incremental.snapshot.enabled", "true");
- // sourceConfig.put("debezium.include.schema.changes","false");
+ sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb");
+ sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1");
+ sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1");
+ sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000");
+ sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
+ sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
+ sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
Configuration config = Configuration.fromMap(sourceConfig);
Map<String, String> sinkConfig = new HashMap<>();
- sinkConfig.put("fenodes", "127.0.0.1:8030");
- // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
- sinkConfig.put("username", "root");
- sinkConfig.put("password", "123456");
- sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
- sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030");
+ sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
+ sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456");
+ sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
- tableConfig.put("replication_num", "1");
- // tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
+ tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "FULL_TYPES";
String excludingTables = null;
String multiToOneOrigin = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]