This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4acd370d4 [Improve] Improve savemode api (#4767)
4acd370d4 is described below

commit 4acd370d488f8a7101591911a65e47b5ddb76249
Author: Eric <[email protected]>
AuthorDate: Mon Jul 17 19:05:48 2023 +0800

    [Improve] Improve savemode api (#4767)
---
 .../seatunnel/api/configuration/Options.java       |  4 +-
 .../api/configuration/SingleChoiceOption.java      |  8 +++-
 .../apache/seatunnel/api/sink/DataSaveMode.java    |  4 ++
 .../seatunnel/api/sink/SinkCommonOptions.java      | 23 ----------
 .../seatunnel/api/sink/SupportDataSaveMode.java    | 51 +++-------------------
 .../seatunnel/api/table/factory/FactoryUtil.java   | 28 ------------
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  8 +---
 .../seatunnel/starrocks/config/SinkConfig.java     |  4 ++
 .../starrocks/config/StarRocksSinkOptions.java     | 12 +++++
 .../seatunnel/starrocks/sink/StarRocksSink.java    | 17 ++------
 .../starrocks/sink/StarRocksSinkFactory.java       |  4 +-
 .../flink/execution/SinkExecuteProcessor.java      |  8 +---
 .../flink/execution/SinkExecuteProcessor.java      |  8 +---
 .../spark/execution/SinkExecuteProcessor.java      |  8 +---
 .../spark/execution/SinkExecuteProcessor.java      |  8 +---
 .../core/parse/MultipleTableJobConfigParser.java   |  2 +-
 16 files changed, 47 insertions(+), 150 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index 432e931c2..a4ce408d7 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -249,7 +249,7 @@ public class Options {
          * @param value The default value for the config option
          * @return The config option with the default value.
          */
-        public Option<T> defaultValue(T value) {
+        public SingleChoiceOption<T> defaultValue(T value) {
             return new SingleChoiceOption<T>(key, typeReference, optionValues, 
value);
         }
 
@@ -258,7 +258,7 @@ public class Options {
          *
          * @return The config option without a default value.
          */
-        public Option<T> noDefaultValue() {
+        public SingleChoiceOption<T> noDefaultValue() {
             return new SingleChoiceOption<T>(key, typeReference, optionValues, 
null);
         }
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
index fd3697f68..b3a6574e9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
@@ -23,7 +23,7 @@ import lombok.Getter;
 
 import java.util.List;
 
-public class SingleChoiceOption<T> extends Option {
+public class SingleChoiceOption<T> extends Option<T> {
 
     @Getter private final List<T> optionValues;
 
@@ -32,4 +32,10 @@ public class SingleChoiceOption<T> extends Option {
         super(key, typeReference, defaultValue);
         this.optionValues = optionValues;
     }
+
+    @Override
+    public SingleChoiceOption<T> withDescription(String description) {
+        this.description = description;
+        return this;
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index f269c9f2c..7ef849f61 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -31,6 +31,10 @@ public enum DataSaveMode {
     // path and files in the path, create new files in the path.
     KEEP_SCHEMA_AND_DATA,
 
+    // The connector provides custom processing methods, such as running user 
provided SQL or shell
+    // scripts, etc
+    CUSTOM_PROCESSING,
+
     // Throw error when table is exists for MySQL. Throw error when path is 
exists.
     ERROR_WHEN_EXISTS
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
deleted file mode 100644
index 4bf320b49..000000000
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-public class SinkCommonOptions {
-
-    public static final String DATA_SAVE_MODE = "save_mode";
-}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
index 7d0c2838b..46ea2e70e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
@@ -17,55 +17,16 @@
 
 package org.apache.seatunnel.api.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-import java.util.List;
-import java.util.Locale;
-
 /** The Sink Connectors which support data SaveMode should implement this 
interface */
 public interface SupportDataSaveMode {
-
-    /**
-     * We hope every sink connector use the same option name to config 
SaveMode, So I add
-     * checkOptions method to this interface. checkOptions method have a 
default implement to check
-     * whether `save_mode` parameter is in config.
-     *
-     * @param config config of sink Connector
-     */
-    default void checkOptions(Config config) {
-        if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
-            String tableSaveMode = 
config.getString(SinkCommonOptions.DATA_SAVE_MODE);
-            DataSaveMode dataSaveMode =
-                    
DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
-            if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
-                throw new SeaTunnelRuntimeException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        "This connector don't support save mode: " + 
dataSaveMode);
-            }
-        }
-    }
-
+    String SAVE_MODE_KEY = "savemode";
     /**
-     * Get the {@link DataSaveMode} that the user configured
+     * Return the value of DataSaveMode configured by user in the job config 
file.
      *
-     * @return DataSaveMode
+     * @return
      */
-    DataSaveMode getDataSaveMode();
+    DataSaveMode getUserConfigSaveMode();
 
-    /**
-     * Return the {@link DataSaveMode} list supported by this connector
-     *
-     * @return the list of supported data save modes
-     */
-    List<DataSaveMode> supportedDataSaveModeValues();
-
-    /**
-     * The implementation of specific logic according to different {@link 
DataSaveMode}
-     *
-     * @param saveMode data save mode
-     */
-    void handleSaveMode(DataSaveMode saveMode);
+    /** The implementation of specific logic according to different {@link 
DataSaveMode} */
+    void handleSaveMode(DataSaveMode userConfigSaveMode);
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 6ac939149..f30900269 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,21 +18,15 @@
 package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
@@ -289,28 +283,6 @@ public final class FactoryUtil {
         if (sinkOptionRule == null) {
             throw new FactoryException("sinkOptionRule can not be null");
         }
-
-        try {
-            TableSink sink = factory.createSink(null);
-            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
-                SupportDataSaveMode supportDataSaveModeSink = 
(SupportDataSaveMode) sink;
-                Option<DataSaveMode> saveMode =
-                        Options.key(SinkCommonOptions.DATA_SAVE_MODE)
-                                .singleChoice(
-                                        DataSaveMode.class,
-                                        
supportDataSaveModeSink.supportedDataSaveModeValues())
-                                .noDefaultValue()
-                                .withDescription("data save mode");
-                OptionRule sinkCommonOptionRule = 
OptionRule.builder().required(saveMode).build();
-                sinkOptionRule
-                        .getOptionalOptions()
-                        .addAll(sinkCommonOptionRule.getOptionalOptions());
-            }
-        } catch (Exception e) {
-            LOG.warn(
-                    "Add save mode option need sink connector support create 
sink by TableSinkFactory");
-        }
-
         return sinkOptionRule;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4666eae1e..eec473512 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -52,7 +52,6 @@ import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -182,15 +181,10 @@ public class JdbcSink
     }
 
     @Override
-    public DataSaveMode getDataSaveMode() {
+    public DataSaveMode getUserConfigSaveMode() {
         return dataSaveMode;
     }
 
-    @Override
-    public List<DataSaveMode> supportedDataSaveModeValues() {
-        return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
-    }
-
     @Override
     public void handleSaveMode(DataSaveMode saveMode) {
         if (catalogTable != null) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 91bfb9358..f5a2d0dc8 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -58,6 +59,8 @@ public class SinkConfig implements Serializable {
 
     private String saveModeCreateTemplate;
 
+    private DataSaveMode dataSaveMode;
+
     @Getter private final Map<String, Object> streamLoadProps = new 
HashMap<>();
 
     public static SinkConfig of(ReadonlyConfig config) {
@@ -89,6 +92,7 @@ public class SinkConfig implements Serializable {
         config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
                 .ifPresent(sinkConfig::setColumnSeparator);
         sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
+        sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
         return sinkConfig;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 4f87b690f..02918f0f9 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -19,8 +19,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -133,4 +137,12 @@ public interface StarRocksSinkOptions {
                     .enumType(StreamLoadFormat.class)
                     .defaultValue(StreamLoadFormat.JSON)
                     .withDescription("");
+
+    SingleChoiceOption<DataSaveMode> SAVE_MODE =
+            Options.key(SupportDataSaveMode.SAVE_MODE_KEY)
+                    .singleChoice(
+                            DataSaveMode.class, 
Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA))
+                    .defaultValue(DataSaveMode.KEEP_SCHEMA_AND_DATA)
+                    .withDescription(
+                            "Table structure and data processing methods that 
already exist on the target end");
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ae808a36e..54163bd6f 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -42,9 +42,6 @@ import org.apache.commons.lang3.StringUtils;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
-import java.util.Collections;
-import java.util.List;
-
 @NoArgsConstructor
 @AutoService(SeaTunnelSink.class)
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
@@ -56,12 +53,11 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
 
     private CatalogTable catalogTable;
 
-    public StarRocksSink(
-            DataSaveMode dataSaveMode, SinkConfig sinkConfig, CatalogTable 
catalogTable) {
-        this.dataSaveMode = dataSaveMode;
+    public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
         this.sinkConfig = sinkConfig;
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
         this.catalogTable = catalogTable;
+        this.dataSaveMode = sinkConfig.getDataSaveMode();
     }
 
     @Override
@@ -77,7 +73,7 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable != 
null) {
             sinkConfig.setTable(catalogTable.getTableId().getTableName());
         }
-        dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
+        dataSaveMode = sinkConfig.getDataSaveMode();
     }
 
     private void autoCreateTable(String template) {
@@ -117,15 +113,10 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     }
 
     @Override
-    public DataSaveMode getDataSaveMode() {
+    public DataSaveMode getUserConfigSaveMode() {
         return dataSaveMode;
     }
 
-    @Override
-    public List<DataSaveMode> supportedDataSaveModeValues() {
-        return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
-    }
-
     @Override
     public void handleSaveMode(DataSaveMode saveMode) {
         if (catalogTable != null) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 44a84c548..471be7001 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -56,6 +55,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
                         StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
                         StarRocksSinkOptions.STARROCKS_CONFIG,
                         StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
+                        StarRocksSinkOptions.SAVE_MODE,
                         StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
                 .build();
     }
@@ -67,6 +67,6 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
         if (StringUtils.isBlank(sinkConfig.getTable())) {
             sinkConfig.setTable(catalogTable.getTableId().getTableName());
         }
-        return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA, 
sinkConfig, catalogTable);
+        return () -> new StarRocksSink(sinkConfig, catalogTable);
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c23225538..03bd2077e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -84,12 +84,6 @@ public class SinkExecuteProcessor
                                                             pluginIdentifier);
                                     seaTunnelSink.prepare(sinkConfig);
                                     seaTunnelSink.setJobContext(jobContext);
-                                    if 
(SupportDataSaveMode.class.isAssignableFrom(
-                                            seaTunnelSink.getClass())) {
-                                        SupportDataSaveMode saveModeSink =
-                                                (SupportDataSaveMode) 
seaTunnelSink;
-                                        saveModeSink.checkOptions(sinkConfig);
-                                    }
                                     return seaTunnelSink;
                                 })
                         .distinct()
@@ -111,7 +105,7 @@ public class SinkExecuteProcessor
                     (SeaTunnelRowType) 
TypeConverterUtils.convert(stream.getType()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
-                DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+                DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             DataStreamSink<Row> dataStreamSink =
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index d8fa8eedd..ca9a05f63 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -85,12 +85,6 @@ public class SinkExecuteProcessor
                                                             pluginIdentifier);
                                     seaTunnelSink.prepare(sinkConfig);
                                     seaTunnelSink.setJobContext(jobContext);
-                                    if 
(SupportDataSaveMode.class.isAssignableFrom(
-                                            seaTunnelSink.getClass())) {
-                                        SupportDataSaveMode saveModeSink =
-                                                (SupportDataSaveMode) 
seaTunnelSink;
-                                        saveModeSink.checkOptions(sinkConfig);
-                                    }
                                     return seaTunnelSink;
                                 })
                         .distinct()
@@ -112,7 +106,7 @@ public class SinkExecuteProcessor
                     (SeaTunnelRowType) 
TypeConverterUtils.convert(stream.getType()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
-                DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+                DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             DataStreamSink<Row> dataStreamSink =
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 61f3fb07b..503f76b87 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -75,12 +75,6 @@ public class SinkExecuteProcessor
                                                     pluginIdentifier);
                                     seaTunnelSink.prepare(sinkConfig);
                                     seaTunnelSink.setJobContext(jobContext);
-                                    if 
(SupportDataSaveMode.class.isAssignableFrom(
-                                            seaTunnelSink.getClass())) {
-                                        SupportDataSaveMode saveModeSink =
-                                                (SupportDataSaveMode) 
seaTunnelSink;
-                                        saveModeSink.checkOptions(sinkConfig);
-                                    }
                                     return seaTunnelSink;
                                 })
                         .distinct()
@@ -115,7 +109,7 @@ public class SinkExecuteProcessor
                     (SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
-                DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+                DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index f4d3c0b15..8afffe1ad 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -76,12 +76,6 @@ public class SinkExecuteProcessor
                                                     pluginIdentifier);
                                     seaTunnelSink.prepare(sinkConfig);
                                     seaTunnelSink.setJobContext(jobContext);
-                                    if 
(SupportDataSaveMode.class.isAssignableFrom(
-                                            seaTunnelSink.getClass())) {
-                                        SupportDataSaveMode saveModeSink =
-                                                (SupportDataSaveMode) 
seaTunnelSink;
-                                        saveModeSink.checkOptions(sinkConfig);
-                                    }
                                     return seaTunnelSink;
                                 })
                         .distinct()
@@ -116,7 +110,7 @@ public class SinkExecuteProcessor
                     (SeaTunnelRowType) 
TypeConverterUtils.convert(dataset.schema()));
             if 
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
                 SupportDataSaveMode saveModeSink = (SupportDataSaveMode) 
seaTunnelSink;
-                DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+                DataSaveMode dataSaveMode = 
saveModeSink.getUserConfigSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
             SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 5fd4892cd..faf178e1b 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -616,7 +616,7 @@ public class MultipleTableJobConfigParser {
     public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
         if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
             SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
-            DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+            DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
             saveModeSink.handleSaveMode(dataSaveMode);
         }
     }

Reply via email to