This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f559ea2c [Feature](cdc) add DB2 database sync (#316)
f559ea2c is described below
commit f559ea2cda6d367260b73a209898aa4af08bf6de
Author: Petrichor <[email protected]>
AuthorDate: Fri Jul 26 10:44:26 2024 +0800
[Feature](cdc) add DB2 database sync (#316)
---
flink-doris-connector/pom.xml | 12 ++
.../jsondebezium/JsonDebeziumChangeUtils.java | 4 +
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++
.../doris/flink/tools/cdc/DatabaseSyncConfig.java | 2 +
.../doris/flink/tools/cdc/SourceConnector.java | 3 +-
.../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 227 +++++++++++++++++++++
.../flink/tools/cdc/db2/Db2DateConverter.java | 133 ++++++++++++
.../{SourceConnector.java => db2/Db2Schema.java} | 32 +--
.../apache/doris/flink/tools/cdc/db2/Db2Type.java | 94 +++++++++
.../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 104 ++++++++++
.../doris/flink/tools/cdc/db2/Db2TypeTest.java | 49 +++++
11 files changed, 660 insertions(+), 13 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b6e90bea..2d7b2875 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -286,6 +286,18 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-db2-cdc</artifactId>
+ <version>${flink.sql.cdc.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-shaded-guava</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 571bacfb..492a7d29 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.db2.Db2Type;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
@@ -84,6 +85,9 @@ public class JsonDebeziumChangeUtils {
case SQLSERVER:
dorisTypeName = SqlServerType.toDorisType(dataType, length,
scale);
break;
+ case DB2:
+ dorisTypeName = Db2Type.toDorisType(dataType, length, scale);
+ break;
default:
String errMsg = sourceConnector + " not support " + dataType +
" schema change.";
throw new UnsupportedOperationException(errMsg);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 3ab38a19..b62f0f52 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -24,6 +24,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
@@ -61,6 +62,9 @@ public class CdcTools {
case DatabaseSyncConfig.MONGODB_SYNC_DATABASE:
createMongoDBSyncDatabase(opArgs);
break;
+ case DatabaseSyncConfig.DB2_SYNC_DATABASE:
+ createDb2SyncDatabase(opArgs);
+ break;
default:
System.out.println("Unknown operation " + operation);
System.exit(1);
@@ -112,6 +116,15 @@ public class CdcTools {
syncDatabase(params, databaseSync, mongoConfig,
SourceConnector.MONGODB);
}
+ private static void createDb2SyncDatabase(String[] opArgs) throws
Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF));
+ Map<String, String> db2Map = getConfigMap(params,
DatabaseSyncConfig.DB2_CONF);
+ Configuration db2Config = Configuration.fromMap(db2Map);
+ DatabaseSync databaseSync = new Db2DatabaseSync();
+ syncDatabase(params, databaseSync, db2Config, SourceConnector.DB2);
+ }
+
private static void syncDatabase(
MultipleParameterTool params,
DatabaseSync databaseSync,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
index 62b77645..6c78d5cd 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
@@ -24,12 +24,14 @@ public class DatabaseSyncConfig {
public static final String POSTGRES_SYNC_DATABASE =
"postgres-sync-database";
public static final String SQLSERVER_SYNC_DATABASE =
"sqlserver-sync-database";
public static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";
+ public static final String DB2_SYNC_DATABASE = "db2-sync-database";
public static final String MYSQL_CONF = "mysql-conf";
public static final String ORACLE_CONF = "oracle-conf";
public static final String POSTGRES_CONF = "postgres-conf";
public static final String SQLSERVER_CONF = "sqlserver-conf";
public static final String MONGODB_CONF = "mongodb-conf";
+ public static final String DB2_CONF = "db2-conf";
///////////// source-conf ////////
public static final String DATABASE_NAME = "database-name";
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
index 27e9600e..47c8dfba 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
@@ -22,7 +22,8 @@ public enum SourceConnector {
ORACLE("oracle"),
POSTGRES("postgres"),
SQLSERVER("sqlserver"),
- MONGODB("mongodb");
+ MONGODB("mongodb"),
+ DB2("db2");
public final String connectorName;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
new file mode 100644
index 00000000..2dcd21a9
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.db2.Db2Source;
+import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import
org.apache.doris.flink.tools.cdc.deserialize.DorisJsonDebeziumDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+public class Db2DatabaseSync extends DatabaseSync {
+ public static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(50000)
+ .withDescription("Integer port number of the DB2 database
server.");
+ private static final Logger LOG =
LoggerFactory.getLogger(Db2DatabaseSync.class);
+
+ private static final String JDBC_URL = "jdbc:db2://%s:%d/%s";
+
+ public Db2DatabaseSync() throws SQLException {
+ super();
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ try {
+ Class.forName("com.ibm.db2.jcc.DB2Driver");
+ LOG.info(" Loaded the JDBC driver");
+ } catch (ClassNotFoundException ex) {
+ throw new SQLException(
+ "No suitable driver found, can not found class
com.ibm.db2.jcc.DB2Driver");
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ String jdbcUrl =
+ String.format(
+ JDBC_URL,
+ config.get(JdbcSourceOptions.HOSTNAME),
+ config.get(PORT),
+ config.get(JdbcSourceOptions.DATABASE_NAME));
+ Properties pro = new Properties();
+ pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME));
+ pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD));
+ return DriverManager.getConnection(jdbcUrl, pro);
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+ try (Connection conn = getConnection()) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet tables =
+ metaData.getTables(null, schemaName, "%", new String[]
{"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new Db2Schema(
+ metaData, databaseName, schemaName,
tableName, tableComment);
+ sourceSchema.setModel(
+ !sourceSchema.primaryKeys.isEmpty()
+ ? DataModel.UNIQUE
+ : DataModel.DUPLICATE);
+ schemaList.add(sourceSchema);
+ }
+ }
+ }
+ return schemaList;
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment
env) {
+ String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+ Preconditions.checkNotNull(databaseName, "database-name in DB2 is
required");
+ Preconditions.checkNotNull(schemaName, "schema-name in DB2 is
required");
+
+ String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
+ String hostname = config.get(JdbcSourceOptions.HOSTNAME);
+ Integer port = config.get(PORT);
+ String username = config.get(JdbcSourceOptions.USERNAME);
+ String password = config.get(JdbcSourceOptions.PASSWORD);
+
+ StartupOptions startupOptions = StartupOptions.initial();
+ String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.initial();
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.latest();
+ }
+
+ // debezium properties set
+ Properties debeziumProperties = new Properties();
+ debeziumProperties.putAll(Db2DateConverter.DEFAULT_PROPS);
+ debeziumProperties.put("decimal.handling.mode", "string");
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+
+ DebeziumDeserializationSchema<String> schema;
+ if (ignoreDefaultValue) {
+ schema = new DorisJsonDebeziumDeserializationSchema();
+ } else {
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ schema = new JsonDebeziumDeserializationSchema(false,
customConverterConfigs);
+ }
+
+ if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED,
true)) {
+ JdbcIncrementalSource<String> db2IncrementalSource =
+ Db2SourceBuilder.Db2IncrementalSource.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .databaseList(databaseName)
+ .tableList(tableName)
+ .username(username)
+ .password(password)
+ .deserializer(schema)
+ .debeziumProperties(debeziumProperties)
+ .startupOptions(startupOptions)
+ .includeSchemaChanges(true)
+ .serverTimeZone(config.get(SERVER_TIME_ZONE))
+
.splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+
.splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+ .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+ .connectTimeout(config.get(CONNECT_TIMEOUT))
+
.connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+ .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+ .distributionFactorUpper(
+
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+ .distributionFactorLower(
+
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+ .build();
+ return env.fromSource(
+ db2IncrementalSource, WatermarkStrategy.noWatermarks(),
"Db2IncrementalSource");
+
+ } else {
+ DebeziumSourceFunction<String> db2Source =
+ Db2Source.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(databaseName)
+ .tableList(tableName)
+ .username(username)
+ .password(password)
+ .debeziumProperties(debeziumProperties)
+ .startupOptions(startupOptions)
+ .deserializer(schema)
+ .build();
+ return env.addSource(db2Source, "Db2 Source");
+ }
+ }
+
+ @Override
+ public String getTableListPrefix() {
+ return config.get(JdbcSourceOptions.SCHEMA_NAME);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
new file mode 100644
index 00000000..9d681d8a
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class Db2DateConverter implements CustomConverter<SchemaBuilder,
RelationalColumn> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Db2DateConverter.class);
+ private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+ private DateTimeFormatter timestampFormatter =
DateTimeFormatter.ISO_DATE_TIME;
+ private final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+
+ protected static final Properties DEFAULT_PROPS = new Properties();
+
+ static {
+ DEFAULT_PROPS.setProperty(DatabaseSyncConfig.CONVERTERS,
DatabaseSyncConfig.DATE);
+ DEFAULT_PROPS.setProperty(
+ DatabaseSyncConfig.DATE_TYPE,
+ "org.apache.doris.flink.tools.cdc.db2.Db2DateConverter");
+ DEFAULT_PROPS.setProperty(
+ DatabaseSyncConfig.DATE_FORMAT_DATE,
DatabaseSyncConfig.YEAR_MONTH_DAY_FORMAT);
+ DEFAULT_PROPS.setProperty(
+ DatabaseSyncConfig.DATE_FORMAT_TIMESTAMP,
DatabaseSyncConfig.DATETIME_MICRO_FORMAT);
+ }
+
+ @Override
+ public void configure(Properties props) {
+ readProps(props, "format.date", p -> dateFormatter =
DateTimeFormatter.ofPattern(p));
+ readProps(
+ props,
+ "format.timestamp",
+ p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+ }
+
+ private void readProps(Properties properties, String settingKey,
Consumer<String> callback) {
+ String settingValue = (String) properties.get(settingKey);
+ if (settingValue == null || settingValue.isEmpty()) {
+ return;
+ }
+ try {
+ callback.accept(settingValue.trim());
+ } catch (IllegalArgumentException | DateTimeException e) {
+ LOGGER.error("setting {} is illegal:{}", settingKey, settingValue);
+ throw e;
+ }
+ }
+
+ @Override
+ public void converterFor(
+ RelationalColumn column,
+ CustomConverter.ConverterRegistration<SchemaBuilder> registration)
{
+ String sqlType = column.typeName().toUpperCase();
+ SchemaBuilder schemaBuilder = null;
+ CustomConverter.Converter converter = null;
+ if (DatabaseSyncConfig.UPPERCASE_DATE.equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDate;
+ }
+ if (DatabaseSyncConfig.TIME.equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTime;
+ }
+ if (DatabaseSyncConfig.TIMESTAMP.equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertTimestamp;
+ }
+ if (schemaBuilder != null) {
+ registration.register(schemaBuilder, converter);
+ }
+ }
+
+ private String convertDate(Object input) {
+ if (input instanceof LocalDate) {
+ return dateFormatter.format((LocalDate) input);
+ } else if (input instanceof Integer) {
+ LocalDate date = LocalDate.ofEpochDay((Integer) input);
+ return dateFormatter.format(date);
+ } else if (input instanceof Date) {
+ return dateFormatter.format(((Date) input).toLocalDate());
+ }
+ return null;
+ }
+
+ private String convertTime(Object input) {
+ if (input instanceof Time) {
+ return timeFormatter.format(((Time) input).toLocalTime());
+ }
+ return null;
+ }
+
+ private String convertTimestamp(Object input) {
+ if (input instanceof Timestamp) {
+ return timestampFormatter.format(((Timestamp)
input).toLocalDateTime());
+ } else if (input instanceof Instant) {
+ LocalDateTime ldt = LocalDateTime.ofInstant(((Instant) input),
ZoneOffset.UTC);
+ return timestampFormatter.format(ldt);
+ }
+ return null;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
similarity index 51%
copy from
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
copy to
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
index 27e9600e..5aaf8cea 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc;
+package org.apache.doris.flink.tools.cdc.db2;
-public enum SourceConnector {
- MYSQL("mysql"),
- ORACLE("oracle"),
- POSTGRES("postgres"),
- SQLSERVER("sqlserver"),
- MONGODB("mongodb");
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
- public final String connectorName;
+import java.sql.DatabaseMetaData;
- SourceConnector(String connectorName) {
- this.connectorName = connectorName;
+public class Db2Schema extends JdbcSourceSchema {
+ public Db2Schema(
+ DatabaseMetaData metaData,
+ String databaseName,
+ String schemaName,
+ String tableName,
+ String tableComment)
+ throws Exception {
+ super(metaData, databaseName, schemaName, tableName, tableComment);
}
- public String getConnectorName() {
- return connectorName;
+ @Override
+ public String convertToDorisType(String fieldType, Integer precision,
Integer scale) {
+ return Db2Type.toDorisType(fieldType, precision, scale);
+ }
+
+ @Override
+ public String getCdcTableName() {
+ return schemaName + "\\." + tableName;
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
new file mode 100644
index 00000000..1255d1e7
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+
+public class Db2Type {
+ private static final String BOOLEAN = "BOOLEAN";
+ private static final String SMALLINT = "SMALLINT";
+ private static final String INTEGER = "INTEGER";
+ private static final String INT = "INT";
+ private static final String BIGINT = "BIGINT";
+ private static final String REAL = "REAL";
+ private static final String DECFLOAT = "DECFLOAT";
+ private static final String DOUBLE = "DOUBLE";
+ private static final String DECIMAL = "DECIMAL";
+ private static final String NUMERIC = "NUMERIC";
+ private static final String DATE = "DATE";
+ private static final String TIME = "TIME";
+ private static final String TIMESTAMP = "TIMESTAMP";
+ private static final String CHARACTER = "CHARACTER";
+ private static final String CHAR = "CHAR";
+ private static final String LONG_VARCHAR = "LONG VARCHAR";
+ private static final String VARCHAR = "VARCHAR";
+ private static final String XML = "XML";
+ private static final String VARGRAPHIC = "VARGRAPHIC";
+
+ public static String toDorisType(String db2Type, Integer precision,
Integer scale) {
+ db2Type = db2Type.toUpperCase();
+ switch (db2Type) {
+ case BOOLEAN:
+ return DorisType.BOOLEAN;
+ case SMALLINT:
+ return DorisType.SMALLINT;
+ case INTEGER:
+ case INT:
+ return DorisType.INT;
+ case BIGINT:
+ return DorisType.BIGINT;
+ case REAL:
+ return DorisType.FLOAT;
+ case DOUBLE:
+ return DorisType.DOUBLE;
+ case DATE:
+ return DorisType.DATE_V2;
+ case DECFLOAT:
+ case DECIMAL:
+ case NUMERIC:
+ if (precision != null && precision > 0 && precision <= 38) {
+ if (scale != null && scale >= 0) {
+ return String.format("%s(%s,%s)",
DorisType.DECIMAL_V3, precision, scale);
+ }
+ return String.format("%s(%s,%s)", DorisType.DECIMAL_V3,
precision, 0);
+ } else {
+ return DorisType.STRING;
+ }
+ case CHARACTER:
+ case CHAR:
+ case VARCHAR:
+ case LONG_VARCHAR:
+ Preconditions.checkNotNull(precision);
+ return precision * 3 > 65533
+ ? DorisType.STRING
+ : String.format("%s(%s)", DorisType.VARCHAR, precision
* 3);
+ case TIMESTAMP:
+ return String.format(
+ "%s(%s)", DorisType.DATETIME_V2, Math.min(scale ==
null ? 0 : scale, 6));
+ case TIME:
+ case VARGRAPHIC:
+ // Currently, the Flink CDC connector does not support the XML
data type from DB2; if it were supported, "case XML:" would fall
through here and also map to STRING.
+ return DorisType.STRING;
+ default:
+ throw new UnsupportedOperationException("Unsupported DB2 Type:
" + db2Type);
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
new file mode 100644
index 00000000..77b8931d
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcDb2SyncDatabaseCase {
+
+ public static void main(String[] args) throws Exception {
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.disableOperatorChaining();
+ env.enableCheckpointing(10000);
+
+ // Map<String,String> flinkMap = new HashMap<>();
+ // flinkMap.put("execution.checkpointing.interval","10s");
+ // flinkMap.put("pipeline.operator-chaining","false");
+ // flinkMap.put("parallelism.default","1");
+
+ // Configuration configuration = Configuration.fromMap(flinkMap);
+ // env.configure(configuration);
+
+ String database = "db2_test";
+ String tablePrefix = "";
+ String tableSuffix = "";
+ Map<String, String> sourceConfig = new HashMap<>();
+ sourceConfig.put("database-name", "testdb");
+ sourceConfig.put("schema-name", "DB2INST1");
+ sourceConfig.put("hostname", "127.0.0.1");
+ sourceConfig.put("port", "50000");
+ sourceConfig.put("username", "db2inst1");
+ sourceConfig.put("password", "=doris123456");
+ //
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+ sourceConfig.put("scan.incremental.snapshot.enabled", "true");
+ // sourceConfig.put("debezium.include.schema.changes","false");
+
+ Configuration config = Configuration.fromMap(sourceConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", "127.0.0.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040,
10.20.30.3:8040");
+ sinkConfig.put("username", "root");
+ sinkConfig.put("password", "123456");
+ sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+ // tableConfig.put("table-buckets",
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+ String includingTables = "FULL_TYPES";
+ String excludingTables = null;
+ String multiToOneOrigin = null;
+ String multiToOneTarget = null;
+ boolean ignoreDefaultValue = false;
+ boolean useNewSchemaChange = true;
+ boolean singleSink = false;
+ boolean ignoreIncompatible = false;
+ DatabaseSync databaseSync = new Db2DatabaseSync();
+ databaseSync
+ .setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ .setNewSchemaChange(useNewSchemaChange)
+ .setSingleSink(singleSink)
+ .setIgnoreIncompatible(ignoreIncompatible)
+ .create();
+ databaseSync.build();
+ env.execute(String.format("DB2-Doris Database Sync: %s", database));
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
new file mode 100644
index 00000000..22656902
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class Db2TypeTest {
+ @Test
+ public void db2FullTypeTest() {
+ assertEquals(DorisType.BOOLEAN, Db2Type.toDorisType("BOOLEAN", 1,
null));
+ assertEquals(DorisType.SMALLINT, Db2Type.toDorisType("SMALLINT", 5,
0));
+ assertEquals(DorisType.INT, Db2Type.toDorisType("INTEGER", 10, 0));
+ assertEquals(DorisType.BIGINT, Db2Type.toDorisType("BIGINT", 10, 0));
+ assertEquals(DorisType.FLOAT, Db2Type.toDorisType("REAL", 24, null));
+ assertEquals(DorisType.DOUBLE, Db2Type.toDorisType("DOUBLE", 53,
null));
+ assertEquals("DECIMALV3(34,0)", Db2Type.toDorisType("DECFLOAT", 34,
null));
+ assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("DECIMAL", 31, 0));
+ assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("DECIMAL", 31,
31));
+ assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("NUMERIC", 31, 0));
+ assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("NUMERIC", 31,
31));
+ assertEquals("VARCHAR(600)", Db2Type.toDorisType("VARCHAR", 200,
null));
+ assertEquals(DorisType.STRING, Db2Type.toDorisType("VARCHAR", 32672,
null));
+ assertEquals(DorisType.VARCHAR + "(3)", Db2Type.toDorisType("CHAR", 1,
null));
+ assertEquals(DorisType.VARCHAR + "(765)", Db2Type.toDorisType("CHAR",
255, null));
+ assertEquals(DorisType.DATETIME_V2 + "(0)",
Db2Type.toDorisType("TIMESTAMP", 26, 0));
+ assertEquals(DorisType.DATETIME_V2 + "(6)",
Db2Type.toDorisType("TIMESTAMP", 26, 6));
+ assertEquals(DorisType.DATETIME_V2 + "(6)",
Db2Type.toDorisType("TIMESTAMP", 26, 9));
+ assertEquals(DorisType.DATE_V2, Db2Type.toDorisType("DATE", 10, null));
+ assertEquals(DorisType.STRING, Db2Type.toDorisType("TIME", 8, 0));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]