This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b71d8739d [Improve][CDC] Improve startup.mode/stop.mode options (#4360)
b71d8739d is described below
commit b71d8739d5e851ced70e6fd3e647e850d6d6bc81
Author: Eric <[email protected]>
AuthorDate: Fri Mar 17 10:40:05 2023 +0800
[Improve][CDC] Improve startup.mode/stop.mode options (#4360)
* improve cdc options
* Update
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/CdcSourceOptions.java
Co-authored-by: Zongwen Li <[email protected]>
* Update
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/CdcSourceOptions.java
Co-authored-by: Zongwen Li <[email protected]>
* improve cdc options
---------
Co-authored-by: Zongwen Li <[email protected]>
---
.../connectors/cdc/base/config/StartupConfig.java | 2 +
.../connectors/cdc/base/option/SourceOptions.java | 28 ++----------
.../cdc/base/source/IncrementalSource.java | 8 +++-
.../cdc/mysql/source/MySqlIncrementalSource.java | 13 ++++++
.../source/MySqlIncrementalSourceFactory.java | 22 +++++-----
.../cdc/mysql/source/MySqlSourceOptions.java | 51 ++++++++++++++++++++++
.../source/source/SqlServerIncrementalSource.java | 13 ++++++
.../source/SqlServerIncrementalSourceFactory.java | 23 +++++-----
.../source/source/SqlServerSourceOptions.java | 50 +++++++++++++++++++++
9 files changed, 161 insertions(+), 49 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
index 98a079c73..52e7ec23e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
@@ -44,6 +44,8 @@ public final class StartupConfig implements Serializable {
return offsetFactory.latest();
case INITIAL:
return null;
+ case SPECIFIC:
+ return offsetFactory.specific(specificOffsetFile,
specificOffsetPos);
case TIMESTAMP:
return offsetFactory.timestamp(timestamp);
default:
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 5a59b3734..b0f69c616 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,16 +19,17 @@ package org.apache.seatunnel.connectors.cdc.base.option;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
-import java.util.ArrayList;
import java.util.Map;
@SuppressWarnings("MagicNumber")
public class SourceOptions {
+ public static final String STARTUP_MODE_KEY = "startup.mode";
+ public static final String STOP_MODE_KEY = "stop.mode";
+
public static final Option<Integer> SNAPSHOT_SPLIT_SIZE =
Options.key("snapshot.split.size")
.intType()
@@ -43,24 +44,6 @@ public class SourceOptions {
.withDescription(
"The maximum fetch size for per poll when read
table snapshot.");
- public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
- (SingleChoiceOption)
- Options.key("startup.mode")
- .singleChoice(StartupMode.class, new ArrayList<>())
- .defaultValue(StartupMode.INITIAL)
- .withDescription(
- "Optional startup mode for CDC source,
valid enumerations are "
- + "\"initial\", \"earliest\",
\"latest\", \"timestamp\"\n or \"specific\"");
-
- public static final SingleChoiceOption<StopMode> STOP_MODE =
- (SingleChoiceOption)
- Options.key("stop.mode")
- .singleChoice(StopMode.class, new ArrayList<>())
- .defaultValue(StopMode.NEVER)
- .withDescription(
- "Optional stop mode for CDC source, valid
enumerations are "
- + "\"never\", \"latest\",
\"timestamp\"\n or \"specific\"");
-
public static final Option<Long> STARTUP_TIMESTAMP =
Options.key("startup.timestamp")
.longType()
@@ -124,9 +107,6 @@ public class SourceOptions {
.optional(FORMAT)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
.optional(INCREMENTAL_PARALLELISM)
- .optional(STARTUP_MODE, STOP_MODE)
- .optional(DEBEZIUM_PROPERTIES)
- .conditional(STARTUP_MODE, StartupMode.TIMESTAMP,
STARTUP_TIMESTAMP)
- .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP);
+ .optional(DEBEZIUM_PROPERTIES);
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index a11014afa..965d3fd27 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -121,7 +121,7 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
protected StartupConfig getStartupConfig(ReadonlyConfig config) {
return new StartupConfig(
- config.get((Option<StartupMode>) SourceOptions.STARTUP_MODE),
+ config.get(getStartupModeOption()),
config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
config.get(SourceOptions.STARTUP_TIMESTAMP));
@@ -129,12 +129,16 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
private StopConfig getStopConfig(ReadonlyConfig config) {
return new StopConfig(
- config.get((Option<StopMode>) SourceOptions.STOP_MODE),
+ config.get(getStopModeOption()),
config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
config.get(SourceOptions.STOP_TIMESTAMP));
}
+ public abstract Option<StartupMode> getStartupModeOption();
+
+ public abstract Option<StopMode> getStopModeOption();
+
public abstract SourceConfig.Factory<C>
createSourceConfigFactory(ReadonlyConfig config);
public abstract DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index a8531bd05..6f886bf62 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
@@ -32,6 +33,8 @@ import
org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
@@ -59,6 +62,16 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
super(options, dataType);
}
+ @Override
+ public Option<StartupMode> getStartupModeOption() {
+ return MySqlSourceOptions.STARTUP_MODE;
+ }
+
+ @Override
+ public Option<StopMode> getStopModeOption() {
+ return MySqlSourceOptions.STOP_MODE;
+ }
+
@Override
public String getPluginName() {
return IDENTIFIER;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 4274488e9..3be95901f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -40,7 +40,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -54,14 +53,6 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
@Override
public OptionRule optionRule() {
- SourceOptions.STARTUP_MODE
- .getOptionValues()
- .addAll(
- Arrays.asList(
- StartupMode.INITIAL, StartupMode.EARLIEST,
StartupMode.LATEST));
-
-
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
-
return JdbcSourceOptions.getBaseRule()
.required(
JdbcSourceOptions.USERNAME,
@@ -75,16 +66,25 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ .optional(MySqlSourceOptions.STARTUP_MODE,
MySqlSourceOptions.STOP_MODE)
.conditional(
- SourceOptions.STARTUP_MODE,
+ MySqlSourceOptions.STARTUP_MODE,
StartupMode.SPECIFIC,
SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE,
SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
.conditional(
- SourceOptions.STOP_MODE,
+ MySqlSourceOptions.STOP_MODE,
StopMode.SPECIFIC,
SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
SourceOptions.STOP_SPECIFIC_OFFSET_POS)
+ .conditional(
+ MySqlSourceOptions.STARTUP_MODE,
+ StartupMode.TIMESTAMP,
+ SourceOptions.STARTUP_TIMESTAMP)
+ .conditional(
+ MySqlSourceOptions.STOP_MODE,
+ StopMode.TIMESTAMP,
+ SourceOptions.STOP_TIMESTAMP)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
new file mode 100644
index 000000000..43f3f4c70
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
+
+public class MySqlSourceOptions {
+ public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+ (SingleChoiceOption)
+ Options.key(SourceOptions.STARTUP_MODE_KEY)
+ .singleChoice(
+ StartupMode.class,
+ Arrays.asList(
+ StartupMode.INITIAL,
+ StartupMode.EARLIEST,
+ StartupMode.LATEST))
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for CDC source,
valid enumerations are "
+ + "\"initial\", \"earliest\",
\"latest\", \"timestamp\"\n or \"specific\"");
+
+ public static final SingleChoiceOption<StopMode> STOP_MODE =
+ (SingleChoiceOption)
+ Options.key(SourceOptions.STOP_MODE_KEY)
+ .singleChoice(StopMode.class,
Arrays.asList(StopMode.NEVER))
+ .defaultValue(StopMode.NEVER)
+ .withDescription(
+ "Optional stop mode for CDC source, valid
enumerations are "
+ + "\"never\", \"latest\",
\"timestamp\"\n or \"specific\"");
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 43c8c06c0..91408cd1d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
+import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
@@ -25,6 +26,8 @@ import
org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
@@ -56,6 +59,16 @@ public class SqlServerIncrementalSource<T> extends
IncrementalSource<T, JdbcSour
return IDENTIFIER;
}
+ @Override
+ public Option<StartupMode> getStartupModeOption() {
+ return SqlServerSourceOptions.STARTUP_MODE;
+ }
+
+ @Override
+ public Option<StopMode> getStopModeOption() {
+ return SqlServerSourceOptions.STOP_MODE;
+ }
+
@Override
public SourceConfig.Factory<JdbcSourceConfig>
createSourceConfigFactory(ReadonlyConfig config) {
SqlServerSourceConfigFactory configFactory = new
SqlServerSourceConfigFactory();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index d859493ac..b476ab013 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -26,8 +26,6 @@ import
org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
-import java.util.Arrays;
-
public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@Override
@@ -37,14 +35,6 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
- SourceOptions.STARTUP_MODE
- .getOptionValues()
- .addAll(
- Arrays.asList(
- StartupMode.INITIAL, StartupMode.EARLIEST,
StartupMode.LATEST));
-
-
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
-
return JdbcSourceOptions.getBaseRule()
.required(
JdbcSourceOptions.HOSTNAME,
@@ -58,14 +48,23 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory {
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
JdbcSourceOptions.CONNECT_MAX_RETRIES,
JdbcSourceOptions.CONNECTION_POOL_SIZE)
+ .optional(SqlServerSourceOptions.STARTUP_MODE,
SqlServerSourceOptions.STOP_MODE)
.conditional(
- SourceOptions.STARTUP_MODE,
+ SqlServerSourceOptions.STARTUP_MODE,
StartupMode.SPECIFIC,
SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
.conditional(
- SourceOptions.STOP_MODE,
+ SqlServerSourceOptions.STOP_MODE,
StopMode.SPECIFIC,
SourceOptions.STOP_SPECIFIC_OFFSET_POS)
+ .conditional(
+ SqlServerSourceOptions.STARTUP_MODE,
+ StartupMode.TIMESTAMP,
+ SourceOptions.STARTUP_TIMESTAMP)
+ .conditional(
+ SqlServerSourceOptions.STOP_MODE,
+ StopMode.TIMESTAMP,
+ SourceOptions.STOP_TIMESTAMP)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
new file mode 100644
index 000000000..ed7d6258b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
+
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
+
+public class SqlServerSourceOptions {
+ public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+ (SingleChoiceOption)
+ Options.key("startup.mode")
+ .singleChoice(
+ StartupMode.class,
+ Arrays.asList(
+ StartupMode.INITIAL,
+ StartupMode.EARLIEST,
+ StartupMode.LATEST))
+ .defaultValue(StartupMode.INITIAL)
+ .withDescription(
+ "Optional startup mode for CDC source,
valid enumerations are "
+ + "\"initial\", \"earliest\",
\"latest\", \"timestamp\"\n or \"specific\"");
+
+ public static final SingleChoiceOption<StopMode> STOP_MODE =
+ (SingleChoiceOption)
+ Options.key("stop.mode")
+ .singleChoice(StopMode.class,
Arrays.asList(StopMode.NEVER))
+ .defaultValue(StopMode.NEVER)
+ .withDescription(
+ "Optional stop mode for CDC source, valid
enumerations are "
+ + "\"never\", \"latest\",
\"timestamp\"\n or \"specific\"");
+}