This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 356538de8 [Improve][CDC] Add mysql-cdc source factory (#3791)
356538de8 is described below
commit 356538de8ade3fd2103377cbe8d0b5b366ff6d8a
Author: hailin0 <[email protected]>
AuthorDate: Fri Dec 23 16:17:27 2022 +0800
[Improve][CDC] Add mysql-cdc source factory (#3791)
* [Improve][CDC] Add mysql-cdc source factory
* update CONNECT_TIMEOUT_MS options data type
---
.../cdc/base/config/JdbcSourceConfig.java | 11 ++---
.../cdc/base/config/JdbcSourceConfigFactory.java | 9 ++--
.../cdc/base/option/JdbcSourceOptions.java | 10 ++--
.../connection/JdbcConnectionPoolFactory.java | 2 +-
.../cdc/mysql/config/MySqlSourceConfig.java | 5 +-
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 4 +-
.../cdc/mysql/source/MySqlIncrementalSource.java | 4 +-
.../source/MySqlIncrementalSourceFactory.java | 54 ++++++++++++++++++++++
.../source/MySqlIncrementalSourceFactoryTest.java | 28 +++++++++++
.../source/config/SqlServerSourceConfig.java | 5 +-
.../config/SqlServerSourceConfigFactory.java | 2 +-
11 files changed, 106 insertions(+), 28 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
index 72a7434db..c2a602540 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -21,7 +21,6 @@ import
org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
-import java.time.Duration;
import java.util.List;
import java.util.Properties;
@@ -39,7 +38,7 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
protected final List<String> tableList;
protected final int fetchSize;
protected final String serverTimeZone;
- protected final Duration connectTimeout;
+ protected final long connectTimeoutMillis;
protected final int connectMaxRetries;
protected final int connectionPoolSize;
@@ -59,7 +58,7 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
String password,
int fetchSize,
String serverTimeZone,
- Duration connectTimeout,
+ long connectTimeoutMillis,
int connectMaxRetries,
int connectionPoolSize) {
super(
@@ -78,7 +77,7 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
this.tableList = tableList;
this.fetchSize = fetchSize;
this.serverTimeZone = serverTimeZone;
- this.connectTimeout = connectTimeout;
+ this.connectTimeoutMillis = connectTimeoutMillis;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
}
@@ -121,8 +120,8 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
return serverTimeZone;
}
- public Duration getConnectTimeout() {
- return connectTimeout;
+ public long getConnectTimeoutMillis() {
+ return connectTimeoutMillis;
}
public int getConnectMaxRetries() {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 8263863f2..8b764bac4 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -47,7 +46,7 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone =
JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
- protected Duration connectTimeout =
JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
+ protected long connectTimeoutMillis =
JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
protected int connectMaxRetries =
JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize =
JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
protected Properties dbzProperties;
@@ -144,8 +143,8 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
* The maximum time that the connector should wait after trying to connect
to the database
* server before timing out.
*/
- public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
- this.connectTimeout = connectTimeout;
+ public JdbcSourceConfigFactory connectTimeoutMillis(long
connectTimeoutMillis) {
+ this.connectTimeoutMillis = connectTimeoutMillis;
return this;
}
@@ -199,7 +198,7 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
- this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
+ this.connectTimeoutMillis =
config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
this.connectMaxRetries =
config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
this.connectionPoolSize =
config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
this.dbzProperties = new Properties();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index dec804e01..4839ecefe 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -21,8 +21,6 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
-import java.time.Duration;
-
/** Configurations for {@link IncrementalSource} of JDBC data source. */
@SuppressWarnings("checkstyle:MagicNumber")
public class JdbcSourceOptions extends SourceOptions {
@@ -83,10 +81,10 @@ public class JdbcSourceOptions extends SourceOptions {
+ "so it can read the binlog. By default,
a random number is generated between"
+ " 5400 and 6400, though we recommend
setting an explicit value.");
- public static final Option<Duration> CONNECT_TIMEOUT =
- Options.key("connect.timeout")
- .durationType()
- .defaultValue(Duration.ofSeconds(30))
+ public static final Option<Long> CONNECT_TIMEOUT_MS =
+ Options.key("connect.timeout.ms")
+ .longType()
+ .defaultValue(30000L)
.withDescription(
"The maximum time that the connector should wait
after trying to connect to the database server before timing out.");
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
index 71556373a..63740a623 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
@@ -41,7 +41,7 @@ public abstract class JdbcConnectionPoolFactory {
config.setPassword(sourceConfig.getPassword());
config.setMinimumIdle(MINIMUM_POOL_SIZE);
config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
-
config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+ config.setConnectionTimeout(sourceConfig.getConnectTimeoutMillis());
config.addDataSourceProperty(SERVER_TIMEZONE_KEY,
sourceConfig.getServerTimeZone());
config.setDriverClassName(sourceConfig.getDriverClassName());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
index 653a3fa0a..5177398b9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
-import java.time.Duration;
import java.util.List;
import java.util.Properties;
@@ -52,7 +51,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
String password,
int fetchSize,
String serverTimeZone,
- Duration connectTimeout,
+ long connectTimeoutMillis,
int connectMaxRetries,
int connectionPoolSize) {
super(
@@ -71,7 +70,7 @@ public class MySqlSourceConfig extends JdbcSourceConfig {
password,
fetchSize,
serverTimeZone,
- connectTimeout,
+ connectTimeoutMillis,
connectMaxRetries,
connectionPoolSize);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 1f67a6f56..5fd9ea5bb 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -71,7 +71,7 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
props.setProperty("database.history.skip.unparseable.ddl",
String.valueOf(true));
props.setProperty("database.history.refer.ddl", String.valueOf(true));
- props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeout.toMillis()));
+ props.setProperty("connect.timeout.ms",
String.valueOf(connectTimeoutMillis));
// the underlying debezium reader should always capture the schema
changes and forward them.
// Note: the includeSchemaChanges parameter is used to control
emitting the schema record,
// only DataStream API program need to emit the schema record, the
Table API need not
@@ -125,7 +125,7 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
password,
fetchSize,
serverTimeZone,
- connectTimeout,
+ connectTimeoutMillis,
connectMaxRetries,
connectionPoolSize);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index bac7186bf..b473c97a7 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -42,9 +42,11 @@ import java.time.ZoneId;
@AutoService(SeaTunnelSource.class)
public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> {
+ static final String IDENTIFIER = "MySQL-CDC";
+
@Override
public String getPluginName() {
- return "MySQL-CDC";
+ return IDENTIFIER;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
new file mode 100644
index 000000000..3ecb0d58c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MySqlIncrementalSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return MySqlIncrementalSource.IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcSourceOptions.BASE_RULE
+ .required(
+ JdbcSourceOptions.HOSTNAME,
+ JdbcSourceOptions.USERNAME,
+ JdbcSourceOptions.PASSWORD,
+ JdbcSourceOptions.DATABASE_NAME,
+ JdbcSourceOptions.TABLE_NAME,
+ JdbcCatalogOptions.BASE_URL)
+ .optional(
+ JdbcSourceOptions.PORT,
+ JdbcSourceOptions.SERVER_ID,
+ JdbcSourceOptions.SERVER_TIME_ZONE,
+ JdbcSourceOptions.CONNECT_TIMEOUT_MS,
+ JdbcSourceOptions.CONNECT_MAX_RETRIES,
+ JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
new file mode 100644
index 000000000..37ab496fb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactoryTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class MySqlIncrementalSourceFactoryTest {
+ @Test
+ public void testOptionRule() {
+ Assertions.assertNotNull((new
MySqlIncrementalSourceFactory()).optionRule());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
index 8266912fc..7d4ea3edf 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
-import java.time.Duration;
import java.util.List;
import java.util.Properties;
@@ -52,7 +51,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
String password,
int fetchSize,
String serverTimeZone,
- Duration connectTimeout,
+ long connectTimeoutMillis,
int connectMaxRetries,
int connectionPoolSize) {
super(
@@ -71,7 +70,7 @@ public class SqlServerSourceConfig extends JdbcSourceConfig {
password,
fetchSize,
serverTimeZone,
- connectTimeout,
+ connectTimeoutMillis,
connectMaxRetries,
connectionPoolSize);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
index 52cdb29e6..d12b8aaa0 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java
@@ -91,7 +91,7 @@ public class SqlServerSourceConfigFactory extends
JdbcSourceConfigFactory {
password,
fetchSize,
serverTimeZone,
- connectTimeout,
+ connectTimeoutMillis,
connectMaxRetries,
connectionPoolSize);
}