This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f60ac1a5e Update CDC StartupMode and StopMode option to 
SingleChoiceOption (#4357)
f60ac1a5e is described below

commit f60ac1a5e99432c527ebb1b2f8852e8c1a1807e6
Author: Eric <[email protected]>
AuthorDate: Wed Mar 15 14:51:32 2023 +0800

    Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357)
---
 .../connectors/cdc/base/option/SourceOptions.java  | 46 ++++++++++------------
 .../cdc/base/source/IncrementalSource.java         |  5 ++-
 .../source/MySqlIncrementalSourceFactory.java      | 22 +++++++++++
 .../source/SqlServerIncrementalSourceFactory.java  | 21 ++++++++++
 4 files changed, 66 insertions(+), 28 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 525069d11..b7090e6fd 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.connectors.cdc.base.option;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
 
+import java.util.ArrayList;
 import java.util.Map;
 
 @SuppressWarnings("MagicNumber")
@@ -41,13 +43,23 @@ public class SourceOptions {
                     .withDescription(
                             "The maximum fetch size for per poll when read 
table snapshot.");
 
-    public static final Option<StartupMode> STARTUP_MODE =
-            Options.key("startup.mode")
-                    .enumType(StartupMode.class)
-                    .defaultValue(StartupMode.INITIAL)
-                    .withDescription(
-                            "Optional startup mode for CDC source, valid 
enumerations are "
-                                    + "\"initial\", \"earliest\", \"latest\", 
\"timestamp\"\n or \"specific\"");
+    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
+            (SingleChoiceOption)
+                    Options.key("startup.mode")
+                            .singleChoice(StartupMode.class, new ArrayList<>())
+                            .defaultValue(StartupMode.INITIAL)
+                            .withDescription(
+                                    "Optional startup mode for CDC source, 
valid enumerations are "
+                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
+
+    public static final SingleChoiceOption<StopMode> STOP_MODE =
+            (SingleChoiceOption)
+                    Options.key("stop.mode")
+                            .singleChoice(StopMode.class, new ArrayList<>())
+                            .defaultValue(StopMode.NEVER)
+                            .withDescription(
+                                    "Optional stop mode for CDC source, valid 
enumerations are "
+                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
 
     public static final Option<Long> STARTUP_TIMESTAMP =
             Options.key("startup.timestamp")
@@ -74,14 +86,6 @@ public class SourceOptions {
                     .defaultValue(1)
                     .withDescription("The number of parallel readers in the 
incremental phase.");
 
-    public static final Option<StopMode> STOP_MODE =
-            Options.key("stop.mode")
-                    .enumType(StopMode.class)
-                    .defaultValue(StopMode.NEVER)
-                    .withDescription(
-                            "Optional stop mode for CDC source, valid 
enumerations are "
-                                    + "\"never\", \"latest\", \"timestamp\"\n 
or \"specific\"");
-
     public static final Option<Long> STOP_TIMESTAMP =
             Options.key("stop.timestamp")
                     .longType()
@@ -122,16 +126,6 @@ public class SourceOptions {
                 .optional(STARTUP_MODE, STOP_MODE)
                 .optional(DEBEZIUM_PROPERTIES)
                 .conditional(STARTUP_MODE, StartupMode.TIMESTAMP, 
STARTUP_TIMESTAMP)
-                .conditional(
-                        STARTUP_MODE,
-                        StartupMode.SPECIFIC,
-                        STARTUP_SPECIFIC_OFFSET_FILE,
-                        STARTUP_SPECIFIC_OFFSET_POS)
-                .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
-                .conditional(
-                        STOP_MODE,
-                        StopMode.SPECIFIC,
-                        STOP_SPECIFIC_OFFSET_FILE,
-                        STOP_SPECIFIC_OFFSET_POS);
+                .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index dbad777cc..a11014afa 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -120,7 +121,7 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
 
     protected StartupConfig getStartupConfig(ReadonlyConfig config) {
         return new StartupConfig(
-                config.get(SourceOptions.STARTUP_MODE),
+                config.get((Option<StartupMode>) SourceOptions.STARTUP_MODE),
                 config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE),
                 config.get(SourceOptions.STARTUP_SPECIFIC_OFFSET_POS),
                 config.get(SourceOptions.STARTUP_TIMESTAMP));
@@ -128,7 +129,7 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
 
     private StopConfig getStopConfig(ReadonlyConfig config) {
         return new StopConfig(
-                config.get(SourceOptions.STOP_MODE),
+                config.get((Option<StopMode>) SourceOptions.STOP_MODE),
                 config.get(SourceOptions.STOP_SPECIFIC_OFFSET_FILE),
                 config.get(SourceOptions.STOP_SPECIFIC_OFFSET_POS),
                 config.get(SourceOptions.STOP_TIMESTAMP));
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index a185921ca..4274488e9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -32,11 +32,15 @@ import 
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +54,14 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
 
     @Override
     public OptionRule optionRule() {
+        SourceOptions.STARTUP_MODE
+                .getOptionValues()
+                .addAll(
+                        Arrays.asList(
+                                StartupMode.INITIAL, StartupMode.EARLIEST, 
StartupMode.LATEST));
+
+        
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
+
         return JdbcSourceOptions.getBaseRule()
                 .required(
                         JdbcSourceOptions.USERNAME,
@@ -63,6 +75,16 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory, Suppor
                         JdbcSourceOptions.CONNECT_TIMEOUT_MS,
                         JdbcSourceOptions.CONNECT_MAX_RETRIES,
                         JdbcSourceOptions.CONNECTION_POOL_SIZE)
+                .conditional(
+                        SourceOptions.STARTUP_MODE,
+                        StartupMode.SPECIFIC,
+                        SourceOptions.STARTUP_SPECIFIC_OFFSET_FILE,
+                        SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
+                .conditional(
+                        SourceOptions.STOP_MODE,
+                        StopMode.SPECIFIC,
+                        SourceOptions.STOP_SPECIFIC_OFFSET_FILE,
+                        SourceOptions.STOP_SPECIFIC_OFFSET_POS)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 01f79f5e7..d859493ac 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -22,6 +22,11 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
+import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+
+import java.util.Arrays;
 
 public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
 
@@ -32,6 +37,14 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
 
     @Override
     public OptionRule optionRule() {
+        SourceOptions.STARTUP_MODE
+                .getOptionValues()
+                .addAll(
+                        Arrays.asList(
+                                StartupMode.INITIAL, StartupMode.EARLIEST, 
StartupMode.LATEST));
+
+        
SourceOptions.STOP_MODE.getOptionValues().addAll(Arrays.asList(StopMode.NEVER));
+
         return JdbcSourceOptions.getBaseRule()
                 .required(
                         JdbcSourceOptions.HOSTNAME,
@@ -45,6 +58,14 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
                         JdbcSourceOptions.CONNECT_TIMEOUT_MS,
                         JdbcSourceOptions.CONNECT_MAX_RETRIES,
                         JdbcSourceOptions.CONNECTION_POOL_SIZE)
+                .conditional(
+                        SourceOptions.STARTUP_MODE,
+                        StartupMode.SPECIFIC,
+                        SourceOptions.STARTUP_SPECIFIC_OFFSET_POS)
+                .conditional(
+                        SourceOptions.STOP_MODE,
+                        StopMode.SPECIFIC,
+                        SourceOptions.STOP_SPECIFIC_OFFSET_POS)
                 .build();
     }
 

Reply via email to