This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f60ac1a5e Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357)
f60ac1a5e is described below
commit f60ac1a5e99432c527ebb1b2f8852e8c1a1807e6
Author: Eric <[email protected]>
AuthorDate: Wed Mar 15 14:51:32 2023 +0800
Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357)
---
.../connectors/cdc/base/option/SourceOptions.java | 46 ++++++++++------------
.../cdc/base/source/IncrementalSource.java | 5 ++-
.../source/MySqlIncrementalSourceFactory.java | 22 +++++++++++
.../source/SqlServerIncrementalSourceFactory.java | 21 ++++++++++
4 files changed, 66 insertions(+), 28 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 525069d11..b7090e6fd 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.connectors.cdc.base.option;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import java.util.ArrayList;
import java.util.Map;
@SuppressWarnings("MagicNumber")
@@ -41,13 +43,23 @@ public class SourceOptions {
.withDescription(
"The maximum fetch size for per poll when read table snapshot.");
- public static final Option<StartupMode> STARTUP_MODE =
- Options.key("startup.mode")
- .enumType(StartupMode.class)
- .defaultValue(StartupMode.INITIAL)
- .withDescription(
- "Optional startup mode for CDC source, valid enumerations are "
- + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+ public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+ (SingleChoiceOption)
+ Options.key("startup.mode")
+ .singleChoice(StartupMode.class, new ArrayList<>())
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for CDC source, valid enumerations are "
+ + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+
+ public static final SingleChoiceOption<StopMode> STOP_MODE =
+ (SingleChoiceOption)
+ Options.key("stop.mode")
+ .singleChoice(StopMode.class, new ArrayList<>())
+ .defaultValue(StopMode.NEVER)
+ .withDescription(
+ "Optional stop mode for CDC source, valid enumerations are "
+ + "\"never\", \"latest\", \"timestamp\"\n or \"specific\"");
public static final Option<Long> STARTUP_TIMESTAMP =
Options.key("startup.timestamp")
@@ -74,14 +86,6 @@ public class SourceOptions {
.defaultValue(1)
.withDescription("The number of parallel readers in the incremental phase.");
- public static final Option<StopMode> STOP_MODE =
- Options.key("stop.mode")
- .enumType(StopMode.class)
- .defaultValue(StopMode.NEVER)
- .withDescription(
- "Optional stop mode for CDC source, valid enumerations are "
- + "\"never\", \"latest\", \"timestamp\"\n or \"specific\"");
-
public static final Option<Long> STOP_TIMESTAMP =
Options.key("stop.timestamp")
.longType()
@@ -122,16 +126,6 @@ public class SourceOptions {
.optional(STARTUP_MODE, STOP_MODE)
.optional(DEBEZIUM_PROPERTIES)
.conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
- .conditional(
- STARTUP_MODE,
- StartupMode.SPECIFIC,
- STARTUP_SPECIFIC_OFFSET_FILE,
- STARTUP_SPECIFIC_OFFSET_POS)
- .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
- .conditional(
- STOP_MODE,
- StopMode.SPECIFIC,
- STOP_SPECIFIC_OFFSET_FILE,
- STOP_SPECIFIC_OFFSET_POS);
+ .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP);
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index dbad777cc..a11014afa 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -120,7 +121,7 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
protected StartupConfig getStartupConfig(ReadonlyConfig config) {
return new StartupConfig(
- config.get(SourceOptions.STARTUP_MODE),
+ config.get((Option<StartupMode>) SourceOptions.STARTUP_MODE),
config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
config.get(SourceOptions.STARTUP_TIMESTAMP));
@@ -128,7 +129,7 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
private StopConfig getStopConfig(ReadonlyConfig config) {
return new StopConfig(
- config.get(SourceOptions.STOP_MODE),
+ config.get((Option<StopMode>) SourceOptions.STOP_MODE),
config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
config.get(SourceOptions.STOP_TIMESTAMP));
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index a185921ca..4274488e9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -32,11 +32,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -50,6 +54,14 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory, Suppor
@Override
public OptionRule optionRule() {
+ SourceOptions.STARTUP_MODE
+ .getOptionValues()
+ .addAll(
+ Arrays.asList(
+ StartupMode.INITIAL, StartupMode.EARLIEST, StartupMode.LATEST));
+
+ SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
+
return JdbcSourceOptions.getBaseRule()
.required(
JdbcSourceOptions.USERNAME,
@@ -63,6 +75,16 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory, Suppor
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ .conditional(
+ SourceOptions.STARTUP_MODE,
+ StartupMode.SPECIFIC,
+ SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE,
+ SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
+ .conditional(
+ SourceOptions.STOP_MODE,
+ StopMode.SPECIFIC,
+ SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
+ SourceOptions.STOP_SPECIFIC_OFFSET_POS)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 01f79f5e7..d859493ac 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -22,6 +22,11 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@@ -32,6 +37,14 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
+ SourceOptions.STARTUP_MODE
+ .getOptionValues()
+ .addAll(
+ Arrays.asList(
+ StartupMode.INITIAL, StartupMode.EARLIEST, StartupMode.LATEST));
+
+ SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
+
return JdbcSourceOptions.getBaseRule()
.required(
JdbcSourceOptions.HOSTNAME,
@@ -45,6 +58,14 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ .conditional(
+ SourceOptions.STARTUP_MODE,
+ StartupMode.SPECIFIC,
+ SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
+ .conditional(
+ SourceOptions.STOP_MODE,
+ StopMode.SPECIFIC,
+ SourceOptions.STOP_SPECIFIC_OFFSET_POS)
.build();
}