This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 68ebf15cf6 [Feature][Connector-V2] Add aerospike sink connector (#8821)
68ebf15cf6 is described below

commit 68ebf15cf6eb643cab8e4b26097dcf18b429ca74
Author: zax <[email protected]>
AuthorDate: Tue Mar 25 15:03:33 2025 +0800

    [Feature][Connector-V2] Add aerospike sink connector (#8821)
---
 .github/workflows/labeler/label-scope-conf.yml     |   5 +
 NOTICE                                             |   7 +-
 .../connector-v2/changelog/connector-aerospike.md  |   6 +
 docs/en/connector-v2/sink/Aerospike.md             | 117 +++++++++
 .../connector-v2/changelog/connector-aerospike.md  |   6 +
 docs/zh/connector-v2/sink/Aerospike.md             | 117 +++++++++
 plugin-mapping.properties                          |   1 +
 .../connector-aerospike/pom.xml                    |  54 ++++
 .../aerospike/config/AerospikeDataType.java        |  28 ++
 .../aerospike/config/AerospikeSinkOptions.java     | 111 ++++++++
 .../seatunnel/aerospike/config/DataFormatType.java |  44 ++++
 .../exception/AerospikeConnectorException.java     |  32 +++
 .../aerospike/exception/AerospikeErrorCode.java    |  46 ++++
 .../seatunnel/aerospike/sink/AerospikeSink.java    |  45 ++++
 .../aerospike/sink/AerospikeSinkFactory.java       |  59 +++++
 .../aerospike/sink/AerospikeSinkWriter.java        | 255 ++++++++++++++++++
 .../aerospike/sink/AerospikeTypeConverter.java     | 107 ++++++++
 .../seatunnel/aerospike/AerospikeFactoryTest.java  |  32 +++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 seatunnel-dist/release-docs/NOTICE                 |   7 +-
 .../connector-aerospike-e2e/pom.xml                |  58 +++++
 .../connector/aerospike/AbstractAerospikeIT.java   | 289 +++++++++++++++++++++
 .../e2e/connector/aerospike/Aerospike6IT.java      |  31 +++
 .../aerospike/AerospikeContainerInfo.java          |  41 +++
 .../src/test/resources/fake_to_aerospike_sink.conf |  72 +++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 27 files changed, 1577 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml
index acb6fdc924..98e4f1e944 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -306,3 +306,8 @@ sls:
       - changed-files:
           - any-glob-to-any-file: seatunnel-connectors-v2/connector-sls/**
           - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(sls)/**'
+aerospike:
+  - all:
+      - changed-files:
+          - any-glob-to-any-file: 
seatunnel-connectors-v2/connector-aerospike/**
+          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(aerospike)/**'
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
index 98eabc310c..43417049fa 100644
--- a/NOTICE
+++ b/NOTICE
@@ -102,4 +102,9 @@ The class 
com.hazelcast.internal.util.ConcurrentReferenceHashMap contains code w
 and updated within the WildFly project (https://github.com/wildfly/wildfly).
 
 The class org.apache.calcite.linq4j.tree.ConstantExpression contains code
-originating from the Calcite project (https://github.com/apache/calcite).
\ No newline at end of file
+originating from the Calcite project (https://github.com/apache/calcite).
+
+Aerospike Sink Connector
+Copyright 2023 The original authors.
+Contains Aerospike Client Library (https://www.aerospike.com/)
+which is licensed under the AGPL 3.0 License 
(https://www.aerospike.com/terms/download/3rd-party-licenses)
\ No newline at end of file
diff --git a/docs/en/connector-v2/changelog/connector-aerospike.md 
b/docs/en/connector-v2/changelog/connector-aerospike.md
new file mode 100644
index 0000000000..4e65393fe9
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-aerospike.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change                                   | Commit | Version |
+|------------------------------------------| --- | --- |
+| [Improve][connector][aerospike] add support sink connector e2e doc dist 
(#8821) |https://github.com/apache/seatunnel/pull/8821| dev |
+</details>
diff --git a/docs/en/connector-v2/sink/Aerospike.md 
b/docs/en/connector-v2/sink/Aerospike.md
new file mode 100644
index 0000000000..fa81c96989
--- /dev/null
+++ b/docs/en/connector-v2/sink/Aerospike.md
@@ -0,0 +1,117 @@
+import ChangeLog from '../changelog/connector-aerospike.md';
+
+# Aerospike
+
+> Aerospike sink connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## License Compatibility Notice
+
+This connector depends on Aerospike Client Library which is licensed under 
AGPL 3.0.                                                                       
                                                                         
+When using this connector, you need to comply with AGPL 3.0 license terms.
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Sink connector for Aerospike database.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Maven                                      
                                            |
+|------------|-----------------|----------------------------------------------------------------------------------------|
+| Aerospike  | 4.4.17+               | 
[Download](https://mvnrepository.com/artifact/com.aerospike/aerospike-client) |
+
+## Data Type Mapping
+
+| SeaTunnel Data Type | Aerospike Data Type | Storage Format                   
                                              |
+|---------------------|---------------------|--------------------------------------------------------------------------------|
+| STRING              | STRING              | Direct string storage            
                                             |
+| INT                 | INTEGER             | 32-bit integer                   
                                             |
+| BIGINT              | LONG                | 64-bit integer                   
                                             |
+| DOUBLE              | DOUBLE              | 64-bit floating point            
                                             |
+| BOOLEAN             | BOOLEAN             | Stored as true/false values      
                                             |
+| ARRAY               | BYTEARRAY           | Only support byte array type     
                                             |
+| LIST                | LIST                | Support generic list types       
                                            |
+| DATE                | LONG                | Converted to epoch milliseconds  
                                            |
+| TIMESTAMP           | LONG                | Converted to epoch milliseconds  
                                            |
+
+Note:
+- When using ARRAY type, SeaTunnel's array elements must be byte type
+- LIST type supports any element types that can be serialized
+- DATE/TIMESTAMP conversion uses system default time zone
+
+## Options
+
+| Name           | Type   | Required | Default | Description                   
                                              |
+|----------------|--------|----------|---------|-----------------------------------------------------------------------------|
+| host           | string | Yes      | -       | Aerospike server hostname or 
IP address                                     |
+| port           | int    | No       | 3000    | Aerospike server port         
                                              |
+| namespace      | string | Yes      | -       | Namespace in Aerospike        
                                              |
+| set            | string | Yes      | -       | Set name in Aerospike         
                                              |
+| username       | string | No       | -       | Username for authentication   
                                             |
+| password       | string | No       | -       | Password for authentication   
                                             |
+| key            | string | Yes      | -       | Field name to use as 
Aerospike primary key                                 |
+| bin_name       | string | No       | -       | Bin name for storing data     
                                             |
+| data_format    | string | No       | string  | Data storage format: 
map/string/kv                                         |
+| write_timeout  | int    | No       | 200     | Write operation timeout in 
milliseconds                                    |
+| schema.field   | map    | No       | {}      | Field type mappings (e.g. 
{"name":"STRING","age":"INTEGER"})               |
+
+### data_format Options
+- **map**: Store data as JSON map
+- **string**: Store data as JSON string
+- **kv**: Store each field as separate bin
+
+## Task Example
+
+### Simple Example
+
+```hocon
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        address = "string"
+      }
+    }
+  }
+}
+
+sink {
+  Aerospike {
+    host = "localhost"
+    port = 3000
+    namespace = "test_namespace"
+    set = "user_data"
+    key = "id"
+    data_format = "map"
+    write_timeout = 300
+    schema.field = {
+      id = "INTEGER"
+      name = "STRING"
+      age = "INTEGER"
+      address = "STRING"
+    }
+  }
+}
+```
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/changelog/connector-aerospike.md 
b/docs/zh/connector-v2/changelog/connector-aerospike.md
new file mode 100644
index 0000000000..4e65393fe9
--- /dev/null
+++ b/docs/zh/connector-v2/changelog/connector-aerospike.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change                                   | Commit | Version |
+|------------------------------------------| --- | --- |
+| [Improve][connector][aerospike] add support sink connector e2e doc dist 
(#8821) |https://github.com/apache/seatunnel/pull/8821| dev |
+</details>
diff --git a/docs/zh/connector-v2/sink/Aerospike.md 
b/docs/zh/connector-v2/sink/Aerospike.md
new file mode 100644
index 0000000000..f730a8e8eb
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Aerospike.md
@@ -0,0 +1,117 @@
+import ChangeLog from '../changelog/connector-aerospike.md';
+
+# Aerospike
+
+> Aerospike 数据写入连接器
+
+## 许可证兼容性通知
+
+此连接器依赖于根据AGPL 3.0许可的Aerospike客户端库。
+使用此连接器时,您需要遵守AGPL 3.0许可条款。
+
+## 支持引擎
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [ ] [CDC](../../concept/connector-v2-features.md)
+
+## 描述
+
+用于向 Aerospike 数据库写入数据的连接器。
+
+## 支持的数据源
+
+|   数据源    | 支持版本 | Maven 依赖                                                   
           |
+|------------|---|-------------------------------------------------------------------------|
+| Aerospike  | 4.4.17+ | 
[下载](https://mvnrepository.com/artifact/com.aerospike/aerospike-client) |
+
+## 数据类型映射
+
+| SeaTunnel 数据类型 | Aerospike 数据类型 | 存储格式                                       
                                |
+|----------------|--------------------|------------------------------------------------------------------------------|
+| STRING         | STRING             | 直接存储字符串                                
                               |
+| INT            | INTEGER            | 32位整型                                  
                                   |
+| BIGINT         | LONG               | 64位整型                                  
                                   |
+| DOUBLE         | DOUBLE             | 64位浮点数                                 
                                  |
+| BOOLEAN        | BOOLEAN            | 存储为 true/false 值                       
                                  |
+| ARRAY          | BYTEARRAY          | 仅支持字节数组类型                              
                             |
+| LIST           | LIST               | 支持泛型列表类型                               
                              |
+| DATE           | LONG               | 转换为纪元时间毫秒数                             
                           |
+| TIMESTAMP      | LONG               | 转换为纪元时间毫秒数                             
                           |
+
+注意事项:
+- 使用ARRAY类型时,SeaTunnel数组元素必须是byte类型
+- LIST类型支持可序列化的任意元素类型
+- DATE/TIMESTAMP转换使用系统默认时区
+
+## 配置选项
+
+| 参数名称        | 类型    | 必填 | 默认值  | 说明                                         
                        |
+|----------------|---------|------|---------|---------------------------------------------------------------------|
+| host           | string  | 是   | -       | Aerospike 服务器主机名或IP地址             
                         |
+| port           | int     | 否   | 3000    | Aerospike 服务器端口                   
                             |
+| namespace      | string  | 是   | -       | Aerospike 命名空间                    
                              |
+| set            | string  | 是   | -       | Aerospike 集合名称                    
                              |
+| username       | string  | 否   | -       | 认证用户名                             
                             |
+| password       | string  | 否   | -       | 认证密码                              
                              |
+| key            | string  | 是   | -       | 用作 Aerospike 主键的字段名称              
                         |
+| bin_name       | string  | 否   | -       | 数据存储的 bin 名称                      
                           |
+| data_format    | string  | 否   | string  | 数据存储格式:map/string/kv              
                           |
+| write_timeout  | int     | 否   | 200     | 写入操作超时时间(毫秒)                      
                      |
+| schema.field   | map     | 否   | {}      | 
字段类型映射(示例:{"name":"STRING","age":"INTEGER"})             |
+
+### data_format 选项说明
+- **map**: 以JSON对象格式存储
+- **string**: 以JSON字符串格式存储
+- **kv**: 每个字段存储为独立的bin
+
+## 任务示例
+
+### 简单示例
+
+```hocon
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        address = "string"
+      }
+    }
+  }
+}
+
+sink {
+  Aerospike {
+    host = "localhost"
+    port = 3000
+    namespace = "test_namespace"
+    set = "user_data"
+    key = "id"
+    data_format = "map"
+    write_timeout = 300
+    schema.field = {
+      id = "INTEGER"
+      name = "STRING"
+      age = "INTEGER"
+      address = "STRING"
+    }
+  }
+}
+```
+## Changelog
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 4450d23c31..71d56193e8 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -141,6 +141,7 @@ seatunnel.sink.Sls = connector-sls
 seatunnel.source.Typesense = connector-typesense
 seatunnel.sink.Typesense = connector-typesense
 seatunnel.source.Opengauss-CDC = connector-cdc-opengauss
+seatunnel.sink.Aerospike = connector-aerospike
 
 seatunnel.transform.Sql = seatunnel-transforms-v2
 seatunnel.transform.FieldMapper = seatunnel-transforms-v2
diff --git a/seatunnel-connectors-v2/connector-aerospike/pom.xml 
b/seatunnel-connectors-v2/connector-aerospike/pom.xml
new file mode 100644
index 0000000000..5e1329765c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-aerospike/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-aerospike</artifactId>
+    <name>SeaTunnel : Connectors V2 : Aerospike</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aerospike</groupId>
+            <artifactId>aerospike-client</artifactId>
+            <version>4.4.17</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>2.0.33</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
new file mode 100644
index 0000000000..3880e19823
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeDataType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+public enum AerospikeDataType {
+    STRING,
+    INTEGER,
+    LONG,
+    DOUBLE,
+    BOOLEAN,
+    BYTEARRAY,
+    LIST
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
new file mode 100644
index 0000000000..e4a37c5106
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/AerospikeSinkOptions.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+public class AerospikeSinkOptions {
+    private final String host;
+    private final int port;
+    private final String namespace;
+    private final String set;
+    private final String username;
+    private final String password;
+
+    public AerospikeSinkOptions(ReadonlyConfig config) {
+        this.host = config.get(HOST);
+        this.port = config.get(PORT);
+        this.namespace = config.get(NAMESPACE);
+        this.set = config.get(SET);
+        this.username = config.get(USERNAME);
+        this.password = config.get(PASSWORD);
+    }
+
+    public static final Option<String> HOST =
+            
Options.key("host").stringType().noDefaultValue().withDescription("The 
aerospike host");
+
+    public static final Option<Integer> PORT =
+            
Options.key("port").intType().defaultValue(3000).withDescription("The aerospike 
port");
+
+    public static final Option<String> NAMESPACE =
+            Options.key("namespace")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The aerospike namespace");
+
+    public static final Option<String> SET =
+            Options.key("set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The aerospike set name");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The username for Aerospike");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The password for Aerospike");
+
+    public static final Option<String> KEY_FIELD =
+            Options.key("key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field used as Aerospike key");
+
+    public static final Option<String> BIN_NAME =
+            Options.key("bin_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The bin name for storing data");
+
+    public static final Option<String> DATA_FORMAT =
+            Options.key("data_format")
+                    .stringType()
+                    .defaultValue("string")
+                    .withDescription("Data format: map/string/kv");
+
+    public static final Option<Integer> WRITE_TIMEOUT =
+            Options.key("write_timeout")
+                    .intType()
+                    .defaultValue(200)
+                    .withDescription("Write timeout in milliseconds");
+
+    public static final Option<Map<String, String>> FIELD_TYPES =
+            Options.key("schema.field")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription(
+                            "Fields to be written with their Aerospike data 
types. Example:  \"schema\": {\n"
+                                    + "        \"filed\": {\n"
+                                    + "          \"name\": \"STRING\"\n"
+                                    + "        }\n"
+                                    + "      }");
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
new file mode 100644
index 0000000000..bc95bae873
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/config/DataFormatType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.config;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Getter
+public enum DataFormatType implements Serializable {
+    MAP("map"),
+    STRING("string"),
+    KV("kv");
+
+    private final String format;
+
+    DataFormatType(String format) {
+        this.format = format;
+    }
+
+    public static DataFormatType fromString(String format) {
+        for (DataFormatType type : DataFormatType.values()) {
+            if (type.format.equalsIgnoreCase(format)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException("Unknown format type: " + format);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
new file mode 100644
index 0000000000..711cf8317e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class AerospikeConnectorException extends SeaTunnelRuntimeException {
+    public AerospikeConnectorException(SeaTunnelErrorCode errorCode, String 
errorMessage) {
+        super(errorCode, errorMessage);
+    }
+
+    public AerospikeConnectorException(
+            SeaTunnelErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
new file mode 100644
index 0000000000..c24c3696dd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/exception/AerospikeErrorCode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum AerospikeErrorCode implements SeaTunnelErrorCode {
+    UNSUPPORTED_DATA_TYPE("AEROSPIKE-01", "Unsupported data type"),
+    WRITER_OPERATION_FAILED("AEROSPIKE-02", "Writer operation failed"),
+    WRITER_CLOSE_FAILED("AEROSPIKE-03", "Writer close failed"),
+    CONNECTION_FAILED("AEROSPIKE-04", "Connection to Aerospike failed"),
+    INVALID_CONFIG("AEROSPIKE-05", "Invalid configuration");
+
+    private final String code;
+    private final String description;
+
+    AerospikeErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
new file mode 100644
index 0000000000..e3650052e2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSink.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+public class AerospikeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
+
+    public AerospikeSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
+        this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+        return new AerospikeSinkWriter(catalogTable.getSeaTunnelRowType(), 
pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "aerospike";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
new file mode 100644
index 0000000000..cd3f68174c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class AerospikeSinkFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "aerospike";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        AerospikeSinkOptions.HOST,
+                        AerospikeSinkOptions.PORT,
+                        AerospikeSinkOptions.NAMESPACE,
+                        AerospikeSinkOptions.SET)
+                .optional(
+                        AerospikeSinkOptions.USERNAME,
+                        AerospikeSinkOptions.PASSWORD,
+                        AerospikeSinkOptions.KEY_FIELD,
+                        AerospikeSinkOptions.BIN_NAME,
+                        AerospikeSinkOptions.DATA_FORMAT,
+                        AerospikeSinkOptions.WRITE_TIMEOUT)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new AerospikeSink(context.getOptions(), 
context.getCatalogTable());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
new file mode 100644
index 0000000000..ce5df6bfc1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeSinkWriter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.DataFormatType;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Key;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.RecordExistsAction;
+import com.aerospike.client.policy.WritePolicy;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class AerospikeSinkWriter extends AbstractSinkWriter<SeaTunnelRow, 
Void> {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final ReadonlyConfig config;
+    private final SerializationSchema serializationSchema;
+    private final AerospikeClient aerospikeClient;
+    private final WritePolicy writePolicy;
+    private final AerospikeTypeConverter typeConverter;
+
+    public AerospikeSinkWriter(SeaTunnelRowType seaTunnelRowType, 
ReadonlyConfig config) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.config = config;
+        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+        this.aerospikeClient = buildClient();
+
+        this.writePolicy = new WritePolicy();
+        this.writePolicy.recordExistsAction = RecordExistsAction.UPDATE;
+        this.writePolicy.totalTimeout = 
config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+        this.writePolicy.socketTimeout = 
config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+        this.writePolicy.sleepBetweenRetries = 0;
+        this.writePolicy.maxRetries = 0;
+        this.typeConverter = new AerospikeTypeConverter(seaTunnelRowType, 
config);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        try {
+            String data = new String(serializationSchema.serialize(element));
+            String keyField = config.get(AerospikeSinkOptions.KEY_FIELD);
+            String key = 
element.getField(seaTunnelRowType.indexOf(keyField)).toString();
+
+            Key aerospikeKey =
+                    new Key(
+                            config.get(AerospikeSinkOptions.NAMESPACE),
+                            config.get(AerospikeSinkOptions.SET),
+                            key);
+
+            String formatValue = 
config.get(AerospikeSinkOptions.DATA_FORMAT).toLowerCase();
+            DataFormatType formatType = DataFormatType.fromString(formatValue);
+
+            switch (formatType) {
+                case MAP:
+                    Map<String, Object> dataMap =
+                            JSON.parseObject(data, new 
TypeReference<Map<String, Object>>() {});
+                    Map<String, Object> filteredMap = new HashMap<>();
+                    for (String fieldName : typeConverter.getFieldNames()) {
+                        filteredMap.put(fieldName, dataMap.get(fieldName));
+                    }
+                    Map<String, Object> convertedMap = new HashMap<>();
+                    for (Map.Entry<String, Object> entry : 
filteredMap.entrySet()) {
+                        String fieldName = entry.getKey();
+                        Object value = entry.getValue();
+                        AerospikeDataType dataType = 
typeConverter.getFieldType(fieldName);
+                        convertedMap.put(fieldName, convertValue(value, 
dataType));
+                    }
+                    Bin dataBin = new 
Bin(config.get(AerospikeSinkOptions.BIN_NAME), convertedMap);
+                    aerospikeClient.put(writePolicy, aerospikeKey, dataBin);
+                    break;
+
+                case STRING:
+                    Map<String, Object> filteredDataMap = new HashMap<>();
+                    for (String fieldName : typeConverter.getFieldNames()) {
+                        int index = seaTunnelRowType.indexOf(fieldName);
+                        filteredDataMap.put(fieldName, 
element.getField(index));
+                    }
+                    String filteredData = JSON.toJSONString(filteredDataMap);
+                    Bin stringBin =
+                            new Bin(config.get(AerospikeSinkOptions.BIN_NAME), 
filteredData);
+                    aerospikeClient.put(writePolicy, aerospikeKey, stringBin);
+                    break;
+
+                case KV:
+                    Map<String, Object> fieldsMap =
+                            JSON.parseObject(data, new 
TypeReference<Map<String, Object>>() {});
+                    List<Bin> bins = new ArrayList<>();
+                    Map<String, String> configFieldTypes =
+                            config.get(AerospikeSinkOptions.FIELD_TYPES);
+                    for (String fieldName : configFieldTypes.keySet()) {
+                        Object value = fieldsMap.get(fieldName);
+                        AerospikeDataType dataType = 
typeConverter.getFieldType(fieldName);
+                        Object convertedValue = convertValue(value, dataType);
+                        bins.add(new Bin(fieldName, convertedValue));
+                    }
+                    aerospikeClient.put(writePolicy, aerospikeKey, 
bins.toArray(new Bin[0]));
+                    break;
+
+                default:
+                    throw new IllegalArgumentException(
+                            "Unsupported data format type: " + formatType);
+            }
+        } catch (Exception e) {
+            throw new AerospikeConnectorException(
+                    AerospikeErrorCode.WRITER_OPERATION_FAILED, "Failed to 
write record", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (Objects.nonNull(aerospikeClient)) {
+                aerospikeClient.close();
+            }
+        } catch (Exception e) {
+            throw new AerospikeConnectorException(
+                    AerospikeErrorCode.WRITER_CLOSE_FAILED, "Failed to close 
writer", e);
+        }
+    }
+
+    private AerospikeClient buildClient() {
+        ClientPolicy clientPolicy = new ClientPolicy();
+        clientPolicy.user = config.get(AerospikeSinkOptions.USERNAME);
+        clientPolicy.password = config.get(AerospikeSinkOptions.PASSWORD);
+        clientPolicy.timeout = config.get(AerospikeSinkOptions.WRITE_TIMEOUT);
+        clientPolicy.maxConnsPerNode = 300;
+
+        return new AerospikeClient(
+                clientPolicy,
+                config.get(AerospikeSinkOptions.HOST),
+                config.get(AerospikeSinkOptions.PORT));
+    }
+
+    private Object convertValue(Object value, AerospikeDataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        switch (dataType) {
+            case STRING:
+                return value.toString();
+            case INTEGER:
+                if (value instanceof Number) {
+                    return ((Number) value).intValue();
+                }
+                return Integer.parseInt(value.toString());
+            case LONG:
+                if (value instanceof Number) {
+                    return ((Number) value).longValue();
+                } else if (value instanceof TemporalAccessor) {
+                    return convertTimestampToLong(value);
+                } else if (value instanceof String) {
+                    Optional<Long> timestamp = tryParseDateTime((String) 
value);
+                    return timestamp.orElseGet(() -> Long.parseLong((String) 
value));
+                } else {
+                    return Long.parseLong(value.toString());
+                }
+            case DOUBLE:
+                if (value instanceof Number) {
+                    return ((Number) value).doubleValue();
+                }
+                return Double.parseDouble(value.toString());
+            case BOOLEAN:
+                if (value instanceof Boolean) {
+                    return value;
+                }
+                return Boolean.parseBoolean(value.toString());
+            case BYTEARRAY:
+                if (value.getClass().isArray()) {
+                    return value;
+                }
+                throw new IllegalArgumentException(
+                        "Expected Array type but got: " + value.getClass());
+            case LIST:
+                if (value instanceof Iterable) {
+                    return value;
+                }
+                throw new IllegalArgumentException(
+                        "Expected List type but got: " + value.getClass());
+            default:
+                throw new IllegalArgumentException("Unsupported AEROSPIKE data 
type: " + dataType);
+        }
+    }
+
+    private long parseDateTimeString(String datetime) {
+        try {
+            return LocalDateTime.parse(datetime)
+                    .atZone(ZoneId.systemDefault())
+                    .toInstant()
+                    .toEpochMilli();
+        } catch (DateTimeParseException e) {
+            try {
+                return Instant.parse(datetime).toEpochMilli();
+            } catch (DateTimeParseException ex) {
+                throw new IllegalArgumentException("Unsupported datetime 
format: " + datetime);
+            }
+        }
+    }
+
+    private Optional<Long> tryParseDateTime(String datetime) {
+        try {
+            return Optional.of(parseDateTimeString(datetime));
+        } catch (DateTimeParseException e) {
+            return Optional.empty();
+        }
+    }
+
+    private long convertTimestampToLong(Object timestamp) {
+        if (timestamp instanceof TemporalAccessor) {
+            Instant instant = Instant.from((TemporalAccessor) timestamp);
+            return instant.toEpochMilli();
+        }
+        throw new IllegalArgumentException("Unsupported timestamp type: " + 
timestamp.getClass());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
new file mode 100644
index 0000000000..eb0c3476ac
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/main/java/org/apache/seatunnel/connectors/seatunnel/aerospike/sink/AerospikeTypeConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.config.AerospikeSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.exception.AerospikeErrorCode;
+
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AerospikeTypeConverter {
+
+    private final Map<String, AerospikeDataType> fieldTypeMapping;
+    @Getter private final List<String> fieldNames;
+
+    public AerospikeTypeConverter(SeaTunnelRowType rowType, ReadonlyConfig 
config) {
+        this.fieldTypeMapping = new HashMap<>();
+        Map<String, String> configFieldTypes = 
config.get(AerospikeSinkOptions.FIELD_TYPES);
+
+        if (configFieldTypes == null || configFieldTypes.isEmpty()) {
+            String[] allFields = rowType.getFieldNames();
+            this.fieldNames = Arrays.asList(allFields);
+            for (String field : allFields) {
+                int index = rowType.indexOf(field);
+                SeaTunnelDataType<?> seaTunnelType = 
rowType.getFieldType(index);
+                fieldTypeMapping.put(field, mapSeaTunnelType(seaTunnelType));
+            }
+        } else {
+            this.fieldNames = new ArrayList<>(configFieldTypes.keySet());
+            for (String fieldName : configFieldTypes.keySet()) {
+                int index = rowType.indexOf(fieldName);
+                if (index == -1) {
+                    throw new AerospikeConnectorException(
+                            AerospikeErrorCode.INVALID_CONFIG,
+                            "Field '" + fieldName + "' not found in source 
data");
+                }
+                fieldTypeMapping.put(
+                        fieldName, 
AerospikeDataType.valueOf(configFieldTypes.get(fieldName)));
+            }
+        }
+    }
+
+    private AerospikeDataType mapSeaTunnelType(SeaTunnelDataType<?> 
seaTunnelType) {
+        switch (seaTunnelType.getSqlType()) {
+            case STRING:
+                return AerospikeDataType.STRING;
+            case INT:
+                return AerospikeDataType.INTEGER;
+            case BIGINT:
+                return AerospikeDataType.LONG;
+            case DOUBLE:
+                return AerospikeDataType.DOUBLE;
+            case BOOLEAN:
+                return AerospikeDataType.BOOLEAN;
+            case ARRAY:
+                if (!(seaTunnelType instanceof ArrayType)) {
+                    throw new AerospikeConnectorException(
+                            AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Invalid ARRAY type: " + 
seaTunnelType.getClass().getSimpleName());
+                }
+                return AerospikeDataType.BYTEARRAY;
+            case DATE:
+            case TIMESTAMP:
+                return AerospikeDataType.LONG;
+            default:
+                throw new AerospikeConnectorException(
+                        AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported SeaTunnel type: " + 
seaTunnelType.getSqlType());
+        }
+    }
+
+    public AerospikeDataType getFieldType(String fieldName) {
+        AerospikeDataType type = fieldTypeMapping.get(fieldName);
+        if (type == null) {
+            throw new AerospikeConnectorException(
+                    AerospikeErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "No type mapping for field: " + fieldName);
+        }
+        return type;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
 
b/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
new file mode 100644
index 0000000000..46b49bf084
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-aerospike/src/test/java/org/apache/seatunnel/connectors/seatunnel/aerospike/AerospikeFactoryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.aerospike;
+
+import 
org.apache.seatunnel.connectors.seatunnel.aerospike.sink.AerospikeSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AerospikeFactoryTest {
+
+    @Test
+    void optionRule() {
+        Assertions.assertNotNull((new AerospikeSinkFactory()).optionRule());
+        Assertions.assertNotNull((new AerospikeSinkFactory()).optionRule());
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index f686098f40..ecefdcc1cd 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -83,6 +83,7 @@
         <module>connector-qdrant</module>
         <module>connector-sls</module>
         <module>connector-typesense</module>
+        <module>connector-aerospike</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 54ce25e1b1..d635d2ea4e 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -961,6 +961,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-aerospike</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>seatunnel-hadoop-aws</artifactId>
diff --git a/seatunnel-dist/release-docs/NOTICE 
b/seatunnel-dist/release-docs/NOTICE
index a2c7980dcf..0884ac9429 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -823,4 +823,9 @@ Ocelli project by Netflix Inc. 
(https://github.com/Netflix/ocelli/).
 
 =========================================================================
 
-
+ 
+===============================================================================
+ + Third-party dependencies for Aerospike connector:
+ 
+===============================================================================
+ + com.aerospike:aerospike-client 
(https://github.com/aerospike/aerospike-client-java)
+ + Copyright 2012-2023 Aerospike, Inc.
+ + Licensed under the Apache License, Version 2.0
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml
new file mode 100644
index 0000000000..813d95e6ae
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <artifactId>connector-aerospike-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Aerospike</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.aerospike</groupId>
+            <artifactId>aerospike-client</artifactId>
+            <version>6.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-aerospike</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
new file mode 100644
index 0000000000..93b0410f1d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AbstractAerospikeIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Host;
+import com.aerospike.client.Key;
+import com.aerospike.client.Record;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.ScanPolicy;
+import com.aerospike.client.policy.WritePolicy;
+import com.alibaba.fastjson.JSON;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractAerospikeIT extends TestSuiteBase implements 
TestResource {
+
+    protected static final String NAMESPACE = "test";
+    protected static final String SET_NAME = "seatunnel";
+    private static final int AEROSPIKE_PORT = 3000;
+    private static final String AEROSPIKE_HOST = "aerospike-host";
+
+    protected AerospikeClient client;
+    protected GenericContainer<?> container;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        container =
+                new GenericContainer<>(getDockerImage())
+                        .withExposedPorts(3000, 3001, 3002, 3003)
+                        .withNetworkAliases(AEROSPIKE_HOST)
+                        .withNetwork(NETWORK)
+                        .withEnv("AEROSPIKE_NAMESPACE", NAMESPACE)
+                        .withEnv("AEROSPIKE_MEM_GB", "1")
+                        .withEnv("AEROSPIKE_ACCESS_ADDRESS", AEROSPIKE_HOST)
+                        .withEnv("AEROSPIKE_ALTERNATE_ACCESS_ADDRESS", 
AEROSPIKE_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(getDockerImageName())))
+                        .waitingFor(
+                                Wait.forLogMessage(".*service ready: 
soon.*\\n", 1)
+                                        
.withStartupTimeout(Duration.ofMinutes(3)))
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withHostName(AEROSPIKE_HOST));
+
+        container.start();
+
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        ClientPolicy policy = new ClientPolicy();
+        policy.timeout = 30000;
+        policy.failIfNotConnected = true;
+        policy.readPolicyDefault.maxRetries = 10;
+        policy.writePolicyDefault.maxRetries = 10;
+
+        Host[] hosts =
+                new Host[] {new Host(container.getHost(), 
container.getMappedPort(AEROSPIKE_PORT))};
+
+        client = new AerospikeClient(policy, hosts);
+
+        // Verify connection
+        if (!client.isConnected()) {
+            throw new IllegalStateException("Failed to connect to Aerospike 
server");
+        }
+    }
+
+    private void insertTestData() {
+        WritePolicy writePolicy = new WritePolicy();
+        for (int i = 0; i < 100; i++) {
+            Key key = new Key(NAMESPACE, SET_NAME, "seed_" + i);
+            Bin bin1 = new Bin("id", i);
+            Bin bin2 = new Bin("data", "seed-data-" + i);
+            client.put(writePolicy, key, bin1, bin2);
+        }
+    }
+
+    @TestTemplate
+    public void testAerospikeSink(TestContainer container) throws Exception {
+        Container.ExecResult execResult = 
container.executeJob("/fake_to_aerospike_sink.conf");
+        validateSinkData();
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testWriteToAerospike(TestContainer container) throws Exception 
{
+        final String testKey = "multi_type_key";
+        Key key = new Key(NAMESPACE, SET_NAME, testKey);
+        Map<String, Object> complexData =
+                new HashMap<String, Object>() {
+                    {
+                        put("string_val", "seatunnel_test");
+                        put("int_val", 2023);
+                        put("double_val", 3.1415926);
+                        put("bool_val", true);
+                        put("long_val", 10000000000L);
+                        put("byte_val", new byte[] {0x01, 0x02});
+                        final List<String> places = new ArrayList<>();
+                        places.add("a");
+                        put("array_val", places);
+                        put(
+                                "nested_map",
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("child_str", "nested_value");
+                                        put("child_int", 456);
+                                    }
+                                });
+                    }
+                };
+
+        Bin mainBin = new Bin("complex_data", complexData);
+        Bin extraBin1 = new Bin("reported", 20240601);
+        Bin extraBin2 = new Bin("version", "v2.3.1");
+
+        client.put(null, key, mainBin, extraBin1, extraBin2);
+
+        Record record = client.get(null, key);
+        Assertions.assertNotNull(record, "write records should not be empty");
+        Assertions.assertEquals(3, record.bins.size(), "failed to verify the 
bin quantity");
+    }
+
+    @TestTemplate
+    public void testReadFromAerospike(TestContainer container) throws 
Exception {
+        testWriteToAerospike(container);
+        final String testKey = "multi_type_key";
+        Key key = new Key(NAMESPACE, SET_NAME, testKey);
+
+        Record record = client.get(null, key);
+        Assertions.assertNotNull(record, "no data of the specified key was 
queried");
+
+        Assertions.assertEquals(20240601, ((Number) 
record.bins.get("reported")).intValue());
+        Assertions.assertEquals("v2.3.1", record.bins.get("version"));
+
+        Map<String, Object> data = (Map<String, Object>) 
record.bins.get("complex_data");
+
+        Assertions.assertEquals("seatunnel_test", data.get("string_val"));
+        Assertions.assertEquals(2023, ((Number) 
data.get("int_val")).intValue());
+        Assertions.assertEquals(3.1415926, (Double) data.get("double_val"), 
0.0001);
+        Assertions.assertEquals(true, data.get("bool_val"));
+        Assertions.assertEquals(10000000000L, data.get("long_val"));
+
+        Assertions.assertArrayEquals(new byte[] {0x01, 0x02}, (byte[]) 
data.get("byte_val"));
+
+        List<String> array = (List<String>) data.get("array_val");
+        Assertions.assertEquals("a", array.get(0));
+
+        Map<String, Object> nested = (Map<String, Object>) 
data.get("nested_map");
+        Assertions.assertEquals("nested_value", nested.get("child_str"));
+        Assertions.assertEquals(456, ((Number) 
nested.get("child_int")).intValue());
+    }
+
+    @TestTemplate
+    public void testUpdateData(TestContainer container) throws Exception {
+        final String testKey = "update_test_key";
+        Map<String, Object> initialData = new HashMap<>();
+        initialData.put("version", 1L);
+        initialData.put("status", "active");
+        client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new 
Bin("data", initialData));
+        Map<String, Object> updateData = new HashMap<>();
+        updateData.put("version", 2L);
+        updateData.put("status", "inactive");
+        updateData.put("modified_time", System.currentTimeMillis());
+        client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new 
Bin("data", updateData));
+
+        Record record = client.get(null, new Key(NAMESPACE, SET_NAME, 
testKey));
+        Assertions.assertEquals(updateData, record.bins.get("data"), "the data 
update failed");
+    }
+
+    @TestTemplate
+    public void testQueryByKey(TestContainer container) throws Exception {
+        final int testKey = 1234;
+        Map<String, Object> testData = new HashMap<>();
+        testData.put("id", 1001L);
+        testData.put(
+                "nested",
+                new HashMap<String, Object>() {
+                    {
+                        put("field1", "value1");
+                        put("field2", 3.14);
+                    }
+                });
+        client.put(null, new Key(NAMESPACE, SET_NAME, testKey), new 
Bin("data", testData));
+
+        Record result = client.get(null, new Key(NAMESPACE, SET_NAME, 
testKey));
+
+        Assertions.assertNotNull(result, "no data of the specified key was 
queried");
+        Assertions.assertEquals(
+                testData, result.bins.get("data"), "the query result data is 
inconsistent");
+
+        Map<String, Object> resultData = (Map<String, Object>) 
result.bins.get("data");
+        Map<String, Object> nested = (Map<String, Object>) 
resultData.get("nested");
+        Assertions.assertTrue(
+                nested.get("field2") instanceof Double, "nested field type is 
incorrect");
+    }
+
+    @TestTemplate
+    public void testDeleteAll(TestContainer container) throws Exception {
+        final String tempSet = "temp_delete_set";
+
+        for (int i = 0; i < 5; i++) {
+            Key key = new Key(NAMESPACE, tempSet, "key_" + i);
+            client.put(null, key, new Bin("data", "test_value_" + i));
+        }
+
+        Assertions.assertDoesNotThrow(
+                () -> {
+                    client.scanAll(
+                            null,
+                            NAMESPACE,
+                            tempSet,
+                            (key, record) -> {
+                                client.delete(null, key);
+                            });
+                },
+                "the delete operation throws an exception");
+
+        AtomicInteger count = new AtomicInteger();
+        client.scanAll(null, NAMESPACE, tempSet, (key, record) -> 
count.incrementAndGet());
+        Assertions.assertEquals(0, count.get(), "data deletion is not 
complete");
+    }
+
+    private void validateSinkData() {
+        ScanPolicy scanPolicy = new ScanPolicy();
+
+        client.scanAll(
+                scanPolicy,
+                NAMESPACE,
+                SET_NAME,
+                (key, record) -> {
+                    System.out.println("key: " + key.toString());
+                    System.out.println("record: " + JSON.toJSONString(record));
+                });
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (client != null) {
+            client.close();
+        }
+        if (container != null) {
+            container.stop();
+        }
+    }
+
+    abstract DockerImageName getDockerImage();
+
+    abstract String getDockerImageName();
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
new file mode 100644
index 0000000000..cbbb9ce5d9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/Aerospike6IT.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+import org.testcontainers.utility.DockerImageName;
+
+public class Aerospike6IT extends AbstractAerospikeIT {
+    @Override
+    DockerImageName getDockerImage() {
+        return DockerImageName.parse("aerospike/aerospike-server:latest");
+    }
+
+    @Override
+    String getDockerImageName() {
+        return "aerospike6-e2e";
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
new file mode 100644
index 0000000000..7108137ba1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/java/org/apache/seatunnel/e2e/connector/aerospike/AerospikeContainerInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.aerospike;
+
+public class AerospikeContainerInfo {
+    private final String host;
+    private final int port;
+    private final String image;
+
+    public AerospikeContainerInfo(String host, int port, String image) {
+        this.host = host;
+        this.port = port;
+        this.image = image;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getImage() {
+        return image;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
new file mode 100644
index 0000000000..78a0e7f782
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-aerospike-e2e/src/test/resources/fake_to_aerospike_sink.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = BATCH
+
+  #spark config
+  spark.app.name = SeaTunnel
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = 1g
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 9
+    string.fake.mode = "template"
+    string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", 
"gaojun"]
+    int.fake.mode = "template"
+    int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
+    double.fake.mode = "template"
+    double.template = [44.0, 45.0, 46.0, 47.0]
+    timestamp.fake.mode = "template"
+    timestamp.template = ["2022-01-01 00:00:00", "2022-01-01 00:00:01", 
"2022-01-01 00:00:02", "2022-01-01 00:00:03"]
+    schema {
+      fields {
+        c_id = int
+        c_name = string
+        c_money = double
+        c_birth = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Aerospike {
+      bin_name = "data",
+      schema = {
+        field = {
+            c_id = INTEGER
+            c_name = STRING
+            c_money = DOUBLE
+            c_birth = LONG
+        }
+      },
+      username="",
+      password="",
+      set = "seatunnel",
+      port = 3000
+      data_format = "string",
+      host = "aerospike-host",
+      namespace = "test",
+      key = "c_id"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index f3008889df..adf120deef 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -83,6 +83,7 @@
         <module>connector-email-e2e</module>
         <module>connector-cdc-opengauss-e2e</module>
         <module>connector-cdc-tidb-e2e</module>
+        <module>connector-aerospike-e2e</module>
     </modules>
 
     <dependencies>

Reply via email to