This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8de7408100 [Improve] Remove use `SeaTunnelSink::getConsumedType`
method and mark it as deprecated (#5755)
8de7408100 is described below
commit 8de74081002abcb36fee48faacb59843f7ff86c9
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 10:31:25 2023 +0800
[Improve] Remove use `SeaTunnelSink::getConsumedType` method and mark it as
deprecated (#5755)
---
.../org/apache/seatunnel/api/sink/SeaTunnelSink.java | 6 +++++-
.../amazondynamodb/sink/AmazonDynamoDBSink.java | 6 ------
.../seatunnel/amazonsqs/sink/AmazonSqsSink.java | 6 ------
.../seatunnel/assertion/sink/AssertSink.java | 6 ------
.../seatunnel/cassandra/sink/CassandraSink.java | 6 ------
.../clickhouse/sink/client/ClickhouseSink.java | 6 ------
.../clickhouse/sink/file/ClickhouseFileSink.java | 6 ------
.../common/multitablesink/MultiTableSink.java | 6 ------
.../seatunnel/console/sink/ConsoleSink.java | 6 ------
.../seatunnel/datahub/sink/DataHubSink.java | 6 ------
.../connectors/seatunnel/sink/DingTalkSink.java | 6 ------
.../seatunnel/connectors/doris/sink/DorisSink.java | 6 ------
.../elasticsearch/sink/ElasticsearchSink.java | 6 ------
.../connectors/seatunnel/email/sink/EmailSink.java | 6 ------
.../connectors/seatunnel/file/sink/BaseFileSink.java | 6 ------
.../google/firestore/sink/FirestoreSink.java | 6 ------
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 6 ------
.../connectors/seatunnel/http/sink/HttpSink.java | 6 ------
.../seatunnel/influxdb/sink/InfluxDBSink.java | 6 ------
.../connectors/seatunnel/iotdb/sink/IoTDBSink.java | 6 ------
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 6 ------
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 6 ------
.../connectors/seatunnel/kudu/sink/KuduSink.java | 6 ------
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 6 ------
.../seatunnel/mongodb/sink/MongodbSink.java | 6 ------
.../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 6 ------
.../connectors/seatunnel/paimon/sink/PaimonSink.java | 6 ------
.../seatunnel/rabbitmq/sink/RabbitmqSink.java | 6 ------
.../connectors/seatunnel/redis/sink/RedisSink.java | 6 ------
.../seatunnel/rocketmq/sink/RocketMqSink.java | 6 ------
.../connectors/selectdb/sink/SelectDBSink.java | 6 ------
.../connectors/seatunnel/sentry/sink/SentrySink.java | 6 ------
.../connectors/seatunnel/slack/sink/SlackSink.java | 6 ------
.../connectors/seatunnel/socket/sink/SocketSink.java | 6 ------
.../seatunnel/starrocks/sink/StarRocksSink.java | 6 ------
.../seatunnel/tablestore/sink/TablestoreSink.java | 6 ------
.../seatunnel/tdengine/sink/TDengineSink.java | 6 ------
.../starter/flink/execution/SinkExecuteProcessor.java | 4 +++-
.../starter/flink/execution/SinkExecuteProcessor.java | 4 +++-
.../starter/spark/execution/SinkExecuteProcessor.java | 2 +-
.../starter/spark/execution/SinkExecuteProcessor.java | 2 +-
.../connector/ConnectorSpecificationCheckTest.java | 9 ++++++++-
.../seatunnel/translation/flink/sink/FlinkSink.java | 13 ++++++++++---
.../seatunnel/translation/spark/sink/SparkSink.java | 19 ++++++++++++++++---
.../translation/spark/sink/SparkSinkInjector.java | 17 +++++++++++++----
.../spark/sink/writer/SparkDataSourceWriter.java | 9 +++++++--
.../spark/sink/writer/SparkDataWriterFactory.java | 9 +++++++--
.../spark/sink/writer/SparkStreamWriter.java | 6 ++++--
.../translation/spark/sink/SeaTunnelBatchWrite.java | 9 +++++++--
.../translation/spark/sink/SeaTunnelSinkTable.java | 16 +++++++++++++---
.../translation/spark/sink/SparkSinkInjector.java | 17 +++++++++++++----
.../sink/write/SeaTunnelSparkDataWriterFactory.java | 8 ++++++--
.../translation/spark/sink/write/SeaTunnelWrite.java | 10 +++++++---
.../spark/sink/write/SeaTunnelWriteBuilder.java | 8 ++++++--
54 files changed, 130 insertions(+), 254 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 913d644958..cd869a3ca8 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -64,9 +64,13 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT>
/**
* Get the data type of the records consumed by this sink.
*
+ * @deprecated instead by {@link
org.apache.seatunnel.api.table.factory.Factory}
* @return SeaTunnel data type.
*/
- SeaTunnelDataType<IN> getConsumedType();
+ @Deprecated
+ default SeaTunnelDataType<IN> getConsumedType() {
+ throw new UnsupportedOperationException("getConsumedType method is not
supported");
+ }
/**
* This method will be called to creat {@link SinkWriter}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index 694a1f6929..f26b388cd7 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -81,11 +80,6 @@ public class AmazonDynamoDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.rowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return rowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
index 3dc12b2168..04cbdba2a5 100644
---
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
+++
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.amazonsqs.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class AmazonSqsSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.pluginConfig = pluginConfig;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return typeInfo;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index a312159929..6a93f83abc 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertCatalogTableRule;
@@ -97,11 +96,6 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new AssertSinkWriter(
diff --git
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 748280e66a..7614991333 100644
---
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -117,11 +116,6 @@ public class CassandraSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index fe1b25e909..2aa545e676 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -26,7 +26,6 @@ import
org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -243,9 +242,4 @@ public class ClickhouseSink
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.option.setSeaTunnelRowType(seaTunnelRowType);
}
-
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.option.getSeaTunnelRowType();
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 762815ee04..7a56a0010e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -191,11 +190,6 @@ public class ClickhouseFileSink
this.readerOption.setSeaTunnelRowType(seaTunnelRowType);
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.readerOption.getSeaTunnelRowType();
- }
-
@Override
public SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
index 18165b3342..7abb176117 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.io.IOException;
@@ -58,11 +57,6 @@ public class MultiTableSink
return "MultiTableSink";
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- throw new UnsupportedOperationException("MultiTableSink only support
CatalogTable");
- }
-
@Override
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 7e3c3e5bbe..d26c819695 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,7 +20,6 @@ package
org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
this.delayMs = options.get(LOG_PRINT_DELAY);
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData,
delayMs);
diff --git
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
index cf8eafcf6e..8eeffb3aca 100644
---
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
+++
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -82,11 +81,6 @@ public class DataHubSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context
context) throws IOException {
return new DataHubWriter(
diff --git
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index a078e23856..8d9d4b973e 100644
---
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -69,11 +68,6 @@ public class DingTalkSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(Context
context) throws IOException {
return new DingTalkWriter(
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 2c6d6ae742..861a93b8be 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -90,11 +89,6 @@ public class DorisSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 4ef7a26eb9..f1ab596b24 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
@@ -70,11 +69,6 @@ public class ElasticsearchSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> createWriter(
SinkWriter.Context context) {
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index c0e2c981f6..c1b8ffdd37 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -40,11 +39,6 @@ public class EmailSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new EmailSinkWriter(seaTunnelRowType, pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 62b39a14a6..4d9244f779 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -67,11 +66,6 @@ public abstract class BaseFileSink
this.fileSystemUtils = new FileSystemUtils(hadoopConf);
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) throws
IOException {
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index 32e475b6d1..ab7c02057d 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -72,11 +71,6 @@ public class FirestoreSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.rowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return rowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 78db280a6d..81452eb989 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -94,11 +93,6 @@ public class HbaseSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 767293b67b..d8a1c5fafe 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -89,11 +88,6 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 0ba29fc95f..9cc03272d1 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -70,11 +69,6 @@ public class InfluxDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index 96e59627c0..263ab04973 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -75,11 +74,6 @@ public class IoTDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new IoTDBSinkWriter(pluginConfig, seaTunnelRowType);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index cbed80c7fe..4234503a09 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -33,7 +33,6 @@ import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
@@ -146,11 +145,6 @@ public class JdbcSink
return Optional.empty();
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public Optional<Serializer<JdbcAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
if (jdbcSinkConfig.isExactlyOnce()) {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 43b6b853a2..e7945d9ed1 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
@@ -50,11 +49,6 @@ public class KafkaSink
this.seaTunnelRowType = rowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState>
createWriter(
SinkWriter.Context context) {
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index 5ea299ac71..0c8494afe8 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
@@ -63,11 +62,6 @@ public class KuduSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index 31c5ab0791..c5acadb173 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -58,11 +57,6 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.typeInfo = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.typeInfo;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new MaxcomputeWriter(this.pluginConfig);
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 160aa966a0..8c86b119f9 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
@@ -115,11 +114,6 @@ public class MongodbSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>
createWriter(
SinkWriter.Context context) {
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index 4ab0070d75..26f127509e 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
@@ -54,11 +53,6 @@ public class Neo4jSink implements
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
this.rowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.rowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index dccef82cdd..fec1a7d7ce 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -124,11 +123,6 @@ public class PaimonSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 15dc55f30c..1dcd792404 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -84,11 +83,6 @@ public class RabbitmqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index 3b7cfee6a0..381576c6c9 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -75,11 +74,6 @@ public class RedisSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index 04ea6bc3e1..bf81adf0a7 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -157,11 +156,6 @@ public class RocketMqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index c095f94011..f4ef83d0b2 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -97,11 +96,6 @@ public class SelectDBSink
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index bd82f4b2ce..0a109dfa41 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
@@ -68,11 +67,6 @@ public class SentrySink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 384d42ec86..91bee53eb5 100644
---
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -50,11 +49,6 @@ public class SlackSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 7ba1304ef8..222947f72e 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -71,11 +70,6 @@ public class SocketSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index ec3960783f..ee613dd972 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -76,11 +75,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
}
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
index 145d242147..9a16e6aeb1 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -83,11 +82,6 @@ public class TablestoreSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.rowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return rowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 69bdcf85f0..82d1069f7c 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -41,11 +40,6 @@ public class TDengineSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
this.seaTunnelRowType = seaTunnelRowType;
}
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
- return this.seaTunnelRowType;
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index b3db64cc15..195ffbb7b6 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -119,7 +119,9 @@ public class SinkExecuteProcessor
saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
}
DataStreamSink<Row> dataStreamSink =
- stream.getDataStream().sinkTo(new
FlinkSink<>(sink)).name(sink.getPluginName());
+ stream.getDataStream()
+ .sinkTo(new FlinkSink<>(sink,
stream.getCatalogTable()))
+ .name(sink.getPluginName());
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index ec15567833..310c0e4309 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -121,7 +121,9 @@ public class SinkExecuteProcessor
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
- .sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(sink)))
+ .sinkTo(
+ SinkV1Adapter.wrap(
+ new FlinkSink<>(sink,
stream.getCatalogTable())))
.name(sink.getPluginName());
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 22e3a30ae3..cbcac81aff 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -140,7 +140,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
}
- SparkSinkInjector.inject(dataset.write(), sink)
+ SparkSinkInjector.inject(dataset.write(), sink,
datasetTableInfo.getCatalogTable())
.option("checkpointLocation", "/tmp")
.save();
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 760b917f2c..84d0f0ba8b 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -141,7 +141,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
}
- SparkSinkInjector.inject(dataset.write(), sink)
+ SparkSinkInjector.inject(dataset.write(), sink,
datasetTableInfo.getCatalogTable())
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index c05c8cc627..34fe44a1cb 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.junit.jupiter.api.Assertions;
@@ -93,13 +94,19 @@ public class ConnectorSpecificationCheckTest {
factoryName.replace("Factory", "")));
Optional<Method> prepare =
ReflectionUtils.getDeclaredMethod(sinkClass, "prepare");
Optional<Method> setTypeInfo =
- ReflectionUtils.getDeclaredMethod(sinkClass,
"setTypeInfo");
+ ReflectionUtils.getDeclaredMethod(
+ sinkClass, "setTypeInfo",
SeaTunnelRowType.class);
+ Optional<Method> getConsumedType =
+ ReflectionUtils.getDeclaredMethod(sinkClass,
"getConsumedType");
Assertions.assertFalse(
prepare.isPresent(),
"Please remove `prepare` method in " +
sinkClass.getSimpleName());
Assertions.assertFalse(
setTypeInfo.isPresent(),
"Please remove `setTypeInfo` method in " +
sinkClass.getSimpleName());
+ Assertions.assertFalse(
+ getConsumedType.isPresent(),
+ "Please remove `getConsumedType` method in " +
sinkClass.getSimpleName());
}
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 0b47243fbc..a2082fd012 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
import
org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
@@ -48,8 +49,13 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT,
GlobalCommT> sink;
- public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT,
GlobalCommT> sink) {
+ private final CatalogTable catalogTable;
+
+ public FlinkSink(
+ SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
+ CatalogTable catalogTable) {
this.sink = sink;
+ this.catalogTable = catalogTable;
}
@Override
@@ -60,14 +66,15 @@ public class FlinkSink<InputT, CommT, WriterStateT,
GlobalCommT>
new DefaultSinkWriterContext(context.getSubtaskId());
if (states == null || states.isEmpty()) {
- return new FlinkSinkWriter<>(sink.createWriter(stContext), 1,
sink.getConsumedType());
+ return new FlinkSinkWriter<>(
+ sink.createWriter(stContext), 1,
catalogTable.getSeaTunnelRowType());
} else {
List<WriterStateT> restoredState =
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
return new FlinkSinkWriter<>(
sink.restoreWriter(stContext, restoredState),
states.get(0).getCheckpointId() + 1,
- sink.getConsumedType());
+ catalogTable.getSeaTunnelRowType());
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index f3cd959b47..59f17cd7a5 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -42,17 +43,29 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
private volatile SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
+ private volatile CatalogTable catalogTable;
+
private void init(DataSourceOptions options) {
if (sink == null) {
this.sink =
SerializationUtils.stringToObject(
- options.get(Constants.SINK)
+ options.get(Constants.SINK_SERIALIZATION)
.orElseThrow(
() ->
new
IllegalArgumentException(
"can not find sink
"
+ "class
string in DataSourceOptions")));
}
+ if (catalogTable == null) {
+ this.catalogTable =
+ SerializationUtils.stringToObject(
+ options.get(SparkSinkInjector.SINK_CATALOG_TABLE)
+ .orElseThrow(
+ () ->
+ new
IllegalArgumentException(
+ "can not find sink
"
+ + "catalog
table string in DataSourceOptions")));
+ }
}
@Override
@@ -61,7 +74,7 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
init(options);
try {
- return new SparkStreamWriter<>(sink);
+ return new SparkStreamWriter<>(sink, catalogTable);
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter",
e);
}
@@ -73,7 +86,7 @@ public class SparkSink<StateT, CommitInfoT,
AggregatedCommitInfoT>
init(options);
try {
- return Optional.of(new SparkDataSourceWriter<>(sink));
+ return Optional.of(new SparkDataSourceWriter<>(sink,
catalogTable));
} catch (IOException e) {
throw new RuntimeException("find error when createStreamWriter",
e);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 9b312bac02..e19957625c 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -31,16 +32,24 @@ public class SparkSinkInjector {
private static final String SPARK_SINK_CLASS_NAME =
"org.apache.seatunnel.translation.spark.sink.SparkSink";
+ public static final String SINK_CATALOG_TABLE = "sink.catalog.table";
+
public static DataStreamWriter<Row> inject(
- DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+ DataStreamWriter<Row> dataset,
+ SeaTunnelSink<?, ?, ?, ?> sink,
+ CatalogTable catalogTable) {
return dataset.format(SPARK_SINK_CLASS_NAME)
.outputMode(OutputMode.Append())
- .option(Constants.SINK,
SerializationUtils.objectToString(sink));
+ .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
+ .option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTable));
}
public static DataFrameWriter<Row> inject(
- DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+ DataFrameWriter<Row> dataset,
+ SeaTunnelSink<?, ?, ?, ?> sink,
+ CatalogTable catalogTable) {
return dataset.format(SPARK_SINK_CLASS_NAME)
- .option(Constants.SINK,
SerializationUtils.objectToString(sink));
+ .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
+ .option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTable));
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
index 08279e0714..3f0aa1f4ff 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataSourceWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink.writer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -44,10 +45,14 @@ public class SparkDataSourceWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
@Nullable protected final SinkAggregatedCommitter<CommitInfoT,
AggregatedCommitInfoT>
sinkAggregatedCommitter;
+ protected final CatalogTable catalogTable;
+
public SparkDataSourceWriter(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink)
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ CatalogTable catalogTable)
throws IOException {
this.sink = sink;
+ this.catalogTable = catalogTable;
this.sinkAggregatedCommitter =
sink.createAggregatedCommitter().orElse(null);
if (sinkAggregatedCommitter != null) {
sinkAggregatedCommitter.init();
@@ -56,7 +61,7 @@ public class SparkDataSourceWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new SparkDataWriterFactory<>(sink);
+ return new SparkDataWriterFactory<>(sink, catalogTable);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
index 00747d6845..dc629c5512 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkDataWriterFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,9 +33,12 @@ import java.io.IOException;
public class SparkDataWriterFactory<CommitInfoT, StateT> implements
DataWriterFactory<InternalRow> {
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
+ private final CatalogTable catalogTable;
- SparkDataWriterFactory(SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?>
sink) {
+ SparkDataWriterFactory(
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink,
CatalogTable catalogTable) {
this.sink = sink;
+ this.catalogTable = catalogTable;
}
@Override
@@ -53,6 +57,7 @@ public class SparkDataWriterFactory<CommitInfoT, StateT>
implements DataWriterFa
} catch (IOException e) {
throw new RuntimeException("Failed to create SinkCommitter.", e);
}
- return new SparkDataWriter<>(writer, committer,
sink.getConsumedType(), epochId);
+ return new SparkDataWriter<>(
+ writer, committer, catalogTable.getSeaTunnelRowType(),
epochId);
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
index 2978560916..bf58f1160d 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/writer/SparkStreamWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink.writer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,9 +33,10 @@ public class SparkStreamWriter<StateT, CommitInfoT,
AggregatedCommitInfoT>
implements StreamWriter {
public SparkStreamWriter(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink)
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ CatalogTable catalogTable)
throws IOException {
- super(sink);
+ super(sink, catalogTable);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
index b8c08e5547..a363b88784 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelBatchWrite.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkDataWriterFactory;
import
org.apache.seatunnel.translation.spark.sink.write.SeaTunnelSparkWriterCommitMessage;
@@ -44,16 +45,20 @@ public class SeaTunnelBatchWrite<StateT, CommitInfoT,
AggregatedCommitInfoT>
private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
aggregatedCommitter;
+ private final CatalogTable catalogTable;
+
public SeaTunnelBatchWrite(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink)
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ CatalogTable catalogTable)
throws IOException {
this.sink = sink;
+ this.catalogTable = catalogTable;
this.aggregatedCommitter =
sink.createAggregatedCommitter().orElse(null);
}
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
- return new SeaTunnelSparkDataWriterFactory<>(sink);
+ return new SeaTunnelSparkDataWriterFactory<>(sink, catalogTable);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
index 0660163289..be20846fff 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkTable.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -45,18 +46,27 @@ public class SeaTunnelSinkTable implements Table,
SupportsWrite {
private final SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink;
+ private final CatalogTable catalogTable;
+
public SeaTunnelSinkTable(Map<String, String> properties) {
this.properties = properties;
String sinkSerialization =
properties.getOrDefault(Constants.SINK_SERIALIZATION, "");
if (StringUtils.isBlank(sinkSerialization)) {
- throw new IllegalArgumentException("sink.serialization must be
specified");
+ throw new IllegalArgumentException(Constants.SINK_SERIALIZATION +
" must be specified");
}
this.sink = SerializationUtils.stringToObject(sinkSerialization);
+ String sinkCatalogTableSerialization =
+ properties.getOrDefault(SparkSinkInjector.SINK_CATALOG_TABLE,
"");
+ if (StringUtils.isBlank(sinkCatalogTableSerialization)) {
+ throw new IllegalArgumentException(
+ SparkSinkInjector.SINK_CATALOG_TABLE + " must be
specified");
+ }
+ this.catalogTable =
SerializationUtils.stringToObject(sinkCatalogTableSerialization);
}
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- return new SeaTunnelWriteBuilder<>(sink);
+ return new SeaTunnelWriteBuilder<>(sink, catalogTable);
}
@Override
@@ -66,7 +76,7 @@ public class SeaTunnelSinkTable implements Table,
SupportsWrite {
@Override
public StructType schema() {
- return (StructType) TypeConverterUtils.convert(sink.getConsumedType());
+ return (StructType)
TypeConverterUtils.convert(catalogTable.getSeaTunnelRowType());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index bf53eae6f7..41b0d7153b 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -30,16 +31,24 @@ public class SparkSinkInjector {
private static final String SINK_NAME =
SeaTunnelSink.class.getSimpleName();
+ public static final String SINK_CATALOG_TABLE = "sink.catalog.table";
+
public static DataStreamWriter<Row> inject(
- DataStreamWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+ DataStreamWriter<Row> dataset,
+ SeaTunnelSink<?, ?, ?, ?> sink,
+ CatalogTable catalogTable) {
return dataset.format(SINK_NAME)
.outputMode(OutputMode.Append())
- .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink));
+ .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
+ .option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTable));
}
public static DataFrameWriter<Row> inject(
- DataFrameWriter<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink) {
+ DataFrameWriter<Row> dataset,
+ SeaTunnelSink<?, ?, ?, ?> sink,
+ CatalogTable catalogTable) {
return dataset.format(SINK_NAME)
- .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink));
+ .option(Constants.SINK_SERIALIZATION,
SerializationUtils.objectToString(sink))
+ .option(SINK_CATALOG_TABLE,
SerializationUtils.objectToString(catalogTable));
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
index 1cea0226fb..3b06468244 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriterFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,10 +35,12 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT,
StateT>
implements DataWriterFactory, StreamingDataWriterFactory {
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink;
+ private final CatalogTable catalogTable;
public SeaTunnelSparkDataWriterFactory(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink) {
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT, ?> sink,
CatalogTable catalogTable) {
this.sink = sink;
+ this.catalogTable = catalogTable;
}
@Override
@@ -55,7 +58,8 @@ public class SeaTunnelSparkDataWriterFactory<CommitInfoT,
StateT>
} catch (IOException e) {
throw new RuntimeException("Failed to create SinkCommitter.", e);
}
- return new SeaTunnelSparkDataWriter<>(writer, committer,
sink.getConsumedType(), 0);
+ return new SeaTunnelSparkDataWriter<>(
+ writer, committer, catalogTable.getSeaTunnelRowType(), 0);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
index 47f8fcb49a..9df74612e5 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWrite.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink.write;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.sink.SeaTunnelBatchWrite;
@@ -30,16 +31,19 @@ import java.io.IOException;
public class SeaTunnelWrite<AggregatedCommitInfoT, CommitInfoT, StateT>
implements Write {
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
+ private final CatalogTable catalogTable;
public SeaTunnelWrite(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink) {
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ CatalogTable catalogTable) {
this.sink = sink;
+ this.catalogTable = catalogTable;
}
@Override
public BatchWrite toBatch() {
try {
- return new SeaTunnelBatchWrite<>(sink);
+ return new SeaTunnelBatchWrite<>(sink, catalogTable);
} catch (IOException e) {
throw new RuntimeException("SeaTunnel Spark sink create batch
failed", e);
}
@@ -48,7 +52,7 @@ public class SeaTunnelWrite<AggregatedCommitInfoT,
CommitInfoT, StateT> implemen
@Override
public StreamingWrite toStreaming() {
try {
- return new SeaTunnelBatchWrite<>(sink);
+ return new SeaTunnelBatchWrite<>(sink, catalogTable);
} catch (IOException e) {
throw new RuntimeException("SeaTunnel Spark sink create batch
failed", e);
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
index fca4879b4d..6a8f0e12ee 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/write/SeaTunnelWriteBuilder.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.translation.spark.sink.write;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.spark.sql.connector.write.Write;
@@ -27,14 +28,17 @@ public class SeaTunnelWriteBuilder<StateT, CommitInfoT,
AggregatedCommitInfoT>
implements WriteBuilder {
private final SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink;
+ private final CatalogTable catalogTable;
public SeaTunnelWriteBuilder(
- SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink) {
+ SeaTunnelSink<SeaTunnelRow, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ CatalogTable catalogTable) {
this.sink = sink;
+ this.catalogTable = catalogTable;
}
@Override
public Write build() {
- return new SeaTunnelWrite<>(sink);
+ return new SeaTunnelWrite<>(sink, catalogTable);
}
}