This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 356538de8 [Improve][CDC] Add mysql-cdc source factory (#3791)
356538de8 is described below

commit 356538de8ade3fd2103377cbe8d0b5b366ff6d8a
Author: hailin0 <[email protected]>
AuthorDate: Fri Dec 23 16:17:27 2022 +0800

    [Improve][CDC] Add mysql-cdc source factory (#3791)
    
    * [Improve][CDC] Add mysql-cdc source factory
    
    * update CONNECT_TIMEOUT_MS options data type
---
 .../cdc/base/config/JdbcSourceConfig.java          | 11 ++---
 .../cdc/base/config/JdbcSourceConfigFactory.java   |  9 ++--
 .../cdc/base/option/JdbcSourceOptions.java         | 10 ++--
 .../connection/JdbcConnectionPoolFactory.java      |  2 +-
 .../cdc/mysql/config/MySqlSourceConfig.java        |  5 +-
 .../cdc/mysql/config/MySqlSourceConfigFactory.java |  4 +-
 .../cdc/mysql/source/MySqlIncrementalSource.java   |  4 +-
 .../source/MySqlIncrementalSourceFactory.java      | 54 ++++++++++++++++++++++
 .../source/MySqlIncrementalSourceFactoryTest.java  | 28 +++++++++++
 .../source/config/SqlServerSourceConfig.java       |  5 +-
 .../config/SqlServerSourceConfigFactory.java       |  2 +-
 11 files changed, 106 insertions(+), 28 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
index 72a7434db..c2a602540 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -21,7 +21,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 
@@ -39,7 +38,7 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
     protected final List<String> tableList;
     protected final int fetchSize;
     protected final String serverTimeZone;
-    protected final Duration connectTimeout;
+    protected final long connectTimeoutMillis;
     protected final int connectMaxRetries;
     protected final int connectionPoolSize;
 
@@ -59,7 +58,7 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
             String password,
             int fetchSize,
             String serverTimeZone,
-            Duration connectTimeout,
+            long connectTimeoutMillis,
             int connectMaxRetries,
             int connectionPoolSize) {
         super(
@@ -78,7 +77,7 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
         this.tableList = tableList;
         this.fetchSize = fetchSize;
         this.serverTimeZone = serverTimeZone;
-        this.connectTimeout = connectTimeout;
+        this.connectTimeoutMillis = connectTimeoutMillis;
         this.connectMaxRetries = connectMaxRetries;
         this.connectionPoolSize = connectionPoolSize;
     }
@@ -121,8 +120,8 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
         return serverTimeZone;
     }
 
-    public Duration getConnectTimeout() {
-        return connectTimeout;
+    public long getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
     }
 
     public int getConnectMaxRetries() {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 8263863f2..8b764bac4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -47,7 +46,7 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
     protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
     protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
     protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
-    protected Duration connectTimeout = JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
+    protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
     protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
     protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
     protected Properties dbzProperties;
@@ -144,8 +143,8 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
      * The maximum time that the connector should wait after trying to connect 
to the database
      * server before timing out.
      */
-    public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
-        this.connectTimeout = connectTimeout;
+    public JdbcSourceConfigFactory connectTimeoutMillis(long connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
         return this;
     }
 
@@ -199,7 +198,7 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
         this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
         this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
-        this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
+        this.connectTimeoutMillis = config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
         this.connectMaxRetries = config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
         this.connectionPoolSize = config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
         this.dbzProperties = new Properties();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index dec804e01..4839ecefe 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 
-import java.time.Duration;
-
 /** Configurations for {@link IncrementalSource} of JDBC data source. */
 @SuppressWarnings("checkstyle:MagicNumber")
 public class JdbcSourceOptions extends SourceOptions {
@@ -83,10 +81,10 @@ public class JdbcSourceOptions extends SourceOptions {
                                     + "so it can read the binlog. By default, 
a random number is generated between"
                                     + " 5400 and 6400, though we recommend 
setting an explicit value.");
 
-    public static final Option<Duration> CONNECT_TIMEOUT =
-            Options.key("connect.timeout")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(30))
+    public static final Option<Long> CONNECT_TIMEOUT_MS =
+            Options.key("connect.timeout.ms")
+                    .longType()
+                    .defaultValue(30000L)
                     .withDescription(
                             "The maximum time that the connector should wait 
after trying to connect to the database server before timing out.");
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
index 71556373a..63740a623 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
@@ -41,7 +41,7 @@ public abstract class JdbcConnectionPoolFactory {
         config.setPassword(sourceConfig.getPassword());
         config.setMinimumIdle(MINIMUM_POOL_SIZE);
         config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
-        config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+        config.setConnectionTimeout(sourceConfig.getConnectTimeoutMillis());
         config.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
         config.setDriverClassName(sourceConfig.getDriverClassName());
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
index 653a3fa0a..5177398b9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -24,7 +24,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 
@@ -52,7 +51,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
             String password,
             int fetchSize,
             String serverTimeZone,
-            Duration connectTimeout,
+            long connectTimeoutMillis,
             int connectMaxRetries,
             int connectionPoolSize) {
         super(
@@ -71,7 +70,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
                 password,
                 fetchSize,
                 serverTimeZone,
-                connectTimeout,
+                connectTimeoutMillis,
                 connectMaxRetries,
                 connectionPoolSize);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 1f67a6f56..5fd9ea5bb 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -71,7 +71,7 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
         props.setProperty("database.history.skip.unparseable.ddl", 
String.valueOf(true));
         props.setProperty("database.history.refer.ddl", String.valueOf(true));
 
-        props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
+        props.setProperty("connect.timeout.ms", String.valueOf(connectTimeoutMillis));
         // the underlying debezium reader should always capture the schema 
changes and forward them.
         // Note: the includeSchemaChanges parameter is used to control 
emitting the schema record,
         // only DataStream API program need to emit the schema record, the 
Table API need not
@@ -125,7 +125,7 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 password,
                 fetchSize,
                 serverTimeZone,
-                connectTimeout,
+                connectTimeoutMillis,
                 connectMaxRetries,
                 connectionPoolSize);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index bac7186bf..b473c97a7 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -42,9 +42,11 @@ import java.time.ZoneId;
 
 @AutoService(SeaTunnelSource.class)
 public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> {
+    static final String IDENTIFIER = "MySQL-CDC";
+
     @Override
     public String getPluginName() {
-        return "MySQL-CDC";
+        return IDENTIFIER;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
new file mode 100644
index 000000000..3ecb0d58c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MySqlIncrementalSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return MySqlIncrementalSource.IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return JdbcSourceOptions.BASE_RULE
+            .required(
+                JdbcSourceOptions.HOSTNAME,
+                JdbcSourceOptions.USERNAME,
+                JdbcSourceOptions.PASSWORD,
+                JdbcSourceOptions.DATABASE_NAME,
+                JdbcSourceOptions.TABLE_NAME,
+                JdbcCatalogOptions.BASE_URL)
+            .optional(
+                JdbcSourceOptions.PORT,
+                JdbcSourceOptions.SERVER_ID,
+                JdbcSourceOptions.SERVER_TIME_ZONE,
+                JdbcSourceOptions.CONNECT_TIMEOUT_MS,
+                JdbcSourceOptions.CONNECT_MAX_RETRIES,
+                JdbcSourceOptions.CONNECTION_POOL_SIZE)
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
new file mode 100644
index 000000000..37ab496fb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MySqlIncrementalSourceFactoryTest {
+    @Test
+    public void testOptionRule() {
+        Assertions.assertNotNull((new MySqlIncrementalSourceFactory()).optionRule());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
index 8266912fc..7d4ea3edf 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
@@ -24,7 +24,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
 import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
 import io.debezium.relational.RelationalTableFilters;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 
@@ -52,7 +51,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
             String password,
             int fetchSize,
             String serverTimeZone,
-            Duration connectTimeout,
+            long connectTimeoutMillis,
             int connectMaxRetries,
             int connectionPoolSize) {
         super(
@@ -71,7 +70,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
                 password,
                 fetchSize,
                 serverTimeZone,
-                connectTimeout,
+                connectTimeoutMillis,
                 connectMaxRetries,
                 connectionPoolSize);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
index 52cdb29e6..d12b8aaa0 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
@@ -91,7 +91,7 @@ public class SqlServerSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 password,
                 fetchSize,
                 serverTimeZone,
-                connectTimeout,
+                connectTimeoutMillis,
                 connectMaxRetries,
                 connectionPoolSize);
     }

Reply via email to