This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 67ca9401 [Improve] Update flink cdc version to 3.1.1 (#438)
67ca9401 is described below
commit 67ca9401ea8e991cd09cd98c9237c06c93305790
Author: wudi <[email protected]>
AuthorDate: Wed Jul 24 10:10:23 2024 +0800
[Improve] Update flink cdc version to 3.1.1 (#438)
---
flink-doris-connector/pom.xml | 28 +++++++++++----
.../DorisJsonDebeziumDeserializationSchema.java | 12 +++----
.../tools/cdc/mongodb/MongoDBDatabaseSync.java | 16 ++++-----
.../tools/cdc/mysql/DateToStringConverter.java | 3 +-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 20 +++++------
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 34 +++++++++---------
.../tools/cdc/oracle/OracleDateConverter.java | 3 +-
.../tools/cdc/postgres/PostgresDatabaseSync.java | 42 +++++++++++-----------
.../tools/cdc/postgres/PostgresDateConverter.java | 3 +-
.../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 38 ++++++++++----------
.../cdc/sqlserver/SqlServerDateConverter.java | 3 +-
.../apache/doris/flink/CDCSchemaChangeExample.java | 6 ++--
.../doris/flink/utils/DateToStringConverter.java | 3 +-
13 files changed, 116 insertions(+), 95 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 052180c4..b6e90bea 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,7 +70,7 @@ under the License.
<revision>1.6.2-SNAPSHOT</revision>
<flink.version>1.18.0</flink.version>
<flink.major.version>1.18</flink.major.version>
- <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
+ <flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>13.0.0</arrow.version>
@@ -93,6 +93,8 @@ under the License.
<junit.version>4.12</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<jsqlparser.version>4.9</jsqlparser.version>
+ <mysql.driver.version>8.0.26</mysql.driver.version>
+ <ojdbc.version>19.3.0.0</ojdbc.version>
</properties>
<dependencies>
@@ -237,7 +239,7 @@ under the License.
</dependency>
<!-- use cdc bundled jar for kafka connect class-->
<dependency>
- <groupId>com.ververica</groupId>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
@@ -249,7 +251,7 @@ under the License.
</exclusions>
</dependency>
<dependency>
- <groupId>com.ververica</groupId>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
@@ -261,7 +263,7 @@ under the License.
</exclusions>
</dependency>
<dependency>
- <groupId>com.ververica</groupId>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
@@ -273,7 +275,7 @@ under the License.
</exclusions>
</dependency>
<dependency>
- <groupId>com.ververica</groupId>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
@@ -285,7 +287,7 @@ under the License.
</exclusions>
</dependency>
<dependency>
- <groupId>com.ververica</groupId>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<!-- the dependency is available only for stable releases. -->
<version>${flink.sql.cdc.version}</version>
@@ -297,6 +299,20 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <!-- add drivers for FLINK-35055 -->
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.driver.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.oracle.ojdbc</groupId>
+ <artifactId>ojdbc8</artifactId>
+ <version>${ojdbc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- flink cdc end -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
index d1d91545..b7e4575c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java
@@ -19,6 +19,12 @@ package org.apache.doris.flink.tools.cdc.deserialize;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.JsonNode;
@@ -26,12 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
-import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.doris.flink.exception.DorisException;
import java.math.BigDecimal;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index fe7f33d0..17138c84 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -18,6 +18,13 @@
package org.apache.doris.flink.tools.cdc.mongodb;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
+import
org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -30,13 +37,6 @@ import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
-import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -57,7 +57,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static
org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class MongoDBDatabaseSync extends DatabaseSync {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
index 2080d023..979ce903 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/DateToStringConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.mysql;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 63522472..c0a2f345 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -18,22 +18,22 @@
package org.apache.doris.flink.tools.cdc.mysql;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
-import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
-import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index beb6a677..78411681 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -18,20 +18,20 @@
package org.apache.doris.flink.tools.cdc.oracle;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.oracle.OracleSource;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder;
+import
org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceOptions;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.oracle.OracleSource;
-import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
-import com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -51,14 +51,14 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
public class OracleDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(OracleDatabaseSync.class);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
index b847476e..dd2afc13 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.oracle;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import oracle.sql.TIMESTAMP;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 74390325..65981138 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -18,20 +18,20 @@
package org.apache.doris.flink.tools.cdc.postgres;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.postgres.PostgreSQLSource;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
-import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
-import
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -50,17 +50,17 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
-import static
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
-import static
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
-import static
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
+import static
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
public class PostgresDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresDatabaseSync.class);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
index 68562384..717ae88d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.postgres;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 3f674fb2..577c8530 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -18,21 +18,21 @@
package org.apache.doris.flink.tools.cdc.sqlserver;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.connectors.sqlserver.SqlServerSource;
+import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
-import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
-import com.ververica.cdc.connectors.base.options.SourceOptions;
-import com.ververica.cdc.connectors.base.options.StartupOptions;
-import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
-import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -51,14 +51,14 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
-import static
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
-import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
public class SqlServerDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(SqlServerDatabaseSync.class);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
index abab9f60..0f604347 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.tools.cdc.sqlserver;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index 74e635ba..01c33833 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -18,11 +18,11 @@
package org.apache.doris.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import com.ververica.cdc.connectors.mysql.source.MySqlSource;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
index 342e8d16..a6322893 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -17,7 +17,8 @@
package org.apache.doris.flink.utils;
-import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.slf4j.Logger;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org