This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 048c47d966 [Improve][Sls] Add sls sink connector、e2e、doc (#7830)
048c47d966 is described below
commit 048c47d966f274293d510bbec737375a41a57e66
Author: GumKey <[email protected]>
AuthorDate: Mon Oct 14 17:26:09 2024 +0800
[Improve][Sls] Add sls sink connector、e2e、doc (#7830)
Co-authored-by: XenosK <[email protected]>
---
docs/en/connector-v2/sink/Sls.md | 84 ++++++++++++++++
docs/zh/connector-v2/sink/Sls.md | 84 ++++++++++++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-sls/pom.xml | 5 +
.../connectors/seatunnel/sls/config/Config.java | 18 ++++
.../serialization/SeatunnelRowSerialization.java | 46 +++++++++
.../connectors/seatunnel/sls/sink/SlsSink.java | 53 ++++++++++
.../seatunnel/sls/sink/SlsSinkCommitter.java} | 22 ++--
.../seatunnel/sls/sink/SlsSinkFactory.java | 56 +++++++++++
.../seatunnel/sls/sink/SlsSinkWriter.java | 112 +++++++++++++++++++++
.../seatunnel/sls/source/SlsConsumerThread.java | 2 +-
.../seatunnel/sls/source/SlsSourceReader.java | 2 +-
.../sls/state/SlsAggregatedCommitInfo.java} | 18 ++--
.../seatunnel/sls/state/SlsCommitInfo.java} | 17 ++--
.../seatunnel/sls/state/SlsSinkState.java} | 17 ++--
.../connectors/seatunnel/sls/SlsFactoryTest.java | 2 +
.../apache/seatunnel/e2e/connector/sls/SlsIT.java | 7 ++
.../src/test/resources/sls_sink_to_console.conf | 56 +++++++++++
18 files changed, 563 insertions(+), 39 deletions(-)
diff --git a/docs/en/connector-v2/sink/Sls.md b/docs/en/connector-v2/sink/Sls.md
new file mode 100644
index 0000000000..487786548d
--- /dev/null
+++ b/docs/en/connector-v2/sink/Sls.md
@@ -0,0 +1,84 @@
+# Sls
+
+> Sls sink connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Sink connector for Aliyun Sls.
+
+## Supported DataSource Info
+
+In order to use the Sls connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central
repository.
+
+| Datasource | Supported Versions | Maven
|
+|------------|--------------------|-----------------------------------------------------------------------------------|
+| Sls | Universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls)
|
+
+## Source Options
+
+| Name | Type | Required | Default
| Description
|
+|-------------------------------------|---------|----------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| project | String | Yes | -
| [Aliyun Sls
Project](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl)
|
+| logstore | String | Yes | -
| [Aliyun Sls
Logstore](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC)
|
+| endpoint | String | Yes | -
| [Aliyun Access
Endpoint](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa)
|
+| access_key_id | String | Yes | -
| [Aliyun AccessKey
ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479)
|
+| access_key_secret | String | Yes | -
| [Aliyun AccessKey
Secret](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479)
|
+| source | String | No | SeaTunnel-Source
| Data Source marking in sls
|
+| topic | String | No | SeaTunnel-Topic
| Data topic marking in sls
|
+
+## Task Example
+
+### Simple
+
+> This example write data to the sls's logstore1.And if you have not yet
installed and deployed SeaTunnel, you need to follow the instructions in
Install SeaTunnel to install and deploy SeaTunnel. And if you have not yet
installed and deployed SeaTunnel, you need to follow the instructions in
[Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy
SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel
Engine](../../start-v2/locally/quick-start- [...]
+
+[Create RAM user and
authorization](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4),Please
ensure thr ram user have sufficient rights to perform, reference [RAM Custom
Authorization
Example](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b)
+
+```hocon
+# Defining the runtime environment
+env {
+ parallelism = 2
+ job.mode = "STREAMING"
+ checkpoint.interval = 30000
+}
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields = {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Sls {
+ endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
+ project = "project1"
+ logstore = "logstore1"
+ access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
+ access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ }
+}
+```
+
diff --git a/docs/zh/connector-v2/sink/Sls.md b/docs/zh/connector-v2/sink/Sls.md
new file mode 100644
index 0000000000..94e4f3c07a
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Sls.md
@@ -0,0 +1,84 @@
+# Sls
+
+> Sls sink connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## 描述
+
+Sink connector for Aliyun Sls.
+
+从写入数据到阿里云Sls日志服务
+
+为了使用Sls连接器,需要以下依赖关系。
+它们可以通过install-plugin.sh或Maven中央存储库下载。
+
+| Datasource | Supported Versions | Maven
|
+|------------|--------------------|-----------------------------------------------------------------------------------|
+| Sls | Universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-sls)
|
+
+## 支持的数据源信息
+
+| Name | Type | Required | Default
| Description
|
+|-------------------------------------|----------|----------|-------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| project | String | Yes | -
| [阿里云 Sls
项目](https://help.aliyun.com/zh/sls/user-guide/manage-a-project?spm=a2c4g.11186623.0.0.6f9755ebyfaYSl)
|
+| logstore | String | Yes | -
| [阿里云 Sls
日志库](https://help.aliyun.com/zh/sls/user-guide/manage-a-logstore?spm=a2c4g.11186623.0.0.13137c08nfuiBC)
|
+| endpoint | String | Yes | -
|
[阿里云访问服务点](https://help.aliyun.com/zh/sls/developer-reference/api-sls-2020-12-30-endpoint?spm=a2c4g.11186623.0.0.548945a8UyJULa)
|
+| access_key_id | String | Yes | -
|
[阿里云访问用户ID](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479)
|
+| access_key_secret | String | Yes | -
|
[阿里云访问用户密码](https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#task-2245479)
|
+| source | String | No | SeaTunnel-Source
| 在sls中数据来源标记
|
+| topic | String | No | SeaTunnel-Topic
| 在sls中数据主题标记
|
+
+## 任务示例
+
+### 简单示例
+
+>
此示例写入sls的logstore1的数据。如果您尚未安装和部署SeaTunnel,则需要按照安装SeaTunnel中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start
SeaTunnel Engine.md)中的说明运行此作业。
+
+[创建RAM用户及授权](https://help.aliyun.com/zh/sls/create-a-ram-user-and-authorize-the-ram-user-to-access-log-service?spm=a2c4g.11186623.0.i4),
请确认RAM用户有足够的权限来读取及管理数据,参考:[RAM自定义授权示例](https://help.aliyun.com/zh/sls/use-custom-policies-to-grant-permissions-to-a-ram-user?spm=a2c4g.11186623.0.0.4a6e4e554CKhSc#reference-s3z-m1l-z2b)
+
+```hocon
+# Defining the runtime environment
+env {
+ parallelism = 2
+ job.mode = "STREAMING"
+ checkpoint.interval = 30000
+}
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields = {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Sls {
+ endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
+ project = "project1"
+ logstore = "logstore1"
+ access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
+ access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 3673ade48f..86c95bc3e2 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -149,4 +149,5 @@ seatunnel.transform.Copy = seatunnel-transforms-v2
seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
+seatunnel.sink.Sls = connector-sls
diff --git a/seatunnel-connectors-v2/connector-sls/pom.xml
b/seatunnel-connectors-v2/connector-sls/pom.xml
index dd47dd0864..b7b7293347 100644
--- a/seatunnel-connectors-v2/connector-sls/pom.xml
+++ b/seatunnel-connectors-v2/connector-sls/pom.xml
@@ -49,5 +49,10 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java
index 46917b8b84..d279b10c7d 100644
---
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/config/Config.java
@@ -79,4 +79,22 @@ public class Config {
.defaultValue(-1L)
.withDescription(
"The interval for dynamically discovering topics
and partitions.");
+
+ public static final Option<String> SOURCE =
+ Options.key("source")
+ .stringType()
+ .defaultValue("SeaTunnel-Source")
+ .withDescription("Aliyun sls producer source");
+
+ public static final Option<String> TOPIC =
+ Options.key("topic")
+ .stringType()
+ .defaultValue("SeaTunnel-Topic")
+ .withDescription("Aliyun sls producer topic");
+
+ public static final Option<Integer> LOG_GROUP_SIZE =
+ Options.key("log_group_size")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Aliyun sls log group write size");
}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java
new file mode 100644
index 0000000000..a9308d89c6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/serialization/SeatunnelRowSerialization.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.serialization;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.aliyun.openservices.log.common.LogContent;
+import com.aliyun.openservices.log.common.LogItem;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeatunnelRowSerialization {
+ JsonSerializationSchema jsonSerializationSchema;
+
+ public SeatunnelRowSerialization(SeaTunnelRowType rowType) {
+ this.jsonSerializationSchema = new JsonSerializationSchema(rowType);
+ }
+
+ public List<LogItem> serializeRow(SeaTunnelRow row) {
+ List<LogItem> logGroup = new ArrayList<LogItem>();
+ LogItem logItem = new LogItem();
+ String rowJson = new String(jsonSerializationSchema.serialize(row));
+ LogContent content = new LogContent("content", rowJson);
+ logItem.PushBack(content);
+ logGroup.add(logItem);
+ return logGroup;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java
new file mode 100644
index 0000000000..cad767e635
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.sls.state.SlsAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class SlsSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, SlsSinkState, SlsCommitInfo,
SlsAggregatedCommitInfo> {
+ private final ReadonlyConfig pluginConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public SlsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
+ this.pluginConfig = pluginConfig;
+ this.seaTunnelRowType = rowType;
+ }
+
+ @Override
+ public String getPluginName() {
+ return
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, SlsCommitInfo, SlsSinkState> createWriter(
+ SinkWriter.Context context) throws IOException {
+ return new SlsSinkWriter(context, seaTunnelRowType, pluginConfig,
Collections.emptyList());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java
similarity index 58%
copy from
seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
copy to
seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java
index 1d7c2ab2da..18ea3d582d 100644
---
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkCommitter.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.sls;
+package org.apache.seatunnel.connectors.seatunnel.sls.sink;
-import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.util.List;
-public class SlsFactoryTest {
-
- @Test
- void optionRule() {
- Assertions.assertNotNull((new SlsSourceFactory()).optionRule());
+public class SlsSinkCommitter implements SinkCommitter<SlsCommitInfo> {
+ @Override
+ public List<SlsCommitInfo> commit(List<SlsCommitInfo> commitInfos) throws
IOException {
+ // nothing to do, when write function, data had sended
+ return null;
}
+
+ @Override
+ public void abort(List<SlsCommitInfo> commitInfos) throws IOException {}
}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java
new file mode 100644
index 0000000000..82b0d59898
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.sls.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class SlsSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return Config.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ Config.ENDPOINT,
+ Config.PROJECT,
+ Config.LOGSTORE,
+ Config.ACCESS_KEY_ID,
+ Config.ACCESS_KEY_SECRET)
+ .optional(Config.SOURCE, Config.TOPIC)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () ->
+ new SlsSink(
+ context.getOptions(),
+
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java
new file mode 100644
index 0000000000..4de6efc64d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/sink/SlsSinkWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.sls.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.sls.serialization.SeatunnelRowSerialization;
+import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSinkState;
+
+import com.aliyun.openservices.log.Client;
+import com.aliyun.openservices.log.common.LogItem;
+import com.aliyun.openservices.log.request.PutLogsRequest;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ACCESS_KEY_SECRET;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.ENDPOINT;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.LOGSTORE;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.LOG_GROUP_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.PROJECT;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.SOURCE;
+import static
org.apache.seatunnel.connectors.seatunnel.sls.config.Config.TOPIC;
+
+@Slf4j
+public class SlsSinkWriter implements SinkWriter<SeaTunnelRow, SlsCommitInfo,
SlsSinkState> {
+
+ private final Client client;
+ private final String project;
+ private final String logStore;
+ private final String topic;
+ private final String source;
+ private final Integer logGroupSize;
+ private final SinkWriter.Context context;
+ private final List<SlsSinkState> slsStates;
+ private final SeatunnelRowSerialization seatunnelRowSerialization;
+
+ public SlsSinkWriter(
+ SinkWriter.Context context,
+ SeaTunnelRowType seaTunnelRowType,
+ ReadonlyConfig pluginConfig,
+ List<SlsSinkState> slsStates) {
+
+ this.client =
+ new Client(
+ pluginConfig.get(ENDPOINT),
+ pluginConfig.get(ACCESS_KEY_ID),
+ pluginConfig.get(ACCESS_KEY_SECRET));
+ this.project = pluginConfig.get(PROJECT);
+ this.logStore = pluginConfig.get(LOGSTORE);
+ this.topic = pluginConfig.get(TOPIC);
+ this.source = pluginConfig.get(SOURCE);
+ this.logGroupSize = pluginConfig.get(LOG_GROUP_SIZE);
+ this.context = context;
+ this.slsStates = slsStates;
+ this.seatunnelRowSerialization = new
SeatunnelRowSerialization(seaTunnelRowType);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ List<LogItem> data =
this.seatunnelRowSerialization.serializeRow(element);
+ PutLogsRequest plr = new PutLogsRequest(project, logStore, topic,
source, data);
+ try {
+ this.client.PutLogs(plr);
+ } catch (Throwable e) {
+ log.error("write logs failed", e);
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Optional<SlsCommitInfo> prepareCommit() throws IOException {
+ // nothing to do, when write function, data had sended
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortPrepare() {}
+
+ @Override
+ public List<SlsSinkState> snapshotState(long checkpointId) {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.client.shutdown();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
index 7a2b9f65ba..9816f7d7d6 100644
---
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsConsumerThread.java
@@ -55,7 +55,7 @@ public class SlsConsumerThread implements Runnable {
} finally {
try {
if (client != null) {
- /** now do nothine, do not need close */
+ client.shutdown();
}
} catch (Throwable t) {
throw new RuntimeException(t);
diff --git
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
index 819b3b07d6..9b4ed8308d 100644
---
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java
@@ -163,7 +163,7 @@ public class SlsSourceReader implements
SourceReader<SeaTunnelRow, SlsSourceSpli
return
sourceSplits.stream().map(SlsSourceSplit::copy).collect(Collectors.toList());
}
- // 接受
+ // received splits and do somethins for this
@Override
public void addSplits(List<SlsSourceSplit> splits) {
running = true;
diff --git
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
copy to
seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java
index 1d7c2ab2da..eac7d7946f 100644
---
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsAggregatedCommitInfo.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.sls;
+package org.apache.seatunnel.connectors.seatunnel.sls.state;
-import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.util.List;
-public class SlsFactoryTest {
-
- @Test
- void optionRule() {
- Assertions.assertNotNull((new SlsSourceFactory()).optionRule());
- }
+@Data
+@AllArgsConstructor
+public class SlsAggregatedCommitInfo {
+ List<SlsCommitInfo> commitInfos;
}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
copy to
seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java
index 1d7c2ab2da..f378950d46 100644
---
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsCommitInfo.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.sls;
+package org.apache.seatunnel.connectors.seatunnel.sls.state;
-import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.Serializable;
-public class SlsFactoryTest {
+@Data
+@AllArgsConstructor
+public class SlsCommitInfo implements Serializable {
- @Test
- void optionRule() {
- Assertions.assertNotNull((new SlsSourceFactory()).optionRule());
- }
+ private final String data;
}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java
similarity index 69%
copy from
seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
copy to
seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java
index 1d7c2ab2da..6d1aaf1aac 100644
---
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/state/SlsSinkState.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.sls;
+package org.apache.seatunnel.connectors.seatunnel.sls.state;
-import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.Serializable;
-public class SlsFactoryTest {
+@Data
+@AllArgsConstructor
+public class SlsSinkState implements Serializable {
- @Test
- void optionRule() {
- Assertions.assertNotNull((new SlsSourceFactory()).optionRule());
- }
+ private final String data;
}
diff --git
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
index 1d7c2ab2da..84780c77dd 100644
---
a/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-sls/src/test/java/org/apache/seatunnel/connectors/seatunnel/sls/SlsFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.sls;
+import org.apache.seatunnel.connectors.seatunnel.sls.sink.SlsSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sls.source.SlsSourceFactory;
import org.junit.jupiter.api.Assertions;
@@ -27,5 +28,6 @@ public class SlsFactoryTest {
@Test
void optionRule() {
Assertions.assertNotNull((new SlsSourceFactory()).optionRule());
+ Assertions.assertNotNull((new SlsSinkFactory()).optionRule());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java
index 07d368ec8c..6a7d52515f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/java/org/apache/seatunnel/e2e/connector/sls/SlsIT.java
@@ -44,6 +44,13 @@ public class SlsIT extends TestSuiteBase implements
TestResource {
@Override
public void tearDown() throws Exception {}
+ @TestTemplate
+ public void testSlsStreamingSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult1 =
container.executeJob("/sls_sink_to_console.conf");
+ Assertions.assertEquals(0, execResult1.getExitCode(),
execResult1.getStderr());
+ }
+
@TestTemplate
public void testSlsStreamingSource(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf
new file mode 100644
index 0000000000..2b3a280459
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-sls-e2e/src/test/resources/sls_sink_to_console.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields = {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Sls {
+ endpoint = "xxxxxx"
+ project = "xxxxxx"
+ logstore = "xxxxxx"
+ access_key_id = "xxxxxx"
+ access_key_secret = "xxxxxxx"
+ }
+}
\ No newline at end of file