This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8de7408100 [Improve] Remove use `SeaTunnelSink::getConsumedType` 
method and mark it as deprecated (#5755)
8de7408100 is described below

commit 8de74081002abcb36fee48faacb59843f7ff86c9
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 10:31:25 2023 +0800

    [Improve] Remove use `SeaTunnelSink::getConsumedType` method and mark it as 
deprecated (#5755)
---
 .../org/apache/seatunnel/api/sink/SeaTunnelSink.java  |  6 +++++-
 .../amazondynamodb/sink/AmazonDynamoDBSink.java       |  6 ------
 .../seatunnel/amazonsqs/sink/AmazonSqsSink.java       |  6 ------
 .../seatunnel/assertion/sink/AssertSink.java          |  6 ------
 .../seatunnel/cassandra/sink/CassandraSink.java       |  6 ------
 .../clickhouse/sink/client/ClickhouseSink.java        |  6 ------
 .../clickhouse/sink/file/ClickhouseFileSink.java      |  6 ------
 .../common/multitablesink/MultiTableSink.java         |  6 ------
 .../seatunnel/console/sink/ConsoleSink.java           |  6 ------
 .../seatunnel/datahub/sink/DataHubSink.java           |  6 ------
 .../connectors/seatunnel/sink/DingTalkSink.java       |  6 ------
 .../seatunnel/connectors/doris/sink/DorisSink.java    |  6 ------
 .../elasticsearch/sink/ElasticsearchSink.java         |  6 ------
 .../connectors/seatunnel/email/sink/EmailSink.java    |  6 ------
 .../connectors/seatunnel/file/sink/BaseFileSink.java  |  6 ------
 .../google/firestore/sink/FirestoreSink.java          |  6 ------
 .../connectors/seatunnel/hbase/sink/HbaseSink.java    |  6 ------
 .../connectors/seatunnel/http/sink/HttpSink.java      |  6 ------
 .../seatunnel/influxdb/sink/InfluxDBSink.java         |  6 ------
 .../connectors/seatunnel/iotdb/sink/IoTDBSink.java    |  6 ------
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java      |  6 ------
 .../connectors/seatunnel/kafka/sink/KafkaSink.java    |  6 ------
 .../connectors/seatunnel/kudu/sink/KuduSink.java      |  6 ------
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java     |  6 ------
 .../seatunnel/mongodb/sink/MongodbSink.java           |  6 ------
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java    |  6 ------
 .../connectors/seatunnel/paimon/sink/PaimonSink.java  |  6 ------
 .../seatunnel/rabbitmq/sink/RabbitmqSink.java         |  6 ------
 .../connectors/seatunnel/redis/sink/RedisSink.java    |  6 ------
 .../seatunnel/rocketmq/sink/RocketMqSink.java         |  6 ------
 .../connectors/selectdb/sink/SelectDBSink.java        |  6 ------
 .../connectors/seatunnel/sentry/sink/SentrySink.java  |  6 ------
 .../connectors/seatunnel/slack/sink/SlackSink.java    |  6 ------
 .../connectors/seatunnel/socket/sink/SocketSink.java  |  6 ------
 .../seatunnel/starrocks/sink/StarRocksSink.java       |  6 ------
 .../seatunnel/tablestore/sink/TablestoreSink.java     |  6 ------
 .../seatunnel/tdengine/sink/TDengineSink.java         |  6 ------
 .../starter/flink/execution/SinkExecuteProcessor.java |  4 +++-
 .../starter/flink/execution/SinkExecuteProcessor.java |  4 +++-
 .../starter/spark/execution/SinkExecuteProcessor.java |  2 +-
 .../starter/spark/execution/SinkExecuteProcessor.java |  2 +-
 .../connector/ConnectorSpecificationCheckTest.java    |  9 ++++++++-
 .../seatunnel/translation/flink/sink/FlinkSink.java   | 13 ++++++++++---
 .../seatunnel/translation/spark/sink/SparkSink.java   | 19 ++++++++++++++++---
 .../translation/spark/sink/SparkSinkInjector.java     | 17 +++++++++++++----
 .../spark/sink/writer/SparkDataSourceWriter.java      |  9 +++++++--
 .../spark/sink/writer/SparkDataWriterFactory.java     |  9 +++++++--
 .../spark/sink/writer/SparkStreamWriter.java          |  6 ++++--
 .../translation/spark/sink/SeaTunnelBatchWrite.java   |  9 +++++++--
 .../translation/spark/sink/SeaTunnelSinkTable.java    | 16 +++++++++++++---
 .../translation/spark/sink/SparkSinkInjector.java     | 17 +++++++++++++----
 .../sink/write/SeaTunnelSparkDataWriterFactory.java   |  8 ++++++--
 .../translation/spark/sink/write/SeaTunnelWrite.java  | 10 +++++++---
 .../spark/sink/write/SeaTunnelWriteBuilder.java       |  8 ++++++--
 54 files changed, 130 insertions(+), 254 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 913d644958..cd869a3ca8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -64,9 +64,13 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT>
     /**
      * Get the data type of the records consumed by this sink.
      *
+     * @deprecated use {@link 
org.apache.seatunnel.api.table.factory.Factory} instead
      * @return SeaTunnel data type.
      */
-    SeaTunnelDataType<IN> getConsumedType();
+    @Deprecated
+    default SeaTunnelDataType<IN> getConsumedType() {
+        throw new UnsupportedOperationException("getConsumedType method is not 
supported");
+    }
 
     /**
      * This method will be called to creat {@link SinkWriter}
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index 694a1f6929..f26b388cd7 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -81,11 +80,6 @@ public class AmazonDynamoDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.rowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return rowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
index 3dc12b2168..04cbdba2a5 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class AmazonSqsSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.pluginConfig = pluginConfig;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return typeInfo;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index a312159929..6a93f83abc 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertCatalogTableRule;
@@ -97,11 +96,6 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         }
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new AssertSinkWriter(
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 748280e66a..7614991333 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -117,11 +116,6 @@ public class CassandraSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index fe1b25e909..2aa545e676 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -26,7 +26,6 @@ import 
org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -243,9 +242,4 @@ public class ClickhouseSink
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.option.setSeaTunnelRowType(seaTunnelRowType);
     }
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.option.getSeaTunnelRowType();
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 762815ee04..7a56a0010e 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -191,11 +190,6 @@ public class ClickhouseFileSink
         this.readerOption.setSeaTunnelRowType(seaTunnelRowType);
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.readerOption.getSeaTunnelRowType();
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 18165b3342..7abb176117 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import java.io.IOException;
@@ -58,11 +57,6 @@ public class MultiTableSink
         return "MultiTableSink";
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        throw new UnsupportedOperationException("MultiTableSink only support 
CatalogTable");
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> 
createWriter(
             SinkWriter.Context context) throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 7e3c3e5bbe..d26c819695 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,7 +20,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.console.sink;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         this.delayMs = options.get(LOG_PRINT_DELAY);
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData, 
delayMs);
diff --git 
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
 
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
index cf8eafcf6e..8eeffb3aca 100644
--- 
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
+++ 
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter.Context;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -82,11 +81,6 @@ public class DataHubSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context 
context) throws IOException {
         return new DataHubWriter(
diff --git 
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
 
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index a078e23856..8d9d4b973e 100644
--- 
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++ 
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter.Context;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -69,11 +68,6 @@ public class DingTalkSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context 
context) throws IOException {
         return new DingTalkWriter(
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 2c6d6ae742..861a93b8be 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -90,11 +89,6 @@ public class DorisSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 4ef7a26eb9..f1ab596b24 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
@@ -70,11 +69,6 @@ public class ElasticsearchSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, 
ElasticsearchSinkState> createWriter(
             SinkWriter.Context context) {
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index c0e2c981f6..c1b8ffdd37 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -40,11 +39,6 @@ public class EmailSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 62b39a14a6..4d9244f779 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -67,11 +66,6 @@ public abstract class BaseFileSink
         this.fileSystemUtils = new FileSystemUtils(hadoopConf);
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) throws 
IOException {
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index 32e475b6d1..ab7c02057d 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -72,11 +71,6 @@ public class FirestoreSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.rowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return rowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 78db280a6d..81452eb989 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -94,11 +93,6 @@ public class HbaseSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         }
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 767293b67b..d8a1c5fafe 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -89,11 +88,6 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 0ba29fc95f..9cc03272d1 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -70,11 +69,6 @@ public class InfluxDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index 96e59627c0..263ab04973 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -75,11 +74,6 @@ public class IoTDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new IoTDBSinkWriter(pluginConfig, seaTunnelRowType);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index cbed80c7fe..4234503a09 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -33,7 +33,6 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
@@ -146,11 +145,6 @@ public class JdbcSink
         return Optional.empty();
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public Optional<Serializer<JdbcAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
         if (jdbcSinkConfig.isExactlyOnce()) {
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 43b6b853a2..e7945d9ed1 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
@@ -50,11 +49,6 @@ public class KafkaSink
         this.seaTunnelRowType = rowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> 
createWriter(
             SinkWriter.Context context) {
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 5ea299ac71..0c8494afe8 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -63,11 +62,6 @@ public class KuduSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index 31c5ab0791..c5acadb173 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -58,11 +57,6 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.typeInfo = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.typeInfo;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new MaxcomputeWriter(this.pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 160aa966a0..8c86b119f9 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
@@ -115,11 +114,6 @@ public class MongodbSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> 
createWriter(
             SinkWriter.Context context) {
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index 4ab0070d75..26f127509e 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
@@ -54,11 +53,6 @@ public class Neo4jSink implements 
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
         this.rowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.rowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, Void, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index dccef82cdd..fec1a7d7ce 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -124,11 +123,6 @@ public class PaimonSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 15dc55f30c..1dcd792404 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -84,11 +83,6 @@ public class RabbitmqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index 3b7cfee6a0..381576c6c9 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -75,11 +74,6 @@ public class RedisSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index 04ea6bc3e1..bf81adf0a7 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -157,11 +156,6 @@ public class RocketMqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index c095f94011..f4ef83d0b2 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -97,11 +96,6 @@ public class SelectDBSink
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
 
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index bd82f4b2ce..0a109dfa41 100644
--- 
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++ 
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -68,11 +67,6 @@ public class SentrySink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 384d42ec86..91bee53eb5 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -50,11 +49,6 @@ public class SlackSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 7ba1304ef8..222947f72e 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -71,11 +70,6 @@ public class SocketSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ec3960783f..ee613dd972 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -76,11 +75,6 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         }
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
index 145d242147..9a16e6aeb1 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -83,11 +82,6 @@ public class TablestoreSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.rowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return rowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 69bdcf85f0..82d1069f7c 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class TDengineSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowType;
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index b3db64cc15..195ffbb7b6 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -119,7 +119,9 @@ public class SinkExecuteProcessor
                 saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
             }
             DataStreamSink<Row> dataStreamSink =
-                    stream.getDataStream().sinkTo(new 
FlinkSink<>(sink)).name(sink.getPluginName());
+                    stream.getDataStream()
+                            .sinkTo(new FlinkSink<>(sink, 
stream.getCatalogTable()))
+                            .name(sink.getPluginName());
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ec15567833..310c0e4309 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -121,7 +121,9 @@ public class SinkExecuteProcessor
             }
             DataStreamSink<Row> dataStreamSink =
                     stream.getDataStream()
-                            .sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(sink)))
+                            .sinkTo(
+                                    SinkV1Adapter.wrap(
+                                            new FlinkSink<>(sink, 
stream.getCatalogTable())))
                             .name(sink.getPluginName());
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = 
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 22e3a30ae3..cbcac81aff 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -140,7 +140,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
             }
-            SparkSinkInjector.inject(dataset.write(), sink)
+            SparkSinkInjector.inject(dataset.write(), sink, 
datasetTableInfo.getCatalogTable())
                     .option("checkpointLocation", "/tmp")
                     .save();
         }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 760b917f2c..84d0f0ba8b 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -141,7 +141,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
             }
-            SparkSinkInjector.inject(dataset.write(), sink)
+            SparkSinkInjector.inject(dataset.write(), sink, 
datasetTableInfo.getCatalogTable())
                     .option("checkpointLocation", "/tmp")
                     .mode(SaveMode.Append)
                     .save();
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index c05c8cc627..34fe44a1cb 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 
 import org.junit.jupiter.api.Assertions;
@@ -93,13 +94,19 @@ public class ConnectorSpecificationCheckTest {
                                                         
factoryName.replace("Factory", "")));
                 Optional<Method> prepare = 
ReflectionUtils.getDeclaredMethod(sinkClass, "prepare");
                 Optional<Method> setTypeInfo =
-                        ReflectionUtils.getDeclaredMethod(sinkClass, 
"setTypeInfo");
+                        ReflectionUtils.getDeclaredMethod(
+                                sinkClass, "setTypeInfo", 
SeaTunnelRowType.class);
+                Optional<Method> getConsumedType =
+                        ReflectionUtils.getDeclaredMethod(sinkClass, 
"getConsumedType");
                 Assertions.assertFalse(
                         prepare.isPresent(),
                         "Please remove `prepare` method in " + 
sinkClass.getSimpleName());
                 Assertions.assertFalse(
                         setTypeInfo.isPresent(),
                         "Please remove `setTypeInfo` method in " + 
sinkClass.getSimpleName());
+                Assertions.assertFalse(
+                        getConsumedType.isPresent(),
+                        "Please remove `getConsumedType` method in " + 
sinkClass.getSimpleName());
             }
         }
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 0b47243fbc..a2082fd012 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
 import 
org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
@@ -48,8 +49,13 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
 
     private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, 
GlobalCommT> sink;
 
-    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, 
GlobalCommT> sink) {
+    private final CatalogTable catalogTable;
+
+    public FlinkSink(
+            SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
+            CatalogTable catalogTable) {
         this.sink = sink;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -60,14 +66,15 @@ public class FlinkSink<InputT, CommT, WriterStateT, 
GlobalCommT>
                 new DefaultSinkWriterContext(context.getSubtaskId());
 
         if (states == null || states.isEmpty()) {
-            return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, 
sink.getConsumedType());
+            return new FlinkSinkWriter<>(
+                    sink.createWriter(stContext), 1, 
catalogTable.getSeaTunnelRowType());
         } else {
             List<WriterStateT> restoredState =
                     
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
             return new FlinkSinkWriter<>(
                     sink.restoreWriter(stContext, restoredState),
                     states.get(0).getCheckpointId() + 1,
-                    sink.getConsumedType());
+                    catalogTable.getSeaTunnelRowType());
         }
     }
 
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index f3cd959b47..59f17cd7a5 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -42,17 +43,29 @@ public class SparkSink<StateT, CommitInfoT, 
AggregatedCommitInfoT>
 
     private volatile SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink;
 
+    private volatile CatalogTable catalogTable;
+
     private void init(DataSourceOptions options) {
         if (sink == null) {
             this.sink =
                     SerializationUtils.stringToObject(
-                            options.get(Constants.SINK)
+                            options.get(Constants.SINK_SERIALIZATION)
                                     .orElseThrow(
                                             () ->
                                                     new 
IllegalArgumentException(
                                                             "can not find sink 
"
                                                                     + "class 
string in DataSourceOptions")));
         }
+        if (catalogTable == null) {
+            this.catalogTable =
+                    SerializationUtils.stringToObject(
+                            options.get(SparkSinkInjector.SINK_CATALOG_TABLE)
+                                    .orElseThrow(
+                                            () ->
+                                                    new 
IllegalArgumentException(
+                                                            "can not find sink 
"
+                                                                    + "catalog 
table string in DataSourceOptions")));
+        }
     }
 
     @Override
@@ -61,7 +74,7 @@ public class SparkSink<StateT, CommitInfoT, 
AggregatedCommitInfoT>
         init(options);
 
         try {
-            return new SparkStreamWriter<>(sink);
+            return new SparkStreamWriter<>(sink, catalogTable);
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", 
e);
         }
@@ -73,7 +86,7 @@ public class SparkSink<StateT, CommitInfoT, 
AggregatedCommitInfoT>
         init(options);
 
         try {
-            return Optional.of(new SparkDataSourceWriter<>(sink));
+            return Optional.of(new SparkDataSourceWriter<>(sink, 
catalogTable));
         } catch (IOException e) {
             throw new RuntimeException("find error when createStreamWriter", 
e);
         }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 9b312bac02..e19957625c 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
@@ -31,16 +32,24 @@ public class SparkSinkInjector {
     private static final String SPARK_SINK_CLASS_NAME =
             "org.apache.seatunnel.translation.spark.sink.SparkSink";
 
+    public static final String SINK_CATALOG_TABLE = "sink.catalog.table";
+
     public static DataStreamWriter<Row> inject(
-            DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+            DataStreamWriter<Row> dataset,
+            SeaTunnelSink<?, ?, ?, ?> sink,
+            CatalogTable catalogTable) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
                 .outputMode(OutputMode.Append())
-                .option(Constants.SINK, 
SerializationUtils.objectToString(sink));
+                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink))
+                .option(SINK_CATALOG_TABLE, 
SerializationUtils.objectToString(catalogTable));
     }
 
     public static DataFrameWriter<Row> inject(
-            DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+            DataFrameWriter<Row> dataset,
+            SeaTunnelSink<?, ?, ?, ?> sink,
+            CatalogTable catalogTable) {
         return dataset.format(SPARK_SINK_CLASS_NAME)
-                .option(Constants.SINK, 
SerializationUtils.objectToString(sink));
+                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink))
+                .option(SINK_CATALOG_TABLE, 
SerializationUtils.objectToString(catalogTable));
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
index 08279e0714..3f0aa1f4ff 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink.writer;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,10 +45,14 @@ public class SparkDataSourceWriter<StateT, CommitInfoT, 
AggregatedCommitInfoT>
     @Nullable protected final SinkAggregatedCommitter<CommitInfoT, 
AggregatedCommitInfoT>
             sinkAggregatedCommitter;
 
+    protected final CatalogTable catalogTable;
+
     public SparkDataSourceWriter(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink)
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+            CatalogTable catalogTable)
             throws IOException {
         this.sink = sink;
+        this.catalogTable = catalogTable;
         this.sinkAggregatedCommitter = 
sink.createAggregatedCommitter().orElse(null);
         if (sinkAggregatedCommitter != null) {
             sinkAggregatedCommitter.init();
@@ -56,7 +61,7 @@ public class SparkDataSourceWriter<StateT, CommitInfoT, 
AggregatedCommitInfoT>
 
     @Override
     public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new SparkDataWriterFactory<>(sink);
+        return new SparkDataWriterFactory<>(sink, catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index 00747d6845..dc629c5512 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,9 +33,12 @@ import java.io.IOException;
 public class SparkDataWriterFactory<CommitInfoT, StateT> implements 
DataWriterFactory<InternalRow> {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
+    private final CatalogTable catalogTable;
 
-    SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> 
sink) {
+    SparkDataWriterFactory(
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink, 
CatalogTable catalogTable) {
         this.sink = sink;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -53,6 +57,7 @@ public class SparkDataWriterFactory<CommitInfoT, StateT> 
implements DataWriterFa
         } catch (IOException e) {
             throw new RuntimeException("Failed to create SinkCommitter.", e);
         }
-        return new SparkDataWriter<>(writer, committer, 
sink.getConsumedType(), epochId);
+        return new SparkDataWriter<>(
+                writer, committer, catalogTable.getSeaTunnelRowType(), 
epochId);
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
index 2978560916..bf58f1160d 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink.writer;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,9 +33,10 @@ public class SparkStreamWriter<StateT, CommitInfoT, 
AggregatedCommitInfoT>
         implements StreamWriter {
 
     public SparkStreamWriter(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink)
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+            CatalogTable catalogTable)
             throws IOException {
-        super(sink);
+        super(sink, catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
index b8c08e5547..a363b88784 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriterFactory;
 import 
org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkWriterCommitMessage;
@@ -44,16 +45,20 @@ public class SeaTunnelBatchWrite<StateT, CommitInfoT, 
AggregatedCommitInfoT>
 
     private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> 
aggregatedCommitter;
 
+    private final CatalogTable catalogTable;
+
     public SeaTunnelBatchWrite(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink)
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+            CatalogTable catalogTable)
             throws IOException {
         this.sink = sink;
+        this.catalogTable = catalogTable;
         this.aggregatedCommitter = 
sink.createAggregatedCommitter().orElse(null);
     }
 
     @Override
     public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
-        return new SeaTunnelSparkDataWriterFactory<>(sink);
+        return new SeaTunnelSparkDataWriterFactory<>(sink, catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
index 0660163289..be20846fff 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -45,18 +46,27 @@ public class SeaTunnelSinkTable implements Table, 
SupportsWrite {
 
     private final SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink;
 
+    private final CatalogTable catalogTable;
+
     public SeaTunnelSinkTable(Map<String, String> properties) {
         this.properties = properties;
         String sinkSerialization = 
properties.getOrDefault(Constants.SINK_SERIALIZATION, "");
         if (StringUtils.isBlank(sinkSerialization)) {
-            throw new IllegalArgumentException("sink.serialization must be 
specified");
+            throw new IllegalArgumentException(Constants.SINK_SERIALIZATION + 
" must be specified");
         }
         this.sink = SerializationUtils.stringToObject(sinkSerialization);
+        String sinkCatalogTableSerialization =
+                properties.getOrDefault(SparkSinkInjector.SINK_CATALOG_TABLE, 
"");
+        if (StringUtils.isBlank(sinkCatalogTableSerialization)) {
+            throw new IllegalArgumentException(
+                    SparkSinkInjector.SINK_CATALOG_TABLE + " must be 
specified");
+        }
+        this.catalogTable = 
SerializationUtils.stringToObject(sinkCatalogTableSerialization);
     }
 
     @Override
     public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
-        return new SeaTunnelWriteBuilder<>(sink);
+        return new SeaTunnelWriteBuilder<>(sink, catalogTable);
     }
 
     @Override
@@ -66,7 +76,7 @@ public class SeaTunnelSinkTable implements Table, 
SupportsWrite {
 
     @Override
     public StructType schema() {
-        return (StructType) TypeConverterUtils.convert(sink.getConsumedType());
+        return (StructType) 
TypeConverterUtils.convert(catalogTable.getSeaTunnelRowType());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index bf53eae6f7..41b0d7153b 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
@@ -30,16 +31,24 @@ public class SparkSinkInjector {
 
     private static final String SINK_NAME = 
SeaTunnelSink.class.getSimpleName();
 
+    public static final String SINK_CATALOG_TABLE = "sink.catalog.table";
+
     public static DataStreamWriter<Row> inject(
-            DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+            DataStreamWriter<Row> dataset,
+            SeaTunnelSink<?, ?, ?, ?> sink,
+            CatalogTable catalogTable) {
         return dataset.format(SINK_NAME)
                 .outputMode(OutputMode.Append())
-                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink));
+                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink))
+                .option(SINK_CATALOG_TABLE, 
SerializationUtils.objectToString(catalogTable));
     }
 
     public static DataFrameWriter<Row> inject(
-            DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+            DataFrameWriter<Row> dataset,
+            SeaTunnelSink<?, ?, ?, ?> sink,
+            CatalogTable catalogTable) {
         return dataset.format(SINK_NAME)
-                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink));
+                .option(Constants.SINK_SERIALIZATION, 
SerializationUtils.objectToString(sink))
+                .option(SINK_CATALOG_TABLE, 
SerializationUtils.objectToString(catalogTable));
     }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index 1cea0226fb..3b06468244 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,10 +35,12 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT, 
StateT>
         implements DataWriterFactory, StreamingDataWriterFactory {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
+    private final CatalogTable catalogTable;
 
     public SeaTunnelSparkDataWriterFactory(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink) {
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink, 
CatalogTable catalogTable) {
         this.sink = sink;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -55,7 +58,8 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT, 
StateT>
         } catch (IOException e) {
             throw new RuntimeException("Failed to create SinkCommitter.", e);
         }
-        return new SeaTunnelSparkDataWriter<>(writer, committer, 
sink.getConsumedType(), 0);
+        return new SeaTunnelSparkDataWriter<>(
+                writer, committer, catalogTable.getSeaTunnelRowType(), 0);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
index 47f8fcb49a..9df74612e5 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink.write;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite;
 
@@ -30,16 +31,19 @@ import java.io.IOException;
 public class SeaTunnelWrite<AggregatedCommitInfoT, CommitInfoT, StateT> 
implements Write {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink;
+    private final CatalogTable catalogTable;
 
     public SeaTunnelWrite(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink) {
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+            CatalogTable catalogTable) {
         this.sink = sink;
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public BatchWrite toBatch() {
         try {
-            return new SeaTunnelBatchWrite<>(sink);
+            return new SeaTunnelBatchWrite<>(sink, catalogTable);
         } catch (IOException e) {
             throw new RuntimeException("SeaTunnel Spark sink create batch 
failed", e);
         }
@@ -48,7 +52,7 @@ public class SeaTunnelWrite<AggregatedCommitInfoT, 
CommitInfoT, StateT> implemen
     @Override
     public StreamingWrite toStreaming() {
         try {
-            return new SeaTunnelBatchWrite<>(sink);
+            return new SeaTunnelBatchWrite<>(sink, catalogTable);
         } catch (IOException e) {
             throw new RuntimeException("SeaTunnel Spark sink create batch 
failed", e);
         }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
index fca4879b4d..6a8f0e12ee 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.translation.spark.sink.write;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.apache.spark.sql.connector.write.Write;
@@ -27,14 +28,17 @@ public class SeaTunnelWriteBuilder<StateT, CommitInfoT, 
AggregatedCommitInfoT>
         implements WriteBuilder {
 
     private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink;
+    private final CatalogTable catalogTable;
 
     public SeaTunnelWriteBuilder(
-            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink) {
+            SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+            CatalogTable catalogTable) {
         this.sink = sink;
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public Write build() {
-        return new SeaTunnelWrite<>(sink);
+        return new SeaTunnelWrite<>(sink, catalogTable);
     }
 }


Reply via email to