This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b7c53d03c [Feature][Restapi] Allow metrics information to be
associated to logical plan nodes (#7786)
6b7c53d03c is described below
commit 6b7c53d03c82cce55ca8cfa5324830e476390997
Author: Guangdong Liu <[email protected]>
AuthorDate: Fri Oct 11 14:31:24 2024 +0800
[Feature][Restapi] Allow metrics information to be associated to logical
plan nodes (#7786)
---
docs/en/seatunnel-engine/rest-api-v1.md | 43 ++++++++++++++---
docs/en/seatunnel-engine/rest-api-v2.md | 43 ++++++++++++++---
docs/zh/seatunnel-engine/rest-api-v1.md | 44 +++++++++++++----
docs/zh/seatunnel-engine/rest-api-v2.md | 43 ++++++++++++++---
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 10 ++++
.../api/sink/multitablesink/MultiTableSink.java | 20 +++++++-
.../seatunnel/activemq/sink/ActivemqSink.java | 13 ++++-
.../activemq/sink/ActivemqSinkFactory.java | 5 +-
.../amazondynamodb/sink/AmazonDynamoDBSink.java | 7 +++
.../seatunnel/amazonsqs/sink/AmazonSqsSink.java | 17 +++++--
.../amazonsqs/sink/AmazonSqsSinkFactory.java | 3 +-
.../seatunnel/assertion/sink/AssertSink.java | 8 ++++
.../seatunnel/cassandra/sink/CassandraSink.java | 7 +++
.../clickhouse/sink/file/ClickhouseFileSink.java | 6 +++
.../seatunnel/console/sink/ConsoleSink.java | 14 +++++-
.../seatunnel/console/sink/ConsoleSinkFactory.java | 5 +-
.../seatunnel/datahub/sink/DataHubSink.java | 7 +++
.../connectors/seatunnel/sink/DingTalkSink.java | 7 +++
.../seatunnel/connectors/doris/sink/DorisSink.java | 5 ++
.../seatunnel/connectors/druid/sink/DruidSink.java | 6 +++
.../seatunnel/easysearch/sink/EasysearchSink.java | 8 ++++
.../elasticsearch/sink/ElasticsearchSink.java | 5 ++
.../connectors/seatunnel/email/sink/EmailSink.java | 17 +++++--
.../seatunnel/file/cos/sink/CosFileSink.java | 8 ++++
.../seatunnel/file/ftp/sink/FtpFileSink.java | 11 +++++
.../seatunnel/file/hdfs/sink/HdfsFileSink.java | 8 ++++
.../seatunnel/file/oss/jindo/sink/OssFileSink.java | 8 ++++
.../seatunnel/file/local/sink/LocalFileSink.java | 10 ++++
.../seatunnel/file/obs/sink/ObsFileSink.java | 8 ++++
.../seatunnel/file/oss/sink/OssFileSink.java | 11 +++++
.../seatunnel/file/s3/sink/S3FileSink.java | 9 +++-
.../seatunnel/file/sftp/sink/SftpFileSink.java | 11 +++++
.../google/firestore/sink/FirestoreSink.java | 7 +++
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 13 +++--
.../connectors/seatunnel/hive/sink/HiveSink.java | 5 ++
.../connectors/seatunnel/http/sink/HttpSink.java | 13 ++++-
.../seatunnel/http/sink/HttpSinkFactory.java | 3 +-
.../seatunnel/feishu/sink/FeishuSink.java | 13 +++--
.../seatunnel/feishu/sink/FeishuSinkFactory.java | 3 +-
.../seatunnel/wechat/sink/WeChatSink.java | 13 +++--
.../connectors/seatunnel/hudi/sink/HudiSink.java | 5 ++
.../seatunnel/iceberg/sink/IcebergSink.java | 7 ++-
.../seatunnel/influxdb/sink/InfluxDBSink.java | 12 ++++-
.../connectors/seatunnel/iotdb/sink/IoTDBSink.java | 8 ++++
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 ++
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 12 ++++-
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 5 +-
.../connectors/seatunnel/kudu/sink/KuduSink.java | 13 ++++-
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 8 ++++
.../seatunnel/milvus/sink/MilvusSink.java | 5 ++
.../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 7 +++
.../seatunnel/paimon/sink/PaimonSink.java | 5 ++
.../seatunnel/pulsar/sink/PulsarSink.java | 18 +++++--
.../seatunnel/pulsar/sink/PulsarSinkFactory.java | 5 +-
.../seatunnel/qdrant/sink/QdrantSink.java | 6 +++
.../seatunnel/rabbitmq/sink/RabbitmqSink.java | 7 +++
.../connectors/seatunnel/redis/sink/RedisSink.java | 12 +++--
.../seatunnel/rocketmq/sink/RocketMqSink.java | 7 +++
.../connectors/selectdb/sink/SelectDBSink.java | 6 +++
.../seatunnel/sentry/sink/SentrySink.java | 7 +++
.../connectors/seatunnel/slack/sink/SlackSink.java | 7 +++
.../seatunnel/socket/sink/SocketSink.java | 7 +++
.../seatunnel/starrocks/sink/StarRocksSink.java | 11 +++--
.../seatunnel/tablestore/sink/TablestoreSink.java | 7 +++
.../seatunnel/tdengine/sink/TDengineSink.java | 7 +++
.../seatunnel/typesense/sink/TypesenseSink.java | 9 +++-
.../connector/ConnectorSpecificationCheckTest.java | 12 +++++
.../seatunnel/e2e/sink/inmemory/InMemorySink.java | 5 ++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 42 +++++++++++++++++
.../seatunnel/engine/core/job/JobDAGInfo.java | 39 +++++++++++++++
.../seatunnel/engine/core/job/VertexInfo.java | 4 ++
.../seatunnel/engine/server/dag/DAGUtils.java | 55 +++++++++++++++++++++-
.../server/rest/RestHttpGetCommandProcessor.java | 2 +-
.../engine/server/rest/servlet/BaseServlet.java | 3 +-
.../apache/seatunnel/engine/server/TestUtils.java | 26 +++++++---
.../server/checkpoint/CheckpointPlanTest.java | 27 ++++++++---
.../seatunnel/engine/server/dag/TaskTest.java | 27 ++++++++---
.../spark/sink/SeaTunnelSinkWithBuffer.java | 7 +++
78 files changed, 830 insertions(+), 124 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v1.md
b/docs/en/seatunnel-engine/rest-api-v1.md
index 674542c23e..ec9d8f13b9 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -161,10 +161,18 @@ network:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
@@ -218,10 +226,18 @@ This API has been deprecated, please use
/hazelcast/rest/maps/job-info/:jobId in
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
@@ -289,7 +305,20 @@ When we can't get the job info, the response will be:
"errorMsg": null,
"createTime": "",
"finishTime": "",
- "jobDag": "",
+ "jobDag": {
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
+ ],
+ "pipelineEdges": {}
+ },
"metrics": ""
}
]
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index 643b4e51d8..e5b9d5d718 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -128,10 +128,18 @@ seatunnel:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
@@ -185,10 +193,18 @@ This API has been deprecated, please use /job-info/:jobId
instead
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
@@ -256,7 +272,20 @@ When we can't get the job info, the response will be:
"errorMsg": null,
"createTime": "",
"finishTime": "",
- "jobDag": "",
+ "jobDag": {
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
+ ],
+ "pipelineEdges": {}
+ },
"metrics": ""
}
]
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md
b/docs/zh/seatunnel-engine/rest-api-v1.md
index 639a7318cd..5154922ec0 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -159,10 +159,18 @@ network:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
@@ -230,10 +238,18 @@ network:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
@@ -287,8 +303,20 @@ network:
"errorMsg": null,
"createTime": "",
"finishTime": "",
- "jobDag": "",
- "metrics": ""
+ "jobDag": {
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
+ ],
+ "pipelineEdges": {}
+ }, "metrics": ""
}
]
```
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index cdd27595bf..df884fa18e 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -124,10 +124,18 @@ seatunnel:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
@@ -195,10 +203,18 @@ seatunnel:
"jobStatus": "",
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
@@ -252,7 +268,20 @@ seatunnel:
"errorMsg": null,
"createTime": "",
"finishTime": "",
- "jobDag": "",
+ "jobDag": {
+ "jobId": "",
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
+ ],
+ "pipelineEdges": {}
+ },
"metrics": ""
}
]
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cd869a3ca8..954bec748c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -135,4 +136,13 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT>
default Optional<Serializer<AggregatedCommitInfoT>>
getAggregatedCommitInfoSerializer() {
return Optional.empty();
}
+
+ /**
+ * Get the catalog table of the sink.
+ *
+ * @return Optional of catalog table.
+ */
+ default Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.empty();
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 3db7a8b7d2..2c3e9c6582 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import lombok.Getter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -157,7 +159,18 @@ public class MultiTableSink
}
public List<TablePath> getSinkTables() {
- return
sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
+
+ List<TablePath> tablePaths = new ArrayList<>();
+ List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
+ for (int i = 0; i < values.size(); i++) {
+ if (values.get(i).getWriteCatalogTable().isPresent()) {
+ tablePaths.add(
+ ((CatalogTable)
values.get(i).getWriteCatalogTable().get()).getTablePath());
+ } else {
+ tablePaths.add(TablePath.of(sinks.keySet().toArray(new
String[0])[i]));
+ }
+ }
+ return tablePaths;
}
@Override
@@ -170,4 +183,9 @@ public class MultiTableSink
public void setJobContext(JobContext jobContext) {
sinks.values().forEach(sink -> sink.setJobContext(jobContext));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
index d1d3701795..85ecb347a7 100644
---
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
+++
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
@@ -19,25 +19,29 @@ package
org.apache.seatunnel.connectors.seatunnel.activemq.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import java.io.IOException;
+import java.util.Optional;
public class ActivemqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
@Override
public String getPluginName() {
return "ActiveMQ";
}
- public ActivemqSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType)
{
+ public ActivemqSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
this.pluginConfig = pluginConfig;
- this.seaTunnelRowType = rowType;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
@@ -45,4 +49,9 @@ public class ActivemqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new ActivemqSinkWriter(pluginConfig, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
index 7f0dca38f6..ec40d648ae 100644
---
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
@@ -75,9 +75,6 @@ public class ActivemqSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- return () ->
- new ActivemqSink(
- context.getOptions(),
-
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ return () -> new ActivemqSink(context.getOptions(),
context.getCatalogTable());
}
}
diff --git
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index f26b388cd7..68dcc84a42 100644
---
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
import static
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
@@ -85,4 +87,9 @@ public class AmazonDynamoDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
index 04cbdba2a5..9952217f07 100644
---
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
+++
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
@@ -19,25 +19,29 @@ package
org.apache.seatunnel.connectors.seatunnel.amazonsqs.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import java.io.IOException;
+import java.util.Optional;
public class AmazonSqsSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private SeaTunnelRowType typeInfo;
- private ReadonlyConfig pluginConfig;
+ private final SeaTunnelRowType typeInfo;
+ private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
@Override
public String getPluginName() {
return "AmazonSqs";
}
- public AmazonSqsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType
typeInfo) {
- this.typeInfo = typeInfo;
+ public AmazonSqsSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ this.typeInfo = catalogTable.getTableSchema().toPhysicalRowDataType();
this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
}
@Override
@@ -45,4 +49,9 @@ public class AmazonSqsSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new AmazonSqsSinkWriter(typeInfo, pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
index 81ccf9a37b..030e9d221a 100644
---
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
@@ -41,8 +41,7 @@ public class AmazonSqsSinkFactory implements TableSinkFactory
{
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- return () ->
- new AmazonSqsSink(config,
catalogTable.getTableSchema().toPhysicalRowDataType());
+ return () -> new AmazonSqsSink(config, catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 927943adc2..e84b6fbcb2 100644
---
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -39,6 +39,7 @@ import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
@@ -56,6 +57,7 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
private final AssertTableRule assertTableRule;
private final Map<String, AssertCatalogTableRule> assertCatalogTableRule;
private final String catalogTableName;
+ private final CatalogTable catalogTable;
public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
@@ -93,6 +95,7 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
new ConfigException.BadValue(
RULES.key(), "Assert rule config is empty, please
add rule config."));
}
+ this.catalogTable = catalogTable;
}
private void initTableRule(CatalogTable catalogTable, Config tableConfig,
String tableName) {
@@ -130,4 +133,9 @@ public class AssertSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public String getPluginName() {
return "Assert";
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 09a50b8c95..9b37d94266 100644
---
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -43,6 +44,7 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
@@ -122,4 +124,9 @@ public class CassandraSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType,
tableSchema);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index cc63179f16..bb445d4282 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -212,4 +213,9 @@ public class ClickhouseFileSink
public Optional<Serializer<CKFileAggCommitInfo>>
getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 62ebab6a9f..6e7b78e195 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,10 +20,13 @@ package
org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import java.util.Optional;
+
import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
@@ -32,11 +35,13 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
private final SeaTunnelRowType seaTunnelRowType;
private final boolean isPrintData;
private final int delayMs;
+ private final CatalogTable catalogTable;
- public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig
options) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public ConsoleSink(CatalogTable catalogTable, ReadonlyConfig options) {
+ this.catalogTable = catalogTable;
this.isPrintData = options.get(LOG_PRINT_DATA);
this.delayMs = options.get(LOG_PRINT_DELAY);
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
@@ -48,4 +53,9 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public String getPluginName() {
return "Console";
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index fa5c7deae9..72987032bc 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -62,9 +62,6 @@ public class ConsoleSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig options = context.getOptions();
- return () ->
- new ConsoleSink(
-
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
- options);
+ return () -> new ConsoleSink(context.getCatalogTable(), options);
}
}
diff --git
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
index 8eeffb3aca..b22c236e7a 100644
---
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
+++
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -35,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.datahub.exception.DataHubConnec
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_ID;
import static
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_KEY;
@@ -93,4 +95,9 @@ public class DataHubSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
pluginConfig.getInt(TIMEOUT.key()),
pluginConfig.getInt(RETRY_TIMES.key()));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index 8d9d4b973e..42ec1e688a 100644
---
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -32,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.exception.DingTalkConnectorExce
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
import static
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
@@ -73,4 +75,9 @@ public class DingTalkSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
return new DingTalkWriter(
pluginConfig.getString(URL.key()),
pluginConfig.getString(SECRET.key()));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index c746fea00c..c0a9a2a5a1 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -133,4 +133,9 @@ public class DorisSink
catalogTable,
config.get(DorisOptions.CUSTOM_SQL)));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
index ad515aeeb7..786ab56e2c 100644
---
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
+++
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
import static
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
@@ -58,4 +59,9 @@ public class DruidSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
config.get(DATASOURCE),
config.get(BATCH_SIZE));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
index d96f7eb6b5..0eb4fbcd48 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
@@ -30,6 +31,8 @@ import
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSink
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
import static
org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
@@ -75,4 +78,9 @@ public class EasysearchSink
return new EasysearchSinkWriter(
context, seaTunnelRowType, pluginConfig, maxBatchSize,
maxRetryCount);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index fed65733e0..ffe2b0520b 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -95,4 +95,9 @@ public class ElasticsearchSink
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index 0a3df90a12..24f9c2295f 100644
---
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -27,13 +27,17 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
+import lombok.Getter;
+
+import java.util.Optional;
+
public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
- private SeaTunnelRowType seaTunnelRowType;
- private ReadonlyConfig readonlyConfig;
- private CatalogTable catalogTable;
- private EmailSinkConfig pluginConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+ @Getter private ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
+ private final EmailSinkConfig pluginConfig;
public EmailSink(ReadonlyConfig config, CatalogTable table) {
this.readonlyConfig = config;
@@ -51,4 +55,9 @@ public class EmailSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public String getPluginName() {
return EmailConfig.CONNECTOR_IDENTITY;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
index 9a1885da4f..8783c268ba 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
@AutoService(SeaTunnelSink.class)
public class CosFileSink extends BaseFileSink {
@Override
@@ -60,4 +63,9 @@ public class CosFileSink extends BaseFileSink {
}
hadoopConf = CosConf.buildWithConfig(pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index f4b271e035..ac481be25b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -23,7 +23,12 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
+import java.util.Optional;
+
public class FtpFileSink extends BaseMultipleTableFileSink {
+
+ private final CatalogTable catalogTable;
+
@Override
public String getPluginName() {
return FileSystemType.FTP.getFileSystemPluginName();
@@ -31,5 +36,11 @@ public class FtpFileSink extends BaseMultipleTableFileSink {
public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig,
catalogTable);
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 26c0f4f049..5e098ea2d2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -21,10 +21,13 @@ import
org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
@AutoService(SeaTunnelSink.class)
public class HdfsFileSink extends BaseHdfsFileSink {
@@ -37,4 +40,9 @@ public class HdfsFileSink extends BaseHdfsFileSink {
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
index ac6ee94992..03663d5c76 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
@AutoService(SeaTunnelSink.class)
public class OssFileSink extends BaseFileSink {
@Override
@@ -60,4 +63,9 @@ public class OssFileSink extends BaseFileSink {
}
hadoopConf = OssConf.buildWithConfig(pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 94741941bf..4042843d50 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -23,14 +23,24 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
+import java.util.Optional;
+
public class LocalFileSink extends BaseMultipleTableFileSink {
+ private final CatalogTable catalogTable;
+
public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
+ this.catalogTable = catalogTable;
}
@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
index 8f303b6a45..67a17fc954 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
@AutoService(SeaTunnelSink.class)
public class ObsFileSink extends BaseFileSink {
@Override
@@ -60,4 +63,9 @@ public class ObsFileSink extends BaseFileSink {
}
hadoopConf = ObsConf.buildWithConfig(pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
index de4726fd5c..11a3df2942 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -23,13 +23,24 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
+import java.util.Optional;
+
public class OssFileSink extends BaseMultipleTableFileSink {
+
+ private final CatalogTable catalogTable;
+
public OssFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
super(OssHadoopConf.buildWithConfig(readonlyConfig), readonlyConfig,
catalogTable);
+ this.catalogTable = catalogTable;
}
@Override
public String getPluginName() {
return FileSystemType.OSS.getFileSystemPluginName();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
index 2a636bcbcc..b0b6d9fbbb 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -44,8 +44,8 @@ import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory
public class S3FileSink extends BaseMultipleTableFileSink implements
SupportSaveMode {
- private CatalogTable catalogTable;
- private ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
+ private final ReadonlyConfig readonlyConfig;
private static final String S3 = "S3";
@@ -89,4 +89,9 @@ public class S3FileSink extends BaseMultipleTableFileSink
implements SupportSave
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, catalogTable,
null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
index dd3f1080b8..415ae3a5d6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
@@ -23,13 +23,24 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
+import java.util.Optional;
+
public class SftpFileSink extends BaseMultipleTableFileSink {
+
+ private final CatalogTable catalogTable;
+
public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig,
catalogTable);
+ this.catalogTable = catalogTable;
}
@Override
public String getPluginName() {
return FileSystemType.SFTP.getFileSystemPluginName();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index ab7c02057d..6149ba9358 100644
---
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.Fire
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
import static
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
@@ -76,4 +78,9 @@ public class FirestoreSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new FirestoreSinkWriter(rowType, firestoreParameters);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 0a46b1baef..e8d7b8b205 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -52,15 +52,15 @@ public class HbaseSink
SupportMultiTableSink,
SupportSaveMode {
- private ReadonlyConfig config;
+ private final ReadonlyConfig config;
- private CatalogTable catalogTable;
+ private final CatalogTable catalogTable;
private final HbaseParameters hbaseParameters;
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
- private List<Integer> rowkeyColumnIndexes = new ArrayList<>();
+ private final List<Integer> rowkeyColumnIndexes = new ArrayList<>();
private int versionColumnIndex = -1;
@@ -110,4 +110,9 @@ public class HbaseSink
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index b5602c13f8..997c42f9fa 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -244,4 +244,9 @@ public class HiveSink
}
return writeStrategy;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 9dfe688c11..5ac6b927ca 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -34,15 +35,17 @@ import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorExc
import java.io.IOException;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
protected final HttpParameter httpParameter = new HttpParameter();
+ protected CatalogTable catalogTable;
protected SeaTunnelRowType seaTunnelRowType;
protected Config pluginConfig;
- public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {
+ public HttpSink(Config pluginConfig, CatalogTable catalogTable) {
this.pluginConfig = pluginConfig;
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HttpConfig.URL.key());
if (!result.isSuccess()) {
@@ -71,7 +74,8 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
entry ->
String.valueOf(entry.getValue().unwrapped()),
(v1, v2) -> v2)));
}
- this.seaTunnelRowType = rowType;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
@Override
@@ -83,4 +87,9 @@ public class HttpSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public HttpSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new HttpSinkWriter(seaTunnelRowType, httpParameter);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 313d26dd3f..6ed6765d57 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -38,8 +38,7 @@ public class HttpSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
- return () ->
- new HttpSink(context.getOptions().toConfig(),
catalogTable.getSeaTunnelRowType());
+ return () -> new HttpSink(context.getOptions().toConfig(),
catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
index b3fbaa6a5b..25af9636c8 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
@@ -19,16 +19,23 @@ package
org.apache.seatunnel.connectors.seatunnel.feishu.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
+import java.util.Optional;
+
public class FeishuSink extends HttpSink {
- public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) {
- super(pluginConfig, rowType);
+ public FeishuSink(Config pluginConfig, CatalogTable catalogTable) {
+ super(pluginConfig, catalogTable);
}
@Override
public String getPluginName() {
return "Feishu";
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
index f9cd6ee01c..3052ba78d1 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
@@ -31,8 +31,7 @@ public class FeishuSinkFactory extends HttpSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
- return () ->
- new FeishuSink(context.getOptions().toConfig(),
catalogTable.getSeaTunnelRowType());
+ return () -> new FeishuSink(context.getOptions().toConfig(),
catalogTable);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index f438167c39..a3e910b620 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -20,15 +20,17 @@ package
org.apache.seatunnel.connectors.seatunnel.wechat.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+import java.util.Optional;
+
public class WeChatSink extends HttpSink {
- public WeChatSink(Config pluginConfig, SeaTunnelRowType rowType) {
- super(pluginConfig, rowType);
+ public WeChatSink(Config pluginConfig, CatalogTable catalogTable) {
+ super(pluginConfig, catalogTable);
}
@Override
@@ -44,4 +46,9 @@ public class WeChatSink extends HttpSink {
new WeChatBotMessageSerializationSchema(
new WeChatSinkConfig(pluginConfig), seaTunnelRowType));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
index 5bdc3b8c3a..13c245336a 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -143,4 +143,9 @@ public class HudiSink
catalogTable,
null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 417d92ac49..bdced4b52b 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -55,7 +55,7 @@ public class IcebergSink
IcebergAggregatedCommitInfo>,
SupportSaveMode,
SupportMultiTableSink {
- private static String PLUGIN_NAME = "Iceberg";
+ private static final String PLUGIN_NAME = "Iceberg";
private final SinkConfig config;
private final ReadonlyConfig readonlyConfig;
private final CatalogTable catalogTable;
@@ -133,4 +133,9 @@ public class IcebergSink
catalogTable,
null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 4d940f63cc..840379f7bd 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -26,12 +26,14 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import java.io.IOException;
+import java.util.Optional;
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
- private SeaTunnelRowType seaTunnelRowType;
- private SinkConfig sinkConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final SinkConfig sinkConfig;
+ private final CatalogTable catalogTable;
@Override
public String getPluginName() {
@@ -41,10 +43,16 @@ public class InfluxDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ this.catalogTable = catalogTable;
}
@Override
public InfluxDBSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index 263ab04973..65c29d57cb 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -34,6 +35,8 @@ import
org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorE
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_DEVICE;
import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.NODE_URLS;
import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.PASSWORD;
@@ -78,4 +81,9 @@ public class IoTDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new IoTDBSinkWriter(pluginConfig, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2ccfba19f2..b35150900f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -237,4 +237,9 @@ public class JdbcSink
}
return Optional.empty();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index e7945d9ed1..4deb30f547 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
@@ -43,10 +44,12 @@ public class KafkaSink
private final ReadonlyConfig pluginConfig;
private final SeaTunnelRowType seaTunnelRowType;
+ private final CatalogTable catalogTable;
- public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
+ public KafkaSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
this.pluginConfig = pluginConfig;
- this.seaTunnelRowType = rowType;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@Override
@@ -81,4 +84,9 @@ public class KafkaSink
public String getPluginName() {
return
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fe6965132d..ed3278602a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -50,9 +50,6 @@ public class KafkaSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- return () ->
- new KafkaSink(
- context.getOptions(),
-
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ return () -> new KafkaSink(context.getOptions(),
context.getCatalogTable());
}
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index def4a2b366..d56a08db43 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
import java.io.IOException;
+import java.util.Optional;
/**
* Kudu Sink implementation by using SeaTunnel sink API. This class contains
the method to create
@@ -40,11 +41,14 @@ public class KuduSink
SeaTunnelRow, KuduSinkState, KuduCommitInfo,
KuduAggregatedCommitInfo>,
SupportMultiTableSink {
- private KuduSinkConfig kuduSinkConfig;
- private SeaTunnelRowType seaTunnelRowType;
+ private final KuduSinkConfig kuduSinkConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ private final CatalogTable catalogTable;
public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
this.kuduSinkConfig = kuduSinkConfig;
+ this.catalogTable = catalogTable;
this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
}
@@ -57,4 +61,9 @@ public class KuduSink
public KuduSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new KuduSinkWriter(seaTunnelRowType, kuduSinkConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index 6abce7e417..91e8c12dca 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
@AutoService(SeaTunnelSink.class)
@@ -61,4 +64,9 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
index 2015be1973..10f4b6ca69 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -112,4 +112,9 @@ public class MilvusSink
catalogTable,
null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index 26f127509e..c3af6c7a90 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
@@ -29,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
@@ -58,4 +60,9 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow,
Void, Void, Void>
throws IOException {
return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index fbf04a5038..73d2151b89 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -158,4 +158,9 @@ public class PaimonSink
public void setLoadTable(Table table) {
this.table = table;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
index 05a007df9a..989e24b024 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
@@ -47,13 +48,15 @@ public class PulsarSink
implements SeaTunnelSink<
SeaTunnelRow, PulsarSinkState, PulsarCommitInfo,
PulsarAggregatedCommitInfo> {
- private SeaTunnelRowType seaTunnelRowType;
- private PulsarClientConfig clientConfig;
- private ReadonlyConfig readonlyConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final PulsarClientConfig clientConfig;
+ private final ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
- public PulsarSink(ReadonlyConfig readonlyConfig, SeaTunnelRowType
seaTunnelRowType) {
+ public PulsarSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
this.readonlyConfig = readonlyConfig;
- this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ this.catalogTable = catalogTable;
/** client config */
PulsarClientConfig.Builder clientConfigBuilder =
@@ -96,4 +99,9 @@ public class PulsarSink
public String getPluginName() {
return PulsarConfigUtil.IDENTIFIER;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
index c5b13e2876..7781ba7b94 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
@@ -53,9 +53,6 @@ public class PulsarSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableSinkFactoryContext context) {
- return () ->
- new PulsarSink(
- context.getOptions(),
-
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ return () -> new PulsarSink(context.getOptions(),
context.getCatalogTable());
}
}
diff --git
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
index 85119032c8..16904903e5 100644
---
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
+++
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
import
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
import java.io.IOException;
+import java.util.Optional;
public class QdrantSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
@@ -47,4 +48,9 @@ public class QdrantSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public QdrantSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new QdrantSinkWriter(catalogTable, qdrantParameters);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 1dcd792404..7d4f26272b 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConn
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
@@ -88,4 +90,9 @@ public class RabbitmqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index a87ee1ebf7..ddb1901205 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -28,13 +28,14 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import java.io.IOException;
+import java.util.Optional;
public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {
private final RedisParameters redisParameters = new RedisParameters();
- private SeaTunnelRowType seaTunnelRowType;
- private ReadonlyConfig readonlyConfig;
- private CatalogTable catalogTable;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final ReadonlyConfig readonlyConfig;
+ private final CatalogTable catalogTable;
public RedisSink(ReadonlyConfig config, CatalogTable table) {
this.readonlyConfig = config;
@@ -52,4 +53,9 @@ public class RedisSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
public RedisSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
return new RedisSinkWriter(seaTunnelRowType, redisParameters);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index bf81adf0a7..9fda05dece 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,6 +39,7 @@ import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
@@ -161,4 +163,9 @@ public class RocketMqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new RocketMqSinkWriter(producerMetadata, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index f4ef83d0b2..33222116cc 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -140,4 +141,9 @@ public class SelectDBSink
public Optional<Serializer<SelectDBCommitInfo>>
getAggregatedCommitInfoSerializer() {
return Optional.empty();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index 0a109dfa41..1298715633 100644
---
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
@@ -34,6 +35,7 @@ import
org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnecto
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
/** @description: SentrySink class */
@AutoService(SeaTunnelSink.class)
@@ -72,4 +74,9 @@ public class SentrySink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 91bee53eb5..e4b6ad2766 100644
---
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorE
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
/** Slack sink class */
@AutoService(SeaTunnelSink.class)
@@ -77,4 +79,9 @@ public class SlackSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
this.pluginConfig = pluginConfig;
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 222947f72e..87bff65a9a 100644
---
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnecto
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
@@ -75,4 +77,9 @@ public class SocketSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 35c9ed9e37..61f279fc12 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -40,10 +40,10 @@ import java.util.Optional;
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportSaveMode {
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
private final SinkConfig sinkConfig;
- private DataSaveMode dataSaveMode;
- private SchemaSaveMode schemaSaveMode;
+ private final DataSaveMode dataSaveMode;
+ private final SchemaSaveMode schemaSaveMode;
private final CatalogTable catalogTable;
public StarRocksSink(
@@ -88,4 +88,9 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void>
catalogTable,
sinkConfig.getCustomSql()));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
index 9a16e6aeb1..2656263850 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.Tablestore
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
@@ -87,4 +89,9 @@ public class TablestoreSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
throws IOException {
return new TablestoreWriter(tablestoreOptions, rowType);
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 82d1069f7c..194c94dda5 100644
---
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -29,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.Optional;
@AutoService(SeaTunnelSink.class)
public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
@@ -55,4 +57,9 @@ public class TDengineSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
public String getPluginName() {
return "TDengine";
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return super.getWriteCatalogTable();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
index e52638f83e..fe4d7190c5 100644
---
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
+++
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
@@ -51,8 +51,8 @@ public class TypesenseSink
SupportMultiTableSink,
SupportSaveMode {
- private ReadonlyConfig config;
- private CatalogTable catalogTable;
+ private final ReadonlyConfig config;
+ private final CatalogTable catalogTable;
private final int maxBatchSize;
private final int maxRetryCount;
@@ -93,4 +93,9 @@ public class TypesenseSink
new DefaultSaveModeHandler(
schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
diff --git
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 3628a5dce6..59c4bf67f5 100644
---
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -142,6 +142,8 @@ public class ConnectorSpecificationCheckTest {
sinkClass, "setTypeInfo",
SeaTunnelRowType.class);
Optional<Method> getConsumedType =
ReflectionUtils.getDeclaredMethod(sinkClass,
"getConsumedType");
+ Optional<Method> getWriteCatalogTable =
+ ReflectionUtils.getDeclaredMethod(sinkClass,
"getWriteCatalogTable");
Assertions.assertFalse(
prepare.isPresent(),
"Please remove `prepare` method in " +
sinkClass.getSimpleName());
@@ -151,6 +153,16 @@ public class ConnectorSpecificationCheckTest {
Assertions.assertFalse(
getConsumedType.isPresent(),
"Please remove `getConsumedType` method in " +
sinkClass.getSimpleName());
+ Assertions.assertTrue(
+ getWriteCatalogTable.isPresent(),
+ "Please implement `getWriteCatalogTable` method in "
+ + sinkClass.getSimpleName());
+ Assertions.assertEquals(
+ Optional.class,
+ getWriteCatalogTable.get().getReturnType(),
+ "The `getWriteCatalogTable` method should return
Optional<CatalogTable> in "
+ + sinkClass.getSimpleName());
+
log.info(
"Check sink connector {} successfully",
factory.getClass().getSimpleName());
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 9e3852fb3c..ff48999461 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -80,4 +80,9 @@ public class InMemorySink
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.of(new InMemorySaveModeHandler(catalogTable));
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 378da4df17..8e5b15cc3d 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
@@ -327,6 +329,46 @@ public class RestApiIT {
+
batchJobProxy.getJobId())
.then()
.statusCode(200)
+ .body(
+ "jobDag.jobId",
+ equalTo(
+ Long.toString(
+
batchJobProxy.getJobId())))
+ .body("jobDag.pipelineEdges",
hasKey("1"))
+
.body("jobDag.pipelineEdges['1']", hasSize(1))
+ .body(
+
"jobDag.pipelineEdges['1'][0].inputVertexId",
+ equalTo("1"))
+ .body(
+
"jobDag.pipelineEdges['1'][0].targetVertexId",
+ equalTo("2"))
+ .body("jobDag.vertexInfoMap",
hasSize(2))
+ .body(
+
"jobDag.vertexInfoMap[0].vertexId",
+ equalTo(1))
+ .body(
+
"jobDag.vertexInfoMap[0].type",
+ equalTo("source"))
+ .body(
+
"jobDag.vertexInfoMap[0].vertexName",
+ equalTo(
+ "pipeline-1
[Source[0]-FakeSource]"))
+ .body(
+
"jobDag.vertexInfoMap[0].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"jobDag.vertexInfoMap[1].vertexId",
+ equalTo(2))
+ .body(
+
"jobDag.vertexInfoMap[1].type",
+ equalTo("sink"))
+ .body(
+
"jobDag.vertexInfoMap[1].vertexName",
+ equalTo(
+ "pipeline-1
[Sink[0]-console-MultiTableSink]"))
+ .body(
+
"jobDag.vertexInfoMap[1].tablePaths[0]",
+ equalTo("fake"))
.body("jobName",
equalTo("fake_to_console"))
.body("jobStatus",
equalTo("FINISHED"));
});
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index ecb797f80a..ee6326acbd 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.engine.core.job;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -32,4 +36,39 @@ public class JobDAGInfo implements Serializable {
Long jobId;
Map<Integer, List<Edge>> pipelineEdges;
Map<Long, VertexInfo> vertexInfoMap;
+
+ public JsonObject toJsonObject() {
+ JsonObject pipelineEdgesJsonObject = new JsonObject();
+
+ for (Map.Entry<Integer, List<Edge>> entry : pipelineEdges.entrySet()) {
+ JsonArray jsonArray = new JsonArray();
+ for (Edge edge : entry.getValue()) {
+ JsonObject edgeJsonObject = new JsonObject();
+ edgeJsonObject.add("inputVertexId",
edge.getInputVertexId().toString());
+ edgeJsonObject.add("targetVertexId",
edge.getTargetVertexId().toString());
+ jsonArray.add(edgeJsonObject);
+ }
+ pipelineEdgesJsonObject.add(entry.getKey().toString(), jsonArray);
+ }
+
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.add("jobId", jobId.toString());
+ jsonObject.add("pipelineEdges", pipelineEdgesJsonObject);
+ JsonArray vertexInfoMapString = new JsonArray();
+ for (Map.Entry<Long, VertexInfo> entry : vertexInfoMap.entrySet()) {
+ JsonObject vertexInfoJsonObj = new JsonObject();
+ VertexInfo vertexInfo = entry.getValue();
+ vertexInfoJsonObj.add("vertexId", vertexInfo.getVertexId());
+ vertexInfoJsonObj.add("type", vertexInfo.getType().getType());
+ vertexInfoJsonObj.add("vertexName", vertexInfo.getConnectorType());
+ JsonArray tablePaths = new JsonArray();
+ for (TablePath tablePath : vertexInfo.getTablePaths()) {
+ tablePaths.add(tablePath.toString());
+ }
+ vertexInfoJsonObj.add("tablePaths", tablePaths);
+ vertexInfoMapString.add(vertexInfoJsonObj);
+ }
+ jsonObject.add("vertexInfoMap", vertexInfoMapString);
+ return jsonObject;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
index 20602c9e83..e7e00e9ef6 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.core.job;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.constants.PluginType;
import lombok.AllArgsConstructor;
@@ -24,6 +25,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
+import java.util.List;
@Data
@NoArgsConstructor
@@ -35,4 +37,6 @@ public class VertexInfo implements Serializable {
private PluginType type;
private String connectorType;
+
+ private List<TablePath> tablePaths;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e3bba41b5f..e7b41de73d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -17,8 +17,15 @@
package org.apache.seatunnel.engine.server.dag;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.Edge;
@@ -28,12 +35,17 @@ import org.apache.seatunnel.engine.core.job.VertexInfo;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
+@Slf4j
public class DAGUtils {
public static JobDAGInfo getJobDAGInfo(
@@ -69,7 +81,8 @@ public class DAGUtils {
vertex.getVertexId(),
ActionUtils.getActionType(
vertex.getAction()),
-
vertex.getAction().getName()));
+
vertex.getAction().getName(),
+
getTablePaths(vertex.getAction())));
});
});
return new JobDAGInfo(
@@ -89,7 +102,8 @@ public class DAGUtils {
new VertexInfo(
v.getVertexId(),
ActionUtils.getActionType(v.getAction()),
- v.getAction().getName()))
+ v.getAction().getName(),
+
getTablePaths(v.getAction())))
.collect(
Collectors.toMap(VertexInfo::getVertexId,
Function.identity()));
@@ -119,4 +133,41 @@ public class DAGUtils {
jobImmutableInformation.getJobId(), pipelineWithEdges,
vertexInfoMap);
}
}
+
+ private static List<TablePath> getTablePaths(Action action) {
+
+ List<TablePath> tablePaths = new ArrayList<>();
+ if (action instanceof SourceAction) {
+ SourceAction sourceAction = (SourceAction) action;
+
+ try {
+
+ List<CatalogTable> producedCatalogTables =
+ sourceAction.getSource().getProducedCatalogTables();
+ List<TablePath> sourceTablePaths =
+ producedCatalogTables.stream()
+ .map(CatalogTable::getTablePath)
+ .collect(Collectors.toList());
+ tablePaths.addAll(sourceTablePaths);
+ } catch (UnsupportedOperationException e) {
+ // ignore
+ log.warn(
+ "SourceAction {} does not support
getProducedCatalogTables, fallback to default table path",
+ action.getName());
+ tablePaths.add(TablePath.DEFAULT);
+ }
+ } else if (action instanceof SinkAction) {
+ SeaTunnelSink seaTunnelSink = ((SinkAction<?, ?, ?, ?>)
action).getSink();
+ if (seaTunnelSink instanceof MultiTableSink) {
+ List<TablePath> sinkTablePaths =
+ new ArrayList<>(((MultiTableSink)
seaTunnelSink).getSinkTables());
+ tablePaths.addAll(sinkTablePaths);
+ } else {
+ Optional<CatalogTable> catalogTable =
seaTunnelSink.getWriteCatalogTable();
+ catalogTable.ifPresent(table ->
tablePaths.add(table.getTablePath()));
+ }
+ }
+
+ return tablePaths;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 51335cc9f2..d052629f2e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -754,7 +754,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
DateTimeUtils.toString(
jobState.getFinishTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
- .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+ .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index faa62bf54b..ce5fc74d3c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -287,7 +286,7 @@ public class BaseServlet extends HttpServlet {
DateTimeUtils.toString(
jobState.getFinishTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
- .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+ .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 79d487d50a..2b63ac8018 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,9 +22,13 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
@@ -47,6 +51,7 @@ import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -78,12 +83,19 @@ public class TestUtils {
fake.setParallelism(3);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
+ List<Column> columns = new ArrayList<>();
+ columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true,
111, ""));
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("default", TablePath.DEFAULT),
+ TableSchema.builder().columns(columns).build(),
+ new HashMap<>(),
+ Collections.emptyList(),
+ "fake");
+
ConsoleSink consoleSink =
- new ConsoleSink(
- new SeaTunnelRowType(
- new String[] {"id"},
- new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
- ReadonlyConfig.fromMap(new HashMap<>()));
+ new ConsoleSink(catalogTable, ReadonlyConfig.fromMap(new
HashMap<>()));
consoleSink.setJobContext(jobContext);
Action console =
new SinkAction<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index eb7cff6db9..5714e68aa6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -22,9 +22,13 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -49,8 +53,10 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableMap;
import com.hazelcast.map.IMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -132,12 +138,19 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
fake.setParallelism(parallelism);
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake,
parallelism);
+ List<Column> columns = new ArrayList<>();
+ columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true,
111, ""));
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("default", TablePath.DEFAULT),
+ TableSchema.builder().columns(columns).build(),
+ new HashMap<>(),
+ Collections.emptyList(),
+ "fake");
+
ConsoleSink consoleSink =
- new ConsoleSink(
- new SeaTunnelRowType(
- new String[] {"id"},
- new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
- ReadonlyConfig.fromMap(new HashMap<>()));
+ new ConsoleSink(catalogTable, ReadonlyConfig.fromMap(new
HashMap<>()));
consoleSink.setJobContext(jobContext);
Action console =
new SinkAction<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 10d426f589..25391c5c18 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -22,9 +22,13 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -55,8 +59,10 @@ import com.hazelcast.map.IMap;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.Executors;
public class TaskTest extends AbstractSeaTunnelServerTest {
@@ -114,15 +120,22 @@ public class TaskTest extends AbstractSeaTunnelServerTest
{
Collections.emptySet());
LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2);
+ List<Column> columns = new ArrayList<>();
+ columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true,
111, ""));
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("default", TablePath.DEFAULT),
+ TableSchema.builder().columns(columns).build(),
+ new HashMap<>(),
+ Collections.emptyList(),
+ "fake");
+
Action console =
new SinkAction<>(
idGenerator.getNextId(),
"console",
- new ConsoleSink(
- new SeaTunnelRowType(
- new String[] {"id"},
- new SeaTunnelDataType<?>[]
{BasicType.INT_TYPE}),
- ReadonlyConfig.fromMap(new HashMap<>())),
+ new ConsoleSink(catalogTable,
ReadonlyConfig.fromMap(new HashMap<>())),
Sets.newHashSet(new URL("file:///console.jar")),
Collections.emptySet());
LogicalVertex consoleVertex = new LogicalVertex(console.getId(),
console, 2);
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
index ab1ac3f00e..f234004837 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.io.IOException;
+import java.util.Optional;
public class SeaTunnelSinkWithBuffer implements SeaTunnelSink<SeaTunnelRow,
Void, Void, Void> {
@@ -35,4 +37,9 @@ public class SeaTunnelSinkWithBuffer implements
SeaTunnelSink<SeaTunnelRow, Void
throws IOException {
return new SeaTunnelSinkWithBufferWriter();
}
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return SeaTunnelSink.super.getWriteCatalogTable();
+ }
}