This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 68ebf15cf6 [Feature][Connector-V2] Add aerospike sink connector (#8821)
68ebf15cf6 is described below
commit 68ebf15cf6eb643cab8e4b26097dcf18b429ca74
Author: zax <[email protected]>
AuthorDate: Tue Mar 25 15:03:33 2025 +0800
[Feature][Connector-V2] Add aerospike sink connector (#8821)
---
.github/workflows/labeler/label-scope-conf.yml | 5 +
NOTICE | 7 +-
.../connector-v2/changelog/connector-aerospike.md | 6 +
docs/en/connector-v2/sink/Aerospike.md | 117 +++++++++
.../connector-v2/changelog/connector-aerospike.md | 6 +
docs/zh/connector-v2/sink/Aerospike.md | 117 +++++++++
plugin-mapping.properties | 1 +
.../connector-aerospike/pom.xml | 54 ++++
.../aerospike/config/AerospikeDataType.java | 28 ++
.../aerospike/config/AerospikeSinkOptions.java | 111 ++++++++
.../seatunnel/aerospike/config/DataFormatType.java | 44 ++++
.../exception/AerospikeConnectorException.java | 32 +++
.../aerospike/exception/AerospikeErrorCode.java | 46 ++++
.../seatunnel/aerospike/sink/AerospikeSink.java | 45 ++++
.../aerospike/sink/AerospikeSinkFactory.java | 59 +++++
.../aerospike/sink/AerospikeSinkWriter.java | 255 ++++++++++++++++++
.../aerospike/sink/AerospikeTypeConverter.java | 107 ++++++++
.../seatunnel/aerospike/AerospikeFactoryTest.java | 32 +++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 7 +
seatunnel-dist/release-docs/NOTICE | 7 +-
.../connector-aerospike-e2e/pom.xml | 58 +++++
.../connector/aerospike/AbstractAerospikeIT.java | 289 +++++++++++++++++++++
.../e2e/connector/aerospike/Aerospike6IT.java | 31 +++
.../aerospike/AerospikeContainerInfo.java | 41 +++
.../src/test/resources/fake_to_aerospike_sink.conf | 72 +++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
27 files changed, 1577 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/labeler/label-scope-conf.yml
b/.github/workflows/labeler/label-scope-conf.yml
index acb6fdc924..98e4f1e944 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -306,3 +306,8 @@ sls:
- changed-files:
- any-glob-to-any-file: seatunnel-connectors-v2/connector-sls/**
- all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(sls)/**'
+aerospike:
+ - all:
+ - changed-files:
+ - any-glob-to-any-file:
seatunnel-connectors-v2/connector-aerospike/**
+ - all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(aerospike)/**'
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
index 98eabc310c..43417049fa 100644
--- a/NOTICE
+++ b/NOTICE
@@ -102,4 +102,9 @@ The class
com.hazelcast.internal.util.ConcurrentReferenceHashMap contains code w
and updated within the WildFly project (https://github.com/wildfly/wildfly).
The class org.apache.calcite.linq4j.tree.ConstantExpression contains code
-originating from the Calcite project (https://github.com/apache/calcite).
\ No newline at end of file
+originating from the Calcite project (https://github.com/apache/calcite).
+
+Aerospike Sink Connector
+Copyright 2023 The original authors.
+Contains Aerospike Client Library (https://www.aerospike.com/)
+which is licensed under the AGPL 3.0 License
(https://www.aerospike.com/terms/download/3rd-party-licenses)
\ No newline at end of file
diff --git a/docs/en/connector-v2/changelog/connector-aerospike.md
b/docs/en/connector-v2/changelog/connector-aerospike.md
new file mode 100644
index 0000000000..4e65393fe9
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-aerospike.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|------------------------------------------| --- | --- |
+| [Improve][connector][aerospike] add support sink connector e2e doc dist
(#8821) |https://github.com/apache/seatunnel/pull/8821| dev |
+</details>
diff --git a/docs/en/connector-v2/sink/Aerospike.md
b/docs/en/connector-v2/sink/Aerospike.md
new file mode 100644
index 0000000000..fa81c96989
--- /dev/null
+++ b/docs/en/connector-v2/sink/Aerospike.md
@@ -0,0 +1,117 @@
+import ChangeLog from '../changelog/connector-aerospike.md';
+
+# Aerospike
+
+> Aerospike sink connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## License Compatibility Notice
+
+This connector depends on Aerospike Client Library which is licensed under
AGPL 3.0.
+When using this connector, you need to comply with AGPL 3.0 license terms.
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Sink connector for Aerospike database.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Maven
|
+|------------|-----------------|----------------------------------------------------------------------------------------|
+| Aerospike | 4.4.17+ |
[Download](https://mvnrepository.com/artifact/com.aerospike/aerospike-client) |
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Aerospike Data Type | Storage Format
|
+|---------------------|---------------------|--------------------------------------------------------------------------------|
+| STRING | STRING | Direct string storage
|
+| INT | INTEGER | 32-bit integer
|
+| BIGINT | LONG | 64-bit integer
|
+| DOUBLE | DOUBLE | 64-bit floating point
|
+| BOOLEAN | BOOLEAN | Stored as true/false values
|
+| ARRAY | BYTEARRAY | Only support byte array type
|
+| LIST | LIST | Support generic list types
|
+| DATE | LONG | Converted to epoch milliseconds
|
+| TIMESTAMP | LONG | Converted to epoch milliseconds
|
+
+Note:
+- When using ARRAY type, SeaTunnel's array elements must be byte type
+- LIST type supports any element types that can be serialized
+- DATE/TIMESTAMP conversion uses system default time zone
+
+## Options
+
+| Name | Type | Required | Default | Description
|
+|----------------|--------|----------|---------|-----------------------------------------------------------------------------|
+| host | string | Yes | - | Aerospike server hostname or
IP address |
+| port | int | No | 3000 | Aerospike server port
|
+| namespace | string | Yes | - | Namespace in Aerospike
|
+| set | string | Yes | - | Set name in Aerospike
|
+| username | string | No | - | Username for authentication
|
+| password | string | No | - | Password for authentication
|
+| key | string | Yes | - | Field name to use as
Aerospike primary key |
+| bin_name | string | No | - | Bin name for storing data
|
+| data_format | string | No | string | Data storage format:
map/string/kv |
+| write_timeout | int | No | 200 | Write operation timeout in
milliseconds |
+| schema.field | map | No | {} | Field type mappings (e.g.
{"name":"STRING","age":"INTEGER"}) |
+
+### data_format Options
+- **map**: Store data as JSON map
+- **string**: Store data as JSON string
+- **kv**: Store each field as separate bin
+
+## Task Example
+
+### Simple Example
+
+```hocon
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ address = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Aerospike {
+ host = "localhost"
+ port = 3000
+ namespace = "test_namespace"
+ set = "user_data"
+ key = "id"
+ data_format = "map"
+ write_timeout = 300
+ schema.field = {
+ id = "INTEGER"
+ name = "STRING"
+ age = "INTEGER"
+ address = "STRING"
+ }
+ }
+}
+```
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/changelog/connector-aerospike.md
b/docs/zh/connector-v2/changelog/connector-aerospike.md
new file mode 100644
index 0000000000..4e65393fe9
--- /dev/null
+++ b/docs/zh/connector-v2/changelog/connector-aerospike.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|------------------------------------------| --- | --- |
+| [Improve][connector][aerospike] add support sink connector e2e doc dist
(#8821) |https://github.com/apache/seatunnel/pull/8821| dev |
+</details>
diff --git a/docs/zh/connector-v2/sink/Aerospike.md
b/docs/zh/connector-v2/sink/Aerospike.md
new file mode 100644
index 0000000000..f730a8e8eb
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Aerospike.md
@@ -0,0 +1,117 @@
+import ChangeLog from '../changelog/connector-aerospike.md';
+
+# Aerospike
+
+> Aerospike 数据写入连接器
+
+## 许可证兼容性通知
+
+此连接器依赖于根据AGPL 3.0许可的Aerospike客户端库。
+使用此连接器时,您需要遵守AGPL 3.0许可条款。
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [CDC](../../concept/connector-v2-features.md)
+
+## 描述
+
+用于向 Aerospike 数据库写入数据的连接器。
+
+## 支持的数据源
+
+| 数据源 | 支持版本 | Maven 依赖
|
+|------------|---|-------------------------------------------------------------------------|
+| Aerospike | 4.4.17+ |
[下载](https://mvnrepository.com/artifact/com.aerospike/aerospike-client) |
+
+## 数据类型映射
+
+| SeaTunnel 数据类型 | Aerospike 数据类型 | 存储格式
|
+|----------------|--------------------|------------------------------------------------------------------------------|
+| STRING | STRING | 直接存储字符串
|
+| INT | INTEGER | 32位整型
|
+| BIGINT | LONG | 64位整型
|
+| DOUBLE | DOUBLE | 64位浮点数
|
+| BOOLEAN | BOOLEAN | 存储为 true/false 值
|
+| ARRAY | BYTEARRAY | 仅支持字节数组类型
|
+| LIST | LIST | 支持泛型列表类型
|
+| DATE | LONG | 转换为纪元时间毫秒数
|
+| TIMESTAMP | LONG | 转换为纪元时间毫秒数
|
+
+注意事项:
+- 使用ARRAY类型时,SeaTunnel数组元素必须是byte类型
+- LIST类型支持可序列化的任意元素类型
+- DATE/TIMESTAMP转换使用系统默认时区
+
+## 配置选项
+
+| 参数名称 | 类型 | 必填 | 默认值 | 说明
|
+|----------------|---------|------|---------|---------------------------------------------------------------------|
+| host | string | 是 | - | Aerospike 服务器主机名或IP地址
|
+| port | int | 否 | 3000 | Aerospike 服务器端口
|
+| namespace | string | 是 | - | Aerospike 命名空间
|
+| set | string | 是 | - | Aerospike 集合名称
|
+| username | string | 否 | - | 认证用户名
|
+| password | string | 否 | - | 认证密码
|
+| key | string | 是 | - | 用作 Aerospike 主键的字段名称
|
+| bin_name | string | 否 | - | 数据存储的 bin 名称
|
+| data_format | string | 否 | string | 数据存储格式:map/string/kv
|
+| write_timeout | int | 否 | 200 | 写入操作超时时间(毫秒)
|
+| schema.field | map | 否 | {} |
字段类型映射(示例:{"name":"STRING","age":"INTEGER"}) |
+
+### data_format 选项说明
+- **map**: 以JSON对象格式存储
+- **string**: 以JSON字符串格式存储
+- **kv**: 每个字段存储为独立的bin
+
+## 任务示例
+
+### 简单示例
+
+```hocon
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ age = "int"
+ address = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Aerospike {
+ host = "localhost"
+ port = 3000
+ namespace = "test_namespace"
+ set = "user_data"
+ key = "id"
+ data_format = "map"
+ write_timeout = 300
+ schema.field = {
+ id = "INTEGER"
+ name = "STRING"
+ age = "INTEGER"
+ address = "STRING"
+ }
+ }
+}
+```
+## Changelog
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 4450d23c31..71d56193e8 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -141,6 +141,7 @@ seatunnel.sink.Sls = connector-sls
seatunnel.source.Typesense = connector-typesense
seatunnel.sink.Typesense = connector-typesense
seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
+seatunnel.sink.Aerospike = connector-aerospike
seatunnel.transform.Sql = seatunnel-transforms-v2
seatunnel.transform.FieldMapper = seatunnel-transforms-v2
diff --git a/seatunnel-connectors-v2/connector-aerospike/pom.xml
b/seatunnel-connectors-v2/connector-aerospike/pom.xml
new file mode 100644
index 0000000000..5e1329765c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-aerospike/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-aerospike</artifactId>
+ <name>SeaTunnel : Connectors V2 : Aerospike</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aerospike</groupId>
+ <artifactId>aerospike-client</artifactId>
+ <version>4.4.17</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>2.0.33</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
new file mode 100644
index 0000000000..3880e19823
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+public enum AerospikeDataType {
+ STRING,
+ INTEGER,
+ LONG,
+ DOUBLE,
+ BOOLEAN,
+ BYTEARRAY,
+ LIST
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
new file mode 100644
index 0000000000..e4a37c5106
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+public class AerospikeSinkOptions {
+ private final String host;
+ private final int port;
+ private final String namespace;
+ private final String set;
+ private final String username;
+ private final String password;
+
+ public AerospikeSinkOptions(ReadonlyConfig config) {
+ this.host = config.get(HOST);
+ this.port = config.get(PORT);
+ this.namespace = config.get(NAMESPACE);
+ this.set = config.get(SET);
+ this.username = config.get(USERNAME);
+ this.password = config.get(PASSWORD);
+ }
+
+ public static final Option<String> HOST =
+
Options.key("host").stringType().noDefaultValue().withDescription("The
aerospike host");
+
+ public static final Option<Integer> PORT =
+
Options.key("port").intType().defaultValue(3000).withDescription("The aerospike
port");
+
+ public static final Option<String> NAMESPACE =
+ Options.key("namespace")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The aerospike namespace");
+
+ public static final Option<String> SET =
+ Options.key("set")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The aerospike set name");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The username for Aerospike");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The password for Aerospike");
+
+ public static final Option<String> KEY_FIELD =
+ Options.key("key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The field used as Aerospike key");
+
+ public static final Option<String> BIN_NAME =
+ Options.key("bin_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The bin name for storing data");
+
+ public static final Option<String> DATA_FORMAT =
+ Options.key("data_format")
+ .stringType()
+ .defaultValue("string")
+ .withDescription("Data format: map/string/kv");
+
+ public static final Option<Integer> WRITE_TIMEOUT =
+ Options.key("write_timeout")
+ .intType()
+ .defaultValue(200)
+ .withDescription("Write timeout in milliseconds");
+
+ public static final Option<Map<String, String>> FIELD_TYPES =
+ Options.key("schema.field")
+ .mapType()
+ .defaultValue(new HashMap<>())
+ .withDescription(
+ "Fields to be written with their Aerospike data
types. Example: \"schema\": {\n"
+ + " \"filed\": {\n"
+ + " \"name\": \"STRING\"\n"
+ + " }\n"
+ + " }");
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
new file mode 100644
index 0000000000..bc95bae873
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public enum DataFormatType implements Serializable {
+ MAP("map"),
+ STRING("string"),
+ KV("kv");
+
+ private final String format;
+
+ DataFormatType(String format) {
+ this.format = format;
+ }
+
+ public static DataFormatType fromString(String format) {
+ for (DataFormatType type : DataFormatType.values()) {
+ if (type.format.equalsIgnoreCase(format)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("Unknown format type: " + format);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
new file mode 100644
index 0000000000..711cf8317e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class AerospikeConnectorException extends SeaTunnelRuntimeException {
+ public AerospikeConnectorException(SeaTunnelErrorCode errorCode, String
errorMessage) {
+ super(errorCode, errorMessage);
+ }
+
+ public AerospikeConnectorException(
+ SeaTunnelErrorCode errorCode, String errorMessage, Throwable
cause) {
+ super(errorCode, errorMessage, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
new file mode 100644
index 0000000000..c24c3696dd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum AerospikeErrorCode implements SeaTunnelErrorCode {
+ UNSUPPORTED_DATA_TYPE("AEROSPIKE-01", "Unsupported data type"),
+ WRITER_OPERATION_FAILED("AEROSPIKE-02", "Writer operation failed"),
+ WRITER_CLOSE_FAILED("AEROSPIKE-03", "Writer close failed"),
+ CONNECTION_FAILED("AEROSPIKE-04", "Connection to Aerospike failed"),
+ INVALID_CONFIG("AEROSPIKE-05", "Invalid configuration");
+
+ private final String code;
+ private final String description;
+
+ AerospikeErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
new file mode 100644
index 0000000000..e3650052e2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+public class AerospikeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+ private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
+
+ public AerospikeSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
+ return new AerospikeSinkWriter(catalogTable.getSeaTunnelRowType(),
pluginConfig);
+ }
+
+ @Override
+ public String getPluginName() {
+ return "aerospike";
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
new file mode 100644
index 0000000000..cd3f68174c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AerospikeSinkFactory implements TableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "aerospike";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ AerospikeSinkOptions.HOST,
+ AerospikeSinkOptions.PORT,
+ AerospikeSinkOptions.NAMESPACE,
+ AerospikeSinkOptions.SET)
+ .optional(
+ AerospikeSinkOptions.USERNAME,
+ AerospikeSinkOptions.PASSWORD,
+ AerospikeSinkOptions.KEY_FIELD,
+ AerospikeSinkOptions.BIN_NAME,
+ AerospikeSinkOptions.DATA_FORMAT,
+ AerospikeSinkOptions.WRITE_TIMEOUT)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new AerospikeSink(context.getOptions(),
context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
new file mode 100644
index 0000000000..ce5df6bfc1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeDataType;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.DataFormatType;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Key;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.RecordExistsAction;
+import com.aerospike.client.policy.WritePolicy;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class AerospikeSinkWriter extends AbstractSinkWriter<SeaTunnelRow,
Void> {
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final ReadonlyConfig config;
+ private final SerializationSchema serializationSchema;
+ private final AerospikeClient aerospikeClient;
+ private final WritePolicy writePolicy;
+ private final AerospikeTypeConverter typeConverter;
+
+ public AerospikeSinkWriter(SeaTunnelRowType seaTunnelRowType,
ReadonlyConfig config) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.config = config;
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ this.aerospikeClient = buildClient();
+
+ this.writePolicy = new WritePolicy();
+ this.writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
+ this.writePolicy.totalTimeout =
config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+ this.writePolicy.socketTimeout =
config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+ this.writePolicy.sleepBetweenRetries = 0;
+ this.writePolicy.maxRetries = 0;
+ this.typeConverter = new AerospikeTypeConverter(seaTunnelRowType,
config);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ try {
+ String data = new String(serializationSchema.serialize(element));
+ String keyField = config.get(AerospikeSinkOptions.KEY_FIELD);
+ String key =
element.getField(seaTunnelRowType.indexOf(keyField)).toString();
+
+ Key aerospikeKey =
+ new Key(
+ config.get(AerospikeSinkOptions.NAMESPACE),
+ config.get(AerospikeSinkOptions.SET),
+ key);
+
+ String formatValue =
config.get(AerospikeSinkOptions.DATA_FORMAT).toLowerCase();
+ DataFormatType formatType = DataFormatType.fromString(formatValue);
+
+ switch (formatType) {
+ case MAP:
+ Map<String, Object> dataMap =
+ JSON.parseObject(data, new
TypeReference<Map<String, Object>>() {});
+ Map<String, Object> filteredMap = new HashMap<>();
+ for (String fieldName : typeConverter.getFieldNames()) {
+ filteredMap.put(fieldName, dataMap.get(fieldName));
+ }
+ Map<String, Object> convertedMap = new HashMap<>();
+ for (Map.Entry<String, Object> entry :
filteredMap.entrySet()) {
+ String fieldName = entry.getKey();
+ Object value = entry.getValue();
+ AerospikeDataType dataType =
typeConverter.getFieldType(fieldName);
+ convertedMap.put(fieldName, convertValue(value,
dataType));
+ }
+ Bin dataBin = new
Bin(config.get(AerospikeSinkOptions.BIN_NAME), convertedMap);
+ aerospikeClient.put(writePolicy, aerospikeKey, dataBin);
+ break;
+
+ case STRING:
+ Map<String, Object> filteredDataMap = new HashMap<>();
+ for (String fieldName : typeConverter.getFieldNames()) {
+ int index = seaTunnelRowType.indexOf(fieldName);
+ filteredDataMap.put(fieldName,
element.getField(index));
+ }
+ String filteredData = JSON.toJSONString(filteredDataMap);
+ Bin stringBin =
+ new Bin(config.get(AerospikeSinkOptions.BIN_NAME),
filteredData);
+ aerospikeClient.put(writePolicy, aerospikeKey, stringBin);
+ break;
+
+ case KV:
+ Map<String, Object> fieldsMap =
+ JSON.parseObject(data, new
TypeReference<Map<String, Object>>() {});
+ List<Bin> bins = new ArrayList<>();
+ Map<String, String> configFieldTypes =
+ config.get(AerospikeSinkOptions.FIELD_TYPES);
+ for (String fieldName : configFieldTypes.keySet()) {
+ Object value = fieldsMap.get(fieldName);
+ AerospikeDataType dataType =
typeConverter.getFieldType(fieldName);
+ Object convertedValue = convertValue(value, dataType);
+ bins.add(new Bin(fieldName, convertedValue));
+ }
+ aerospikeClient.put(writePolicy, aerospikeKey,
bins.toArray(new Bin[0]));
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported data format type: " + formatType);
+ }
+ } catch (Exception e) {
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.WRITER_OPERATION_FAILED, "Failed to
write record", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (Objects.nonNull(aerospikeClient)) {
+ aerospikeClient.close();
+ }
+ } catch (Exception e) {
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.WRITER_CLOSE_FAILED, "Failed to close
writer", e);
+ }
+ }
+
+ private AerospikeClient buildClient() {
+ ClientPolicy clientPolicy = new ClientPolicy();
+ clientPolicy.user = config.get(AerospikeSinkOptions.USERNAME);
+ clientPolicy.password = config.get(AerospikeSinkOptions.PASSWORD);
+ clientPolicy.timeout = config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+ clientPolicy.maxConnsPerNode = 300;
+
+ return new AerospikeClient(
+ clientPolicy,
+ config.get(AerospikeSinkOptions.HOST),
+ config.get(AerospikeSinkOptions.PORT));
+ }
+
+ private Object convertValue(Object value, AerospikeDataType dataType) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (dataType) {
+ case STRING:
+ return value.toString();
+ case INTEGER:
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ return Integer.parseInt(value.toString());
+ case LONG:
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else if (value instanceof TemporalAccessor) {
+ return convertTimestampToLong(value);
+ } else if (value instanceof String) {
+ Optional<Long> timestamp = tryParseDateTime((String)
value);
+ return timestamp.orElseGet(() -> Long.parseLong((String)
value));
+ } else {
+ return Long.parseLong(value.toString());
+ }
+ case DOUBLE:
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+ return Double.parseDouble(value.toString());
+ case BOOLEAN:
+ if (value instanceof Boolean) {
+ return value;
+ }
+ return Boolean.parseBoolean(value.toString());
+ case BYTEARRAY:
+ if (value.getClass().isArray()) {
+ return value;
+ }
+ throw new IllegalArgumentException(
+ "Expected Array type but got: " + value.getClass());
+ case LIST:
+ if (value instanceof Iterable) {
+ return value;
+ }
+ throw new IllegalArgumentException(
+ "Expected List type but got: " + value.getClass());
+ default:
+ throw new IllegalArgumentException("Unsupported AEROSPIKE data
type: " + dataType);
+ }
+ }
+
+ private long parseDateTimeString(String datetime) {
+ try {
+ return LocalDateTime.parse(datetime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ } catch (DateTimeParseException e) {
+ try {
+ return Instant.parse(datetime).toEpochMilli();
+ } catch (DateTimeParseException ex) {
+ throw new IllegalArgumentException("Unsupported datetime
format: " + datetime);
+ }
+ }
+ }
+
+ private Optional<Long> tryParseDateTime(String datetime) {
+ try {
+ return Optional.of(parseDateTimeString(datetime));
+ } catch (DateTimeParseException e) {
+ return Optional.empty();
+ }
+ }
+
+ private long convertTimestampToLong(Object timestamp) {
+ if (timestamp instanceof TemporalAccessor) {
+ Instant instant = Instant.from((TemporalAccessor) timestamp);
+ return instant.toEpochMilli();
+ }
+ throw new IllegalArgumentException("Unsupported timestamp type: " +
timestamp.getClass());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
new file mode 100644
index 0000000000..eb0c3476ac
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeDataType;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeErrorCode;
+
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AerospikeTypeConverter {
+
+ private final Map<String, AerospikeDataType> fieldTypeMapping;
+ @Getter private final List<String> fieldNames;
+
+ public AerospikeTypeConverter(SeaTunnelRowType rowType, ReadonlyConfig
config) {
+ this.fieldTypeMapping = new HashMap<>();
+ Map<String, String> configFieldTypes =
config.get(AerospikeSinkOptions.FIELD_TYPES);
+
+ if (configFieldTypes == null || configFieldTypes.isEmpty()) {
+ String[] allFields = rowType.getFieldNames();
+ this.fieldNames = Arrays.asList(allFields);
+ for (String field : allFields) {
+ int index = rowType.indexOf(field);
+ SeaTunnelDataType<?> seaTunnelType =
rowType.getFieldType(index);
+ fieldTypeMapping.put(field, mapSeaTunnelType(seaTunnelType));
+ }
+ } else {
+ this.fieldNames = new ArrayList<>(configFieldTypes.keySet());
+ for (String fieldName : configFieldTypes.keySet()) {
+ int index = rowType.indexOf(fieldName);
+ if (index == -1) {
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.INVALID_CONFIG,
+ "Field '" + fieldName + "' not found in source
data");
+ }
+ fieldTypeMapping.put(
+ fieldName,
AerospikeDataType.valueOf(configFieldTypes.get(fieldName)));
+ }
+ }
+ }
+
+ private AerospikeDataType mapSeaTunnelType(SeaTunnelDataType<?>
seaTunnelType) {
+ switch (seaTunnelType.getSqlType()) {
+ case STRING:
+ return AerospikeDataType.STRING;
+ case INT:
+ return AerospikeDataType.INTEGER;
+ case BIGINT:
+ return AerospikeDataType.LONG;
+ case DOUBLE:
+ return AerospikeDataType.DOUBLE;
+ case BOOLEAN:
+ return AerospikeDataType.BOOLEAN;
+ case ARRAY:
+ if (!(seaTunnelType instanceof ArrayType)) {
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Invalid ARRAY type: " +
seaTunnelType.getClass().getSimpleName());
+ }
+ return AerospikeDataType.BYTEARRAY;
+ case DATE:
+ case TIMESTAMP:
+ return AerospikeDataType.LONG;
+ default:
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported SeaTunnel type: " +
seaTunnelType.getSqlType());
+ }
+ }
+
+ public AerospikeDataType getFieldType(String fieldName) {
+ AerospikeDataType type = fieldTypeMapping.get(fieldName);
+ if (type == null) {
+ throw new AerospikeConnectorException(
+ AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+ "No type mapping for field: " + fieldName);
+ }
+ return type;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
b/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
new file mode 100644
index 0000000000..46b49bf084
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike;
+
+import
org.apache.seatunnel.connectors.seatunnel.aerospike.sink.AerospikeSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AerospikeFactoryTest {
+
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new AerospikeSinkFactory()).optionRule());
+ Assertions.assertNotNull((new AerospikeSinkFactory()).optionRule());
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index f686098f40..ecefdcc1cd 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -83,6 +83,7 @@
<module>connector-qdrant</module>
<module>connector-sls</module>
<module>connector-typesense</module>
+ <module>connector-aerospike</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 54ce25e1b1..d635d2ea4e 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -961,6 +961,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-aerospike</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop-aws</artifactId>
diff --git a/seatunnel-dist/release-docs/NOTICE
b/seatunnel-dist/release-docs/NOTICE
index a2c7980dcf..0884ac9429 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -823,4 +823,9 @@ Ocelli project by Netflix Inc.
(https://github.com/Netflix/ocelli/).
=========================================================================
-
+
+===============================================================================
+ + Third-party dependencies for Aerospike connector:
+
+===============================================================================
+ + com.aerospike:aerospike-client
(https://github.com/aerospike/aerospike-client-java)
+ + Copyright 2012-2023 Aerospike, Inc.
+ + Licensed under the Apache License, Version 2.0
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml
new file mode 100644
index 0000000000..813d95e6ae
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <artifactId>connector-aerospike-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Aerospike</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.aerospike</groupId>
+ <artifactId>aerospike-client</artifactId>
+ <version>6.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-aerospike</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
new file mode 100644
index 0000000000..93b0410f1d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Host;
+import com.aerospike.client.Key;
+import com.aerospike.client.Record;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.ScanPolicy;
+import com.aerospike.client.policy.WritePolicy;
+import com.alibaba.fastjson.JSON;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractAerospikeIT extends TestSuiteBase implements
TestResource {
+
+ protected static final String NAMESPACE = "test";
+ protected static final String SET_NAME = "seatunnel";
+ private static final int AEROSPIKE_PORT = 3000;
+ private static final String AEROSPIKE_HOST = "aerospike-host";
+
+ protected AerospikeClient client;
+ protected GenericContainer<?> container;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ container =
+ new GenericContainer<>(getDockerImage())
+ .withExposedPorts(3000, 3001, 3002, 3003)
+ .withNetworkAliases(AEROSPIKE_HOST)
+ .withNetwork(NETWORK)
+ .withEnv("AEROSPIKE_NAMESPACE", NAMESPACE)
+ .withEnv("AEROSPIKE_MEM_GB", "1")
+ .withEnv("AEROSPIKE_ACCESS_ADDRESS", AEROSPIKE_HOST)
+ .withEnv("AEROSPIKE_ALTERNATE_ACCESS_ADDRESS",
AEROSPIKE_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(getDockerImageName())))
+ .waitingFor(
+ Wait.forLogMessage(".*service ready:
soon.*\\n", 1)
+
.withStartupTimeout(Duration.ofMinutes(3)))
+ .withCreateContainerCmdModifier(cmd ->
cmd.withHostName(AEROSPIKE_HOST));
+
+ container.start();
+
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ ClientPolicy policy = new ClientPolicy();
+ policy.timeout = 30000;
+ policy.failIfNotConnected = true;
+ policy.readPolicyDefault.maxRetries = 10;
+ policy.writePolicyDefault.maxRetries = 10;
+
+ Host[] hosts =
+ new Host[] {new Host(container.getHost(),
container.getMappedPort(AEROSPIKE_PORT))};
+
+ client = new AerospikeClient(policy, hosts);
+
+ // Verify connection
+ if (!client.isConnected()) {
+ throw new IllegalStateException("Failed to connect to Aerospike
server");
+ }
+ }
+
+ private void insertTestData() {
+ WritePolicy writePolicy = new WritePolicy();
+ for (int i = 0; i < 100; i++) {
+ Key key = new Key(NAMESPACE, SET_NAME, "seed_" + i);
+ Bin bin1 = new Bin("id", i);
+ Bin bin2 = new Bin("data", "seed-data-" + i);
+ client.put(writePolicy, key, bin1, bin2);
+ }
+ }
+
+ @TestTemplate
+ public void testAerospikeSink(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
container.executeJob("/fake_to_aerospike_sink.conf");
+ validateSinkData();
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testWriteToAerospike(TestContainer container) throws Exception
{
+ final String testKey = "multi_type_key";
+ Key key = new Key(NAMESPACE, SET_NAME, testKey);
+ Map<String, Object> complexData =
+ new HashMap<String, Object>() {
+ {
+ put("string_val", "seatunnel_test");
+ put("int_val", 2023);
+ put("double_val", 3.1415926);
+ put("bool_val", true);
+ put("long_val", 10000000000L);
+ put("byte_val", new byte[] {0x01, 0x02});
+ final List<String> places = new ArrayList<>();
+ places.add("a");
+ put("array_val", places);
+ put(
+ "nested_map",
+ new HashMap<String, Object>() {
+ {
+ put("child_str", "nested_value");
+ put("child_int", 456);
+ }
+ });
+ }
+ };
+
+ Bin mainBin = new Bin("complex_data", complexData);
+ Bin extraBin1 = new Bin("reported", 20240601);
+ Bin extraBin2 = new Bin("version", "v2.3.1");
+
+ client.put(null, key, mainBin, extraBin1, extraBin2);
+
+ Record record = client.get(null, key);
+ Assertions.assertNotNull(record, "write records should not be empty");
+ Assertions.assertEquals(3, record.bins.size(), "failed to verify the
bin quantity");
+ }
+
+ @TestTemplate
+ public void testReadFromAerospike(TestContainer container) throws
Exception {
+ testWriteToAerospike(container);
+ final String testKey = "multi_type_key";
+ Key key = new Key(NAMESPACE, SET_NAME, testKey);
+
+ Record record = client.get(null, key);
+ Assertions.assertNotNull(record, "no data of the specified key was
queried");
+
+ Assertions.assertEquals(20240601, ((Number)
record.bins.get("reported")).intValue());
+ Assertions.assertEquals("v2.3.1", record.bins.get("version"));
+
+ Map<String, Object> data = (Map<String, Object>)
record.bins.get("complex_data");
+
+ Assertions.assertEquals("seatunnel_test", data.get("string_val"));
+ Assertions.assertEquals(2023, ((Number)
data.get("int_val")).intValue());
+ Assertions.assertEquals(3.1415926, (Double) data.get("double_val"),
0.0001);
+ Assertions.assertEquals(true, data.get("bool_val"));
+ Assertions.assertEquals(10000000000L, data.get("long_val"));
+
+ Assertions.assertArrayEquals(new byte[] {0x01, 0x02}, (byte[])
data.get("byte_val"));
+
+ List<String> array = (List<String>) data.get("array_val");
+ Assertions.assertEquals("a", array.get(0));
+
+ Map<String, Object> nested = (Map<String, Object>)
data.get("nested_map");
+ Assertions.assertEquals("nested_value", nested.get("child_str"));
+ Assertions.assertEquals(456, ((Number)
nested.get("child_int")).intValue());
+ }
+
+ @TestTemplate
+ public void testUpdateData(TestContainer container) throws Exception {
+ final String testKey = "update_test_key";
+ Map<String, Object> initialData = new HashMap<>();
+ initialData.put("version", 1L);
+ initialData.put("status", "active");
+ client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new
Bin("data", initialData));
+ Map<String, Object> updateData = new HashMap<>();
+ updateData.put("version", 2L);
+ updateData.put("status", "inactive");
+ updateData.put("modified_time", System.currentTimeMillis());
+ client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new
Bin("data", updateData));
+
+ Record record = client.get(null, new Key(NAMESPACE, SET_NAME,
testKey));
+ Assertions.assertEquals(updateData, record.bins.get("data"), "the data
update failed");
+ }
+
+ @TestTemplate
+ public void testQueryByKey(TestContainer container) throws Exception {
+ final int testKey = 1234;
+ Map<String, Object> testData = new HashMap<>();
+ testData.put("id", 1001L);
+ testData.put(
+ "nested",
+ new HashMap<String, Object>() {
+ {
+ put("field1", "value1");
+ put("field2", 3.14);
+ }
+ });
+ client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new
Bin("data", testData));
+
+ Record result = client.get(null, new Key(NAMESPACE, SET_NAME,
testKey));
+
+ Assertions.assertNotNull(result, "no data of the specified key was
queried");
+ Assertions.assertEquals(
+ testData, result.bins.get("data"), "the query result data is
inconsistent");
+
+ Map<String, Object> resultData = (Map<String, Object>)
result.bins.get("data");
+ Map<String, Object> nested = (Map<String, Object>)
resultData.get("nested");
+ Assertions.assertTrue(
+ nested.get("field2") instanceof Double, "nested field type is
incorrect");
+ }
+
+ @TestTemplate
+ public void testDeleteAll(TestContainer container) throws Exception {
+ final String tempSet = "temp_delete_set";
+
+ for (int i = 0; i < 5; i++) {
+ Key key = new Key(NAMESPACE, tempSet, "key_" + i);
+ client.put(null, key, new Bin("data", "test_value_" + i));
+ }
+
+ Assertions.assertDoesNotThrow(
+ () -> {
+ client.scanAll(
+ null,
+ NAMESPACE,
+ tempSet,
+ (key, record) -> {
+ client.delete(null, key);
+ });
+ },
+ "the delete operation throws an exception");
+
+ AtomicInteger count = new AtomicInteger();
+ client.scanAll(null, NAMESPACE, tempSet, (key, record) ->
count.incrementAndGet());
+ Assertions.assertEquals(0, count.get(), "data deletion is not
complete");
+ }
+
+ private void validateSinkData() {
+ ScanPolicy scanPolicy = new ScanPolicy();
+
+ client.scanAll(
+ scanPolicy,
+ NAMESPACE,
+ SET_NAME,
+ (key, record) -> {
+ System.out.println("key: " + key.toString());
+ System.out.println("record: " + JSON.toJSONString(record));
+ });
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (client != null) {
+ client.close();
+ }
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+ abstract DockerImageName getDockerImage();
+
+ abstract String getDockerImageName();
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
new file mode 100644
index 0000000000..cbbb9ce5d9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+import org.testcontainers.utility.DockerImageName;
+
+public class Aerospike6IT extends AbstractAerospikeIT {
+ @Override
+ DockerImageName getDockerImage() {
+ return DockerImageName.parse("aerospike/aerospike-server:latest");
+ }
+
+ @Override
+ String getDockerImageName() {
+ return "aerospike6-e2e";
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
new file mode 100644
index 0000000000..7108137ba1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+public class AerospikeContainerInfo {
+ private final String host;
+ private final int port;
+ private final String image;
+
+ public AerospikeContainerInfo(String host, int port, String image) {
+ this.host = host;
+ this.port = port;
+ this.image = image;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getImage() {
+ return image;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
new file mode 100644
index 0000000000..78a0e7f782
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = BATCH
+
+ #spark config
+ spark.app.name = SeaTunnel
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = 1g
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 9
+ string.fake.mode = "template"
+ string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen",
"gaojun"]
+ int.fake.mode = "template"
+ int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
+ double.fake.mode = "template"
+ double.template = [44.0, 45.0, 46.0, 47.0]
+ timestamp.fake.mode = "template"
+ timestamp.template = ["2022-01-01 00:00:00", "2022-01-01 00:00:01",
"2022-01-01 00:00:02", "2022-01-01 00:00:03"]
+ schema {
+ fields {
+ c_id = int
+ c_name = string
+ c_money = double
+ c_birth = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Aerospike {
+ bin_name = "data",
+ schema = {
+ field = {
+ c_id = INTEGER
+ c_name = STRING
+ c_money = DOUBLE
+ c_birth = LONG
+ }
+ },
+ username="",
+ password="",
+ set = "seatunnel",
+ port = 3000
+ data_format = "string",
+ host = "aerospike-host",
+ namespace = "test",
+ key = "c_id"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index f3008889df..adf120deef 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -83,6 +83,7 @@
<module>connector-email-e2e</module>
<module>connector-cdc-opengauss-e2e</module>
<module>connector-cdc-tidb-e2e</module>
+ <module>connector-aerospike-e2e</module>
</modules>
<dependencies>