This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b71d8739d [Improve][CDC] Improve startup.mode/stop.mode options (#4360)
b71d8739d is described below

commit b71d8739d5e851ced70e6fd3e647e850d6d6bc81
Author: Eric <[email protected]>
AuthorDate: Fri Mar 17 10:40:05 2023 +0800

    [Improve][CDC] Improve startup.mode/stop.mode options (#4360)
    
    * improve cdc options
    
    * Update 
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/CdcSourceOptions.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    * Update 
seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/CdcSourceOptions.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    * improve cdc options
    
    ---------
    
    Co-authored-by: Zongwen Li <[email protected]>
---
 .../connectors/cdc/base/config/StartupConfig.java  |  2 +
 .../connectors/cdc/base/option/SourceOptions.java  | 28 ++----------
 .../cdc/base/source/IncrementalSource.java         |  8 +++-
 .../cdc/mysql/source/MySqlIncrementalSource.java   | 13 ++++++
 .../source/MySqlIncrementalSourceFactory.java      | 22 +++++-----
 .../cdc/mysql/source/MySqlSourceOptions.java       | 51 ++++++++++++++++++++++
 .../source/source/SqlServerIncrementalSource.java  | 13 ++++++
 .../source/SqlServerIncrementalSourceFactory.java  | 23 +++++-----
 .../source/source/SqlServerSourceOptions.java      | 50 +++++++++++++++++++++
 9 files changed, 161 insertions(+), 49 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
index 98a079c73..52e7ec23e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/StartupConfig.java
@@ -44,6 +44,8 @@ public final class StartupConfig implements Serializable {
                 return offsetFactory.latest();
             case INITIAL:
                 return null;
+            case SPECIFIC:
+                return offsetFactory.specific(specificOffsetFile, 
specificOffsetPos);
             case TIMESTAMP:
                 return offsetFactory.timestamp(timestamp);
             default:
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 5a59b3734..b0f69c616 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,16 +19,17 @@ package org.apache.seatunnel.connectors.cdc.base.option;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.SingleChoiceOption;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
 
-import java.util.ArrayList;
 import java.util.Map;
 
 @SuppressWarnings("MagicNumber")
 public class SourceOptions {
 
+    public static final String STARTUP_MODE_KEY = "startup.mode";
+    public static final String STOP_MODE_KEY = "stop.mode";
+
     public static final Option<Integer> SNAPSHOT_SPLIT_SIZE =
             Options.key("snapshot.split.size")
                     .intType()
@@ -43,24 +44,6 @@ public class SourceOptions {
                     .withDescription(
                             "The maximum fetch size for per poll when read 
table snapshot.");
 
-    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
-            (SingleChoiceOption)
-                    Options.key("startup.mode")
-                            .singleChoice(StartupMode.class, new ArrayList<>())
-                            .defaultValue(StartupMode.INITIAL)
-                            .withDescription(
-                                    "Optional startup mode for CDC source, 
valid enumerations are "
-                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
-
-    public static final SingleChoiceOption<StopMode> STOP_MODE =
-            (SingleChoiceOption)
-                    Options.key("stop.mode")
-                            .singleChoice(StopMode.class, new ArrayList<>())
-                            .defaultValue(StopMode.NEVER)
-                            .withDescription(
-                                    "Optional stop mode for CDC source, valid 
enumerations are "
-                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
-
     public static final Option<Long> STARTUP_TIMESTAMP =
             Options.key("startup.timestamp")
                     .longType()
@@ -124,9 +107,6 @@ public class SourceOptions {
                 .optional(FORMAT)
                 .optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
                 .optional(INCREMENTAL_PARALLELISM)
-                .optional(STARTUP_MODE, STOP_MODE)
-                .optional(DEBEZIUM_PROPERTIES)
-                .conditional(STARTUP_MODE, StartupMode.TIMESTAMP, 
STARTUP_TIMESTAMP)
-                .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP);
+                .optional(DEBEZIUM_PROPERTIES);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index a11014afa..965d3fd27 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -121,7 +121,7 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
 
     protected StartupConfig getStartupConfig(ReadonlyConfig config) {
         return new StartupConfig(
-                config.get((Option<StartupMode>) SourceOptions.STARTUP_MODE),
+                config.get(getStartupModeOption()),
                 config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
                 config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
                 config.get(SourceOptions.STARTUP_TIMESTAMP));
@@ -129,12 +129,16 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
 
     private StopConfig getStopConfig(ReadonlyConfig config) {
         return new StopConfig(
-                config.get((Option<StopMode>) SourceOptions.STOP_MODE),
+                config.get(getStopModeOption()),
                 config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
                 config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
                 config.get(SourceOptions.STOP_TIMESTAMP));
     }
 
+    public abstract Option<StartupMode> getStartupModeOption();
+
+    public abstract Option<StopMode> getStopModeOption();
+
     public abstract SourceConfig.Factory<C> 
createSourceConfigFactory(ReadonlyConfig config);
 
     public abstract DebeziumDeserializationSchema<T> 
createDebeziumDeserializationSchema(
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index a8531bd05..6f886bf62 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 
+import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
@@ -32,6 +33,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
@@ -59,6 +62,16 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         super(options, dataType);
     }
 
+    @Override
+    public Option<StartupMode> getStartupModeOption() {
+        return MySqlSourceOptions.STARTUP_MODE;
+    }
+
+    @Override
+    public Option<StopMode> getStopModeOption() {
+        return MySqlSourceOptions.STOP_MODE;
+    }
+
     @Override
     public String getPluginName() {
         return IDENTIFIER;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 4274488e9..3be95901f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -40,7 +40,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -54,14 +53,6 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
 
     @Override
     public OptionRule optionRule() {
-        SourceOptions.STARTUP_MODE
-                .getOptionValues()
-                .addAll(
-                        Arrays.asList(
-                                StartupMode.INITIAL, StartupMode.EARLIEST, 
StartupMode.LATEST));
-
-        
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
-
         return JdbcSourceOptions.getBaseRule()
                 .required(
                         JdbcSourceOptions.USERNAME,
@@ -75,16 +66,25 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                         JdbcSourceOptions.CONNECT_TIMEOUT_MS,
                         JdbcSourceOptions.CONNECT_MAX_RETRIES,
                         JdbcSourceOptions.CONNECTION_POOL_SIZE)
+                .optional(MySqlSourceOptions.STARTUP_MODE, 
MySqlSourceOptions.STOP_MODE)
                 .conditional(
-                        SourceOptions.STARTUP_MODE,
+                        MySqlSourceOptions.STARTUP_MODE,
                         StartupMode.SPECIFIC,
                         SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE,
                         SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
                 .conditional(
-                        SourceOptions.STOP_MODE,
+                        MySqlSourceOptions.STOP_MODE,
                         StopMode.SPECIFIC,
                         SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
                         SourceOptions.STOP_SPECIFIC_OFFSET_POS)
+                .conditional(
+                        MySqlSourceOptions.STARTUP_MODE,
+                        StartupMode.TIMESTAMP,
+                        SourceOptions.STARTUP_TIMESTAMP)
+                .conditional(
+                        MySqlSourceOptions.STOP_MODE,
+                        StopMode.TIMESTAMP,
+                        SourceOptions.STOP_TIMESTAMP)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
new file mode 100644
index 000000000..43f3f4c70
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSourceOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
+
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
+
+public class MySqlSourceOptions {
+    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+            (SingleChoiceOption)
+                    Options.key(SourceOptions.STARTUP_MODE_KEY)
+                            .singleChoice(
+                                    StartupMode.class,
+                                    Arrays.asList(
+                                            StartupMode.INITIAL,
+                                            StartupMode.EARLIEST,
+                                            StartupMode.LATEST))
+                            .defaultValue(StartupMode.INITIAL)
+                            .withDescription(
+                                    "Optional startup mode for CDC source, 
valid enumerations are "
+                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
+
+    public static final SingleChoiceOption<StopMode> STOP_MODE =
+            (SingleChoiceOption)
+                    Options.key(SourceOptions.STOP_MODE_KEY)
+                            .singleChoice(StopMode.class, 
Arrays.asList(StopMode.NEVER))
+                            .defaultValue(StopMode.NEVER)
+                            .withDescription(
+                                    "Optional stop mode for CDC source, valid 
enumerations are "
+                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 43c8c06c0..91408cd1d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
 
+import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
@@ -25,6 +26,8 @@ import 
org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
@@ -56,6 +59,16 @@ public class SqlServerIncrementalSource<T> extends 
IncrementalSource<T, JdbcSour
         return IDENTIFIER;
     }
 
+    @Override
+    public Option<StartupMode> getStartupModeOption() {
+        return SqlServerSourceOptions.STARTUP_MODE;
+    }
+
+    @Override
+    public Option<StopMode> getStopModeOption() {
+        return SqlServerSourceOptions.STOP_MODE;
+    }
+
     @Override
     public SourceConfig.Factory<JdbcSourceConfig> 
createSourceConfigFactory(ReadonlyConfig config) {
         SqlServerSourceConfigFactory configFactory = new 
SqlServerSourceConfigFactory();
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index d859493ac..b476ab013 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -26,8 +26,6 @@ import 
org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
 
-import java.util.Arrays;
-
 public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
 
     @Override
@@ -37,14 +35,6 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
 
     @Override
     public OptionRule optionRule() {
-        SourceOptions.STARTUP_MODE
-                .getOptionValues()
-                .addAll(
-                        Arrays.asList(
-                                StartupMode.INITIAL, StartupMode.EARLIEST, 
StartupMode.LATEST));
-
-        
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
-
         return JdbcSourceOptions.getBaseRule()
                 .required(
                         JdbcSourceOptions.HOSTNAME,
@@ -58,14 +48,23 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
                         JdbcSourceOptions.CONNECT_TIMEOUT_MS,
                         JdbcSourceOptions.CONNECT_MAX_RETRIES,
                         JdbcSourceOptions.CONNECTION_POOL_SIZE)
+                .optional(SqlServerSourceOptions.STARTUP_MODE, 
SqlServerSourceOptions.STOP_MODE)
                 .conditional(
-                        SourceOptions.STARTUP_MODE,
+                        SqlServerSourceOptions.STARTUP_MODE,
                         StartupMode.SPECIFIC,
                         SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
                 .conditional(
-                        SourceOptions.STOP_MODE,
+                        SqlServerSourceOptions.STOP_MODE,
                         StopMode.SPECIFIC,
                         SourceOptions.STOP_SPECIFIC_OFFSET_POS)
+                .conditional(
+                        SqlServerSourceOptions.STARTUP_MODE,
+                        StartupMode.TIMESTAMP,
+                        SourceOptions.STARTUP_TIMESTAMP)
+                .conditional(
+                        SqlServerSourceOptions.STOP_MODE,
+                        StopMode.TIMESTAMP,
+                        SourceOptions.STOP_TIMESTAMP)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
new file mode 100644
index 000000000..ed7d6258b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerSourceOptions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
+
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
+
+public class SqlServerSourceOptions {
+    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+            (SingleChoiceOption)
+                    Options.key("startup.mode")
+                            .singleChoice(
+                                    StartupMode.class,
+                                    Arrays.asList(
+                                            StartupMode.INITIAL,
+                                            StartupMode.EARLIEST,
+                                            StartupMode.LATEST))
+                            .defaultValue(StartupMode.INITIAL)
+                            .withDescription(
+                                    "Optional startup mode for CDC source, 
valid enumerations are "
+                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
+
+    public static final SingleChoiceOption<StopMode> STOP_MODE =
+            (SingleChoiceOption)
+                    Options.key("stop.mode")
+                            .singleChoice(StopMode.class, 
Arrays.asList(StopMode.NEVER))
+                            .defaultValue(StopMode.NEVER)
+                            .withDescription(
+                                    "Optional stop mode for CDC source, valid 
enumerations are "
+                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
+}

Reply via email to