This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 86cba87450 [Improve] Add default implement for 
`SeaTunnelSink::setTypeInfo` (#5682)
86cba87450 is described below

commit 86cba87450d04eb1ec4e2ae262848b8d72af02b4
Author: Jia Fan <[email protected]>
AuthorDate: Fri Oct 27 15:05:18 2023 +0800

    [Improve] Add default implement for `SeaTunnelSink::setTypeInfo` (#5682)
---
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   |  4 +-
 .../api/table/factory/TableSinkFactory.java        |  2 +-
 .../seatunnel/assertion/sink/AssertSink.java       | 49 +---------------------
 .../common/multitablesink/MultiTableSink.java      | 24 +----------
 .../seatunnel/console/sink/ConsoleSink.java        | 25 ++---------
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 44 +++----------------
 .../connectors/seatunnel/kafka/sink/KafkaSink.java | 25 +----------
 .../seatunnel/starrocks/sink/StarRocksSink.java    | 35 ++--------------
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     |  5 ---
 .../apache/seatunnel/engine/server/TestUtils.java  | 11 ++++-
 .../server/checkpoint/CheckpointPlanTest.java      | 11 ++++-
 .../seatunnel/engine/server/dag/TaskTest.java      | 10 ++++-
 12 files changed, 50 insertions(+), 195 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 352c2e9d48..913d644958 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -57,7 +57,9 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT>
      * @param seaTunnelRowType The row type info of sink.
      */
     @Deprecated
-    void setTypeInfo(SeaTunnelRowType seaTunnelRowType);
+    default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        throw new UnsupportedOperationException("setTypeInfo method is not 
supported");
+    }
 
     /**
      * Get the data type of the records consumed by this sink.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 2fca039e7d..97fba1f256 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -34,7 +34,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT
      * We will never use this method now. So gave a default implement and 
return null.
      *
      * @param context TableFactoryContext
-     * @return
+     * @return return the sink created by this factory
      */
     default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createSink(
             TableSinkFactoryContext context) {
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index d65398c9de..a312159929 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -22,7 +22,6 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
@@ -39,7 +38,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import org.apache.commons.collections4.CollectionUtils;
 
-import com.google.auto.service.AutoService;
 import com.google.common.base.Throwables;
 
 import java.util.ArrayList;
@@ -50,21 +48,15 @@ import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertCon
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
 
-@AutoService(SeaTunnelSink.class)
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
     private SeaTunnelRowType seaTunnelRowType;
-    private CatalogTable catalogTable;
     private List<AssertFieldRule> assertFieldRules;
     private List<AssertFieldRule.AssertRule> assertRowRules;
-    private AssertTableRule assertTableRule;
-
+    private final AssertTableRule assertTableRule;
     private AssertCatalogTableRule assertCatalogTableRule;
 
-    public AssertSink() {}
-
     public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
-        this.catalogTable = catalogTable;
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         if (!pluginConfig.getOptional(RULES).isPresent()) {
             Throwables.propagateIfPossible(new 
ConfigException.Missing(RULES.key()));
@@ -105,11 +97,6 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         }
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return seaTunnelRowType;
@@ -121,40 +108,6 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                 seaTunnelRowType, assertFieldRules, assertRowRules, 
assertTableRule);
     }
 
-    @Override
-    public void prepare(Config pluginConfig) {
-        if (!pluginConfig.hasPath(RULES.key())) {
-            Throwables.propagateIfPossible(new 
ConfigException.Missing(RULES.key()));
-        }
-        Config ruleConfig = pluginConfig.getConfig(RULES.key());
-        List<? extends Config> rowConfigList = null;
-        List<? extends Config> configList = null;
-        if (ruleConfig.hasPath(ROW_RULES)) {
-            rowConfigList = ruleConfig.getConfigList(ROW_RULES);
-            assertRowRules = new 
AssertRuleParser().parseRowRules(rowConfigList);
-        }
-        if (ruleConfig.hasPath(FIELD_RULES)) {
-            configList = ruleConfig.getConfigList(FIELD_RULES);
-            assertFieldRules = new AssertRuleParser().parseRules(configList);
-        }
-
-        if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
-            assertTableRule =
-                    new 
AssertTableRule(ruleConfig.getStringList(CatalogOptions.TABLE_NAMES.key()));
-        } else {
-            assertTableRule = new AssertTableRule(new ArrayList<>());
-        }
-
-        if (CollectionUtils.isEmpty(configList)
-                && CollectionUtils.isEmpty(rowConfigList)
-                && assertCatalogTableRule == null
-                && assertTableRule.getTableNames().isEmpty()) {
-            Throwables.propagateIfPossible(
-                    new ConfigException.BadValue(
-                            RULES.key(), "Assert rule config is empty, please 
add rule config."));
-        }
-    }
-
     @Override
     public String getPluginName() {
         return "Assert";
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 21f25f4728..864137bb8c 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.DataSaveMode;
@@ -33,10 +30,6 @@ import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -48,8 +41,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.sink.DataSaveMode.KEEP_SCHEMA_AND_DATA;
 
-@AutoService(SeaTunnelSink.class)
-@NoArgsConstructor
 public class MultiTableSink
         implements SeaTunnelSink<
                         SeaTunnelRow,
@@ -58,8 +49,8 @@ public class MultiTableSink
                         MultiTableAggregatedCommitInfo>,
                 SupportDataSaveMode {
 
-    private Map<String, SeaTunnelSink> sinks;
-    private int replicaNum;
+    private final Map<String, SeaTunnelSink> sinks;
+    private final int replicaNum;
 
     public MultiTableSink(MultiTableFactoryContext context) {
         this.sinks = context.getSinks();
@@ -71,17 +62,6 @@ public class MultiTableSink
         return "MultiTableSink";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        throw new UnsupportedOperationException(
-                "Please use MultiTableSinkFactory to create MultiTableSink");
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        throw new UnsupportedOperationException("MultiTableSink only support 
CatalogTable");
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         throw new UnsupportedOperationException("MultiTableSink only support 
CatalogTable");
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index dd0d960f36..7e3c3e5bbe 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,19 +26,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
 import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
 import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
 
-@NoArgsConstructor
-@AutoService(SeaTunnelSink.class)
 public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
-    private SeaTunnelRowType seaTunnelRowType;
-    private boolean isPrintData = true;
-    private int delayMs = 0;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final boolean isPrintData;
+    private final int delayMs;
 
     public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig 
options) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -49,11 +41,6 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         this.delayMs = options.get(LOG_PRINT_DELAY);
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return this.seaTunnelRowType;
@@ -68,10 +55,4 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public String getPluginName() {
         return "Console";
     }
-
-    @Override
-    public void prepare(Config pluginConfig) {
-        this.isPrintData = 
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
-        this.delayMs = 
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 76f691fbbf..826a08af0c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -42,7 +39,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
@@ -51,8 +47,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -60,25 +54,24 @@ import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
-@AutoService(SeaTunnelSink.class)
 public class JdbcSink
         implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, 
JdbcAggregatedCommitInfo>,
                 SupportDataSaveMode,
                 SupportMultiTableSink {
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRowType seaTunnelRowType;
 
     private JobContext jobContext;
 
-    private JdbcSinkConfig jdbcSinkConfig;
+    private final JdbcSinkConfig jdbcSinkConfig;
 
-    private JdbcDialect dialect;
+    private final JdbcDialect dialect;
 
-    private ReadonlyConfig config;
+    private final ReadonlyConfig config;
 
-    private DataSaveMode dataSaveMode;
+    private final DataSaveMode dataSaveMode;
 
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
     public JdbcSink(
             ReadonlyConfig config,
@@ -94,31 +87,11 @@ public class JdbcSink
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
-    public JdbcSink() {}
-
     @Override
     public String getPluginName() {
         return "Jdbc";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.config = ReadonlyConfig.fromConfig(pluginConfig);
-        this.jdbcSinkConfig = JdbcSinkConfig.of(config);
-        this.dialect =
-                JdbcDialectLoader.load(
-                        jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
-                        
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
-                        config.get(JdbcOptions.FIELD_IDE) == null
-                                ? null
-                                : 
config.get(JdbcOptions.FIELD_IDE).getValue());
-        this.dialect.connectionUrlParse(
-                jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
-                jdbcSinkConfig.getJdbcConnectionConfig().getProperties(),
-                this.dialect.defaultParameter());
-        this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(
             SinkWriter.Context context) {
@@ -167,11 +140,6 @@ public class JdbcSink
         return Optional.empty();
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return this.seaTunnelRowType;
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index cbd409f99c..43b6b853a2 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -17,11 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -34,9 +30,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedComm
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -45,32 +38,18 @@ import java.util.Optional;
  * Kafka Sink implementation by using SeaTunnel sink API. This class contains 
the method to create
  * {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
  */
-@AutoService(SeaTunnelSink.class)
-@NoArgsConstructor
 public class KafkaSink
         implements SeaTunnelSink<
                 SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, 
KafkaAggregatedCommitInfo> {
 
-    private ReadonlyConfig pluginConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+    private final ReadonlyConfig pluginConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
 
     public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
         this.pluginConfig = pluginConfig;
         this.seaTunnelRowType = rowType;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
-                .validate(new KafkaSinkFactory().optionRule());
-        this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return this.seaTunnelRowType;
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 54163bd6f1..9aaef3ada4 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,13 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -37,21 +31,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCata
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 
-import org.apache.commons.lang3.StringUtils;
-
-import com.google.auto.service.AutoService;
-import lombok.NoArgsConstructor;
-
-@NoArgsConstructor
-@AutoService(SeaTunnelSink.class)
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportDataSaveMode {
 
     private SeaTunnelRowType seaTunnelRowType;
-    private SinkConfig sinkConfig;
-    private DataSaveMode dataSaveMode;
+    private final SinkConfig sinkConfig;
+    private final DataSaveMode dataSaveMode;
 
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
     public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
         this.sinkConfig = sinkConfig;
@@ -65,17 +52,6 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         return StarRocksCatalogFactory.IDENTIFIER;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
-                .validate(new StarRocksCatalogFactory().optionRule());
-        sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
-        if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable != 
null) {
-            sinkConfig.setTable(catalogTable.getTableId().getTableName());
-        }
-        dataSaveMode = sinkConfig.getDataSaveMode();
-    }
-
     private void autoCreateTable(String template) {
         StarRocksCatalog starRocksCatalog =
                 new StarRocksCatalog(
@@ -97,11 +73,6 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         }
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return this.seaTunnelRowType;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 7902627e08..75c2b9324f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -26,9 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
@@ -507,9 +505,6 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
 
     private Properties getSinkProperties(JdbcSink jdbcSink)
             throws IOException, SQLException, ClassNotFoundException {
-        jdbcSink.setTypeInfo(
-                new SeaTunnelRowType(
-                        new String[] {"id"}, new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}));
         JdbcSinkWriter jdbcSinkWriter = (JdbcSinkWriter) 
jdbcSink.createWriter(null);
         JdbcConnectionProvider connectionProvider =
                 (JdbcConnectionProvider)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 6de34c3366..21177a52d2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,6 +22,9 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
@@ -45,6 +48,7 @@ import com.google.common.collect.Sets;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -73,7 +77,12 @@ public class TestUtils {
         fake.setParallelism(3);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
 
-        ConsoleSink consoleSink = new ConsoleSink();
+        ConsoleSink consoleSink =
+                new ConsoleSink(
+                        new SeaTunnelRowType(
+                                new String[] {"id"},
+                                new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
+                        ReadonlyConfig.fromMap(new HashMap<>()));
         consoleSink.setJobContext(jobContext);
         Action console =
                 new SinkAction<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 77ff157bd1..9c1f385441 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -22,6 +22,9 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -47,6 +50,7 @@ import com.google.common.collect.ImmutableMap;
 import com.hazelcast.map.IMap;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Executors;
 
@@ -123,7 +127,12 @@ public class CheckpointPlanTest extends 
AbstractSeaTunnelServerTest {
         fake.setParallelism(parallelism);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 
parallelism);
 
-        ConsoleSink consoleSink = new ConsoleSink();
+        ConsoleSink consoleSink =
+                new ConsoleSink(
+                        new SeaTunnelRowType(
+                                new String[] {"id"},
+                                new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
+                        ReadonlyConfig.fromMap(new HashMap<>()));
         consoleSink.setJobContext(jobContext);
         Action console =
                 new SinkAction<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index e7fb77e37a..cf6fac1b6f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -22,6 +22,9 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -53,6 +56,7 @@ import com.hazelcast.map.IMap;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.concurrent.Executors;
 
 public class TaskTest extends AbstractSeaTunnelServerTest {
@@ -110,7 +114,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
                 new SinkAction<>(
                         idGenerator.getNextId(),
                         "console",
-                        new ConsoleSink(),
+                        new ConsoleSink(
+                                new SeaTunnelRowType(
+                                        new String[] {"id"},
+                                        new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
+                                ReadonlyConfig.fromMap(new HashMap<>())),
                         Sets.newHashSet(new URL("file:///console.jar")));
         LogicalVertex consoleVertex = new LogicalVertex(console.getId(), 
console, 2);
 

Reply via email to