This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 86cba87450 [Improve] Add default implement for `SeaTunnelSink::setTypeInfo` (#5682)
86cba87450 is described below
commit 86cba87450d04eb1ec4e2ae262848b8d72af02b4
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 27 15:05:18 2023 +0800
[Improve] Add default implement for `SeaTunnelSink::setTypeInfo` (#5682)
---
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 4 +-
.../api/table/factory/TableSinkFactory.java | 2 +-
.../seatunnel/assertion/sink/AssertSink.java | 49 +---------------------
.../common/multitablesink/MultiTableSink.java | 24 +----------
.../seatunnel/console/sink/ConsoleSink.java | 25 ++---------
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 44 +++----------------
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 25 +----------
.../seatunnel/starrocks/sink/StarRocksSink.java | 35 ++--------------
.../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 5 ---
.../apache/seatunnel/engine/server/TestUtils.java | 11 ++++-
.../server/checkpoint/CheckpointPlanTest.java | 11 ++++-
.../seatunnel/engine/server/dag/TaskTest.java | 10 ++++-
12 files changed, 50 insertions(+), 195 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 352c2e9d48..913d644958 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -57,7 +57,9 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT>
* @param seaTunnelRowType The row type info of sink.
*/
@Deprecated
- void setTypeInfo(SeaTunnelRowType seaTunnelRowType);
+ default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ throw new UnsupportedOperationException("setTypeInfo method is not supported");
+ }
/**
* Get the data type of the records consumed by this sink.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 2fca039e7d..97fba1f256 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -34,7 +34,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT
* We will never use this method now. So gave a default implement and
return null.
*
* @param context TableFactoryContext
- * @return
+ * @return return the sink created by this factory
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(
TableSinkFactoryContext context) {
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index d65398c9de..a312159929 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -22,7 +22,6 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
@@ -39,7 +38,6 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.commons.collections4.CollectionUtils;
-import com.google.auto.service.AutoService;
import com.google.common.base.Throwables;
import java.util.ArrayList;
@@ -50,21 +48,15 @@ import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
-@AutoService(SeaTunnelSink.class)
public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
private SeaTunnelRowType seaTunnelRowType;
- private CatalogTable catalogTable;
private List<AssertFieldRule> assertFieldRules;
private List<AssertFieldRule.AssertRule> assertRowRules;
- private AssertTableRule assertTableRule;
-
+ private final AssertTableRule assertTableRule;
private AssertCatalogTableRule assertCatalogTableRule;
- public AssertSink() {}
-
public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
- this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
if (!pluginConfig.getOptional(RULES).isPresent()) {
Throwables.propagateIfPossible(new
ConfigException.Missing(RULES.key()));
@@ -105,11 +97,6 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return seaTunnelRowType;
@@ -121,40 +108,6 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
seaTunnelRowType, assertFieldRules, assertRowRules,
assertTableRule);
}
- @Override
- public void prepare(Config pluginConfig) {
- if (!pluginConfig.hasPath(RULES.key())) {
- Throwables.propagateIfPossible(new
ConfigException.Missing(RULES.key()));
- }
- Config ruleConfig = pluginConfig.getConfig(RULES.key());
- List<? extends Config> rowConfigList = null;
- List<? extends Config> configList = null;
- if (ruleConfig.hasPath(ROW_RULES)) {
- rowConfigList = ruleConfig.getConfigList(ROW_RULES);
- assertRowRules = new
AssertRuleParser().parseRowRules(rowConfigList);
- }
- if (ruleConfig.hasPath(FIELD_RULES)) {
- configList = ruleConfig.getConfigList(FIELD_RULES);
- assertFieldRules = new AssertRuleParser().parseRules(configList);
- }
-
- if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
- assertTableRule =
- new
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
- } else {
- assertTableRule = new AssertTableRule(new ArrayList<>());
- }
-
- if (CollectionUtils.isEmpty(configList)
- && CollectionUtils.isEmpty(rowConfigList)
- && assertCatalogTableRule == null
- && assertTableRule.getTableNames().isEmpty()) {
- Throwables.propagateIfPossible(
- new ConfigException.BadValue(
- RULES.key(), "Assert rule config is empty, please add rule config."));
- }
- }
-
@Override
public String getPluginName() {
return "Assert";
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 21f25f4728..864137bb8c 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
@@ -33,10 +30,6 @@ import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
import java.io.IOException;
import java.util.HashMap;
@@ -48,8 +41,6 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.api.sink.DataSaveMode.KEEP_SCHEMA_AND_DATA;
-@AutoService(SeaTunnelSink.class)
-@NoArgsConstructor
public class MultiTableSink
implements SeaTunnelSink<
SeaTunnelRow,
@@ -58,8 +49,8 @@ public class MultiTableSink
MultiTableAggregatedCommitInfo>,
SupportDataSaveMode {
- private Map<String, SeaTunnelSink> sinks;
- private int replicaNum;
+ private final Map<String, SeaTunnelSink> sinks;
+ private final int replicaNum;
public MultiTableSink(MultiTableFactoryContext context) {
this.sinks = context.getSinks();
@@ -71,17 +62,6 @@ public class MultiTableSink
return "MultiTableSink";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- throw new UnsupportedOperationException(
- "Please use MultiTableSinkFactory to create MultiTableSink");
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
throw new UnsupportedOperationException("MultiTableSink only support CatalogTable");
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index dd0d960f36..7e3c3e5bbe 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,19 +26,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
-@NoArgsConstructor
-@AutoService(SeaTunnelSink.class)
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
- private SeaTunnelRowType seaTunnelRowType;
- private boolean isPrintData = true;
- private int delayMs = 0;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final boolean isPrintData;
+ private final int delayMs;
public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig
options) {
this.seaTunnelRowType = seaTunnelRowType;
@@ -49,11 +41,6 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
this.delayMs = options.get(LOG_PRINT_DELAY);
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
@@ -68,10 +55,4 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public String getPluginName() {
return "Console";
}
-
- @Override
- public void prepare(Config pluginConfig) {
- this.isPrintData =
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
- this.delayMs =
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 76f691fbbf..826a08af0c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -42,7 +39,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
@@ -51,8 +47,6 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
import org.apache.commons.lang3.StringUtils;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -60,25 +54,24 @@ import java.util.Optional;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
-@AutoService(SeaTunnelSink.class)
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo,
JdbcAggregatedCommitInfo>,
SupportDataSaveMode,
SupportMultiTableSink {
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
private JobContext jobContext;
- private JdbcSinkConfig jdbcSinkConfig;
+ private final JdbcSinkConfig jdbcSinkConfig;
- private JdbcDialect dialect;
+ private final JdbcDialect dialect;
- private ReadonlyConfig config;
+ private final ReadonlyConfig config;
- private DataSaveMode dataSaveMode;
+ private final DataSaveMode dataSaveMode;
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
public JdbcSink(
ReadonlyConfig config,
@@ -94,31 +87,11 @@ public class JdbcSink
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
- public JdbcSink() {}
-
@Override
public String getPluginName() {
return "Jdbc";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.config = ReadonlyConfig.fromConfig(pluginConfig);
- this.jdbcSinkConfig = JdbcSinkConfig.of(config);
- this.dialect =
- JdbcDialectLoader.load(
- jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
-
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
- config.get(JdbcOptions.FIELD_IDE) == null
- ? null
- :
config.get(JdbcOptions.FIELD_IDE).getValue());
- this.dialect.connectionUrlParse(
- jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
- jdbcSinkConfig.getJdbcConnectionConfig().getProperties(),
- this.dialect.defaultParameter());
- this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
- }
-
@Override
public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(
SinkWriter.Context context) {
@@ -167,11 +140,6 @@ public class JdbcSink
return Optional.empty();
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index cbd409f99c..43b6b853a2 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -17,11 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,9 +30,6 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedComm
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -45,32 +38,18 @@ import java.util.Optional;
* Kafka Sink implementation by using SeaTunnel sink API. This class contains
the method to create
* {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
*/
-@AutoService(SeaTunnelSink.class)
-@NoArgsConstructor
public class KafkaSink
implements SeaTunnelSink<
SeaTunnelRow, KafkaSinkState, KafkaCommitInfo,
KafkaAggregatedCommitInfo> {
- private ReadonlyConfig pluginConfig;
- private SeaTunnelRowType seaTunnelRowType;
+ private final ReadonlyConfig pluginConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = rowType;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
- .validate(new KafkaSinkFactory().optionRule());
- this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 54163bd6f1..9aaef3ada4 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,13 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -37,21 +31,14 @@ import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCata
import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor
-@AutoService(SeaTunnelSink.class)
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportDataSaveMode {
private SeaTunnelRowType seaTunnelRowType;
- private SinkConfig sinkConfig;
- private DataSaveMode dataSaveMode;
+ private final SinkConfig sinkConfig;
+ private final DataSaveMode dataSaveMode;
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
@@ -65,17 +52,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
return StarRocksCatalogFactory.IDENTIFIER;
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
- .validate(new StarRocksCatalogFactory().optionRule());
- sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
- if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable !=
null) {
- sinkConfig.setTable(catalogTable.getTableId().getTableName());
- }
- dataSaveMode = sinkConfig.getDataSaveMode();
- }
-
private void autoCreateTable(String template) {
StarRocksCatalog starRocksCatalog =
new StarRocksCatalog(
@@ -97,11 +73,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 7902627e08..75c2b9324f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -26,9 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
@@ -507,9 +505,6 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private Properties getSinkProperties(JdbcSink jdbcSink)
throws IOException, SQLException, ClassNotFoundException {
- jdbcSink.setTypeInfo(
- new SeaTunnelRowType(
- new String[] {"id"}, new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}));
JdbcSinkWriter jdbcSinkWriter = (JdbcSinkWriter)
jdbcSink.createWriter(null);
JdbcConnectionProvider connectionProvider =
(JdbcConnectionProvider)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 6de34c3366..21177a52d2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,6 +22,9 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
@@ -45,6 +48,7 @@ import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Set;
@@ -73,7 +77,12 @@ public class TestUtils {
fake.setParallelism(3);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
- ConsoleSink consoleSink = new ConsoleSink();
+ ConsoleSink consoleSink =
+ new ConsoleSink(
+ new SeaTunnelRowType(
+ new String[] {"id"},
+ new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
+ ReadonlyConfig.fromMap(new HashMap<>()));
consoleSink.setJobContext(jobContext);
Action console =
new SinkAction<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 77ff157bd1..9c1f385441 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -22,6 +22,9 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -47,6 +50,7 @@ import com.google.common.collect.ImmutableMap;
import com.hazelcast.map.IMap;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -123,7 +127,12 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
fake.setParallelism(parallelism);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake,
parallelism);
- ConsoleSink consoleSink = new ConsoleSink();
+ ConsoleSink consoleSink =
+ new ConsoleSink(
+ new SeaTunnelRowType(
+ new String[] {"id"},
+ new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
+ ReadonlyConfig.fromMap(new HashMap<>()));
consoleSink.setJobContext(jobContext);
Action console =
new SinkAction<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index e7fb77e37a..cf6fac1b6f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -22,6 +22,9 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -53,6 +56,7 @@ import com.hazelcast.map.IMap;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
+import java.util.HashMap;
import java.util.concurrent.Executors;
public class TaskTest extends AbstractSeaTunnelServerTest {
@@ -110,7 +114,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
new SinkAction<>(
idGenerator.getNextId(),
"console",
- new ConsoleSink(),
+ new ConsoleSink(
+ new SeaTunnelRowType(
+ new String[] {"id"},
+ new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
+ ReadonlyConfig.fromMap(new HashMap<>())),
Sets.newHashSet(new URL("file:///console.jar")));
LogicalVertex consoleVertex = new LogicalVertex(console.getId(),
console, 2);