This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4acd370d4 [Improve] Improve savemode api (#4767)
4acd370d4 is described below
commit 4acd370d488f8a7101591911a65e47b5ddb76249
Author: Eric <[email protected]>
AuthorDate: Mon Jul 17 19:05:48 2023 +0800
[Improve] Improve savemode api (#4767)
---
.../seatunnel/api/configuration/Options.java | 4 +-
.../api/configuration/SingleChoiceOption.java | 8 +++-
.../apache/seatunnel/api/sink/DataSaveMode.java | 4 ++
.../seatunnel/api/sink/SinkCommonOptions.java | 23 ----------
.../seatunnel/api/sink/SupportDataSaveMode.java | 51 +++-------------------
.../seatunnel/api/table/factory/FactoryUtil.java | 28 ------------
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 8 +---
.../seatunnel/starrocks/config/SinkConfig.java | 4 ++
.../starrocks/config/StarRocksSinkOptions.java | 12 +++++
.../seatunnel/starrocks/sink/StarRocksSink.java | 17 ++------
.../starrocks/sink/StarRocksSinkFactory.java | 4 +-
.../flink/execution/SinkExecuteProcessor.java | 8 +---
.../flink/execution/SinkExecuteProcessor.java | 8 +---
.../spark/execution/SinkExecuteProcessor.java | 8 +---
.../spark/execution/SinkExecuteProcessor.java | 8 +---
.../core/parse/MultipleTableJobConfigParser.java | 2 +-
16 files changed, 47 insertions(+), 150 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
index 432e931c2..a4ce408d7 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java
@@ -249,7 +249,7 @@ public class Options {
* @param value The default value for the config option
* @return The config option with the default value.
*/
- public Option<T> defaultValue(T value) {
+ public SingleChoiceOption<T> defaultValue(T value) {
return new SingleChoiceOption<T>(key, typeReference, optionValues,
value);
}
@@ -258,7 +258,7 @@ public class Options {
*
* @return The config option without a default value.
*/
- public Option<T> noDefaultValue() {
+ public SingleChoiceOption<T> noDefaultValue() {
return new SingleChoiceOption<T>(key, typeReference, optionValues,
null);
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
index fd3697f68..b3a6574e9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java
@@ -23,7 +23,7 @@ import lombok.Getter;
import java.util.List;
-public class SingleChoiceOption<T> extends Option {
+public class SingleChoiceOption<T> extends Option<T> {
@Getter private final List<T> optionValues;
@@ -32,4 +32,10 @@ public class SingleChoiceOption<T> extends Option {
super(key, typeReference, defaultValue);
this.optionValues = optionValues;
}
+
+ @Override
+ public SingleChoiceOption<T> withDescription(String description) {
+ this.description = description;
+ return this;
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
index f269c9f2c..7ef849f61 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java
@@ -31,6 +31,10 @@ public enum DataSaveMode {
// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,
+ // The connector provides custom processing methods, such as running user
provided SQL or shell
+ // scripts, etc
+ CUSTOM_PROCESSING,
+
// Throw error when table is exists for MySQL. Throw error when path is
exists.
ERROR_WHEN_EXISTS
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
deleted file mode 100644
index 4bf320b49..000000000
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-public class SinkCommonOptions {
-
- public static final String DATA_SAVE_MODE = "save_mode";
-}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
index 7d0c2838b..46ea2e70e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java
@@ -17,55 +17,16 @@
package org.apache.seatunnel.api.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-
-import java.util.List;
-import java.util.Locale;
-
/** The Sink Connectors which support data SaveMode should implement this
interface */
public interface SupportDataSaveMode {
-
- /**
- * We hope every sink connector use the same option name to config
SaveMode, So I add
- * checkOptions method to this interface. checkOptions method have a
default implement to check
- * whether `save_mode` parameter is in config.
- *
- * @param config config of sink Connector
- */
- default void checkOptions(Config config) {
- if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
- String tableSaveMode =
config.getString(SinkCommonOptions.DATA_SAVE_MODE);
- DataSaveMode dataSaveMode =
-
DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
- if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
- throw new SeaTunnelRuntimeException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- "This connector don't support save mode: " +
dataSaveMode);
- }
- }
- }
-
+ String SAVE_MODE_KEY = "savemode";
/**
- * Get the {@link DataSaveMode} that the user configured
+ * Return the value of DataSaveMode configured by user in the job config
file.
*
- * @return DataSaveMode
+ * @return
*/
- DataSaveMode getDataSaveMode();
+ DataSaveMode getUserConfigSaveMode();
- /**
- * Return the {@link DataSaveMode} list supported by this connector
- *
- * @return the list of supported data save modes
- */
- List<DataSaveMode> supportedDataSaveModeValues();
-
- /**
- * The implementation of specific logic according to different {@link
DataSaveMode}
- *
- * @param saveMode data save mode
- */
- void handleSaveMode(DataSaveMode saveMode);
+ /** The implementation of specific logic according to different {@link
DataSaveMode} */
+ void handleSaveMode(DataSaveMode userConfigSaveMode);
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 6ac939149..f30900269 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -18,21 +18,15 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -289,28 +283,6 @@ public final class FactoryUtil {
if (sinkOptionRule == null) {
throw new FactoryException("sinkOptionRule can not be null");
}
-
- try {
- TableSink sink = factory.createSink(null);
- if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
- SupportDataSaveMode supportDataSaveModeSink =
(SupportDataSaveMode) sink;
- Option<DataSaveMode> saveMode =
- Options.key(SinkCommonOptions.DATA_SAVE_MODE)
- .singleChoice(
- DataSaveMode.class,
-
supportDataSaveModeSink.supportedDataSaveModeValues())
- .noDefaultValue()
- .withDescription("data save mode");
- OptionRule sinkCommonOptionRule =
OptionRule.builder().required(saveMode).build();
- sinkOptionRule
- .getOptionalOptions()
- .addAll(sinkCommonOptionRule.getOptionalOptions());
- }
- } catch (Exception e) {
- LOG.warn(
- "Add save mode option need sink connector support create
sink by TableSinkFactory");
- }
-
return sinkOptionRule;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4666eae1e..eec473512 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -52,7 +52,6 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -182,15 +181,10 @@ public class JdbcSink
}
@Override
- public DataSaveMode getDataSaveMode() {
+ public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}
- @Override
- public List<DataSaveMode> supportedDataSaveModeValues() {
- return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
- }
-
@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 91bfb9358..f5a2d0dc8 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import lombok.Getter;
import lombok.Setter;
@@ -58,6 +59,8 @@ public class SinkConfig implements Serializable {
private String saveModeCreateTemplate;
+ private DataSaveMode dataSaveMode;
+
@Getter private final Map<String, Object> streamLoadProps = new
HashMap<>();
public static SinkConfig of(ReadonlyConfig config) {
@@ -89,6 +92,7 @@ public class SinkConfig implements Serializable {
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
+ sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
return sinkConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 4f87b690f..02918f0f9 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -19,8 +19,12 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -133,4 +137,12 @@ public interface StarRocksSinkOptions {
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");
+
+ SingleChoiceOption<DataSaveMode> SAVE_MODE =
+ Options.key(SupportDataSaveMode.SAVE_MODE_KEY)
+ .singleChoice(
+ DataSaveMode.class,
Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA))
+ .defaultValue(DataSaveMode.KEEP_SCHEMA_AND_DATA)
+ .withDescription(
+ "Table structure and data processing methods that
already exist on the target end");
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ae808a36e..54163bd6f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -42,9 +42,6 @@ import org.apache.commons.lang3.StringUtils;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
-import java.util.Collections;
-import java.util.List;
-
@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
@@ -56,12 +53,11 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
private CatalogTable catalogTable;
- public StarRocksSink(
- DataSaveMode dataSaveMode, SinkConfig sinkConfig, CatalogTable
catalogTable) {
- this.dataSaveMode = dataSaveMode;
+ public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
this.catalogTable = catalogTable;
+ this.dataSaveMode = sinkConfig.getDataSaveMode();
}
@Override
@@ -77,7 +73,7 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable !=
null) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
- dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
+ dataSaveMode = sinkConfig.getDataSaveMode();
}
private void autoCreateTable(String template) {
@@ -117,15 +113,10 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
@Override
- public DataSaveMode getDataSaveMode() {
+ public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}
- @Override
- public List<DataSaveMode> supportedDataSaveModeValues() {
- return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
- }
-
@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 44a84c548..471be7001 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -56,6 +55,7 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
+ StarRocksSinkOptions.SAVE_MODE,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
.build();
}
@@ -67,6 +67,6 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
if (StringUtils.isBlank(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
- return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA,
sinkConfig, catalogTable);
+ return () -> new StarRocksSink(sinkConfig, catalogTable);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c23225538..03bd2077e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -84,12 +84,6 @@ public class SinkExecuteProcessor
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
- if
(SupportDataSaveMode.class.isAssignableFrom(
- seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink =
- (SupportDataSaveMode)
seaTunnelSink;
- saveModeSink.checkOptions(sinkConfig);
- }
return seaTunnelSink;
})
.distinct()
@@ -111,7 +105,7 @@ public class SinkExecuteProcessor
(SeaTunnelRowType)
TypeConverterUtils.convert(stream.getType()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index d8fa8eedd..ca9a05f63 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -85,12 +85,6 @@ public class SinkExecuteProcessor
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
- if
(SupportDataSaveMode.class.isAssignableFrom(
- seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink =
- (SupportDataSaveMode)
seaTunnelSink;
- saveModeSink.checkOptions(sinkConfig);
- }
return seaTunnelSink;
})
.distinct()
@@ -112,7 +106,7 @@ public class SinkExecuteProcessor
(SeaTunnelRowType)
TypeConverterUtils.convert(stream.getType()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 61f3fb07b..503f76b87 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -75,12 +75,6 @@ public class SinkExecuteProcessor
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
- if
(SupportDataSaveMode.class.isAssignableFrom(
- seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink =
- (SupportDataSaveMode)
seaTunnelSink;
- saveModeSink.checkOptions(sinkConfig);
- }
return seaTunnelSink;
})
.distinct()
@@ -115,7 +109,7 @@ public class SinkExecuteProcessor
(SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index f4d3c0b15..8afffe1ad 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -76,12 +76,6 @@ public class SinkExecuteProcessor
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
- if
(SupportDataSaveMode.class.isAssignableFrom(
- seaTunnelSink.getClass())) {
- SupportDataSaveMode saveModeSink =
- (SupportDataSaveMode)
seaTunnelSink;
- saveModeSink.checkOptions(sinkConfig);
- }
return seaTunnelSink;
})
.distinct()
@@ -116,7 +110,7 @@ public class SinkExecuteProcessor
(SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode)
seaTunnelSink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ DataSaveMode dataSaveMode =
saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
SparkSinkInjector.inject(dataset.write(), seaTunnelSink)
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 5fd4892cd..faf178e1b 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -616,7 +616,7 @@ public class MultipleTableJobConfigParser {
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
- DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
+ DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
}