This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 073ebeeddf [Feature][Connector-V2] Support CDC mode for databend sink 
connector (#9661)
073ebeeddf is described below

commit 073ebeeddf3b25617121561b746c65befb71e262
Author: Jeremy <[email protected]>
AuthorDate: Mon Sep 22 23:01:18 2025 +0800

    [Feature][Connector-V2] Support CDC mode for databend sink connector (#9661)
---
 docs/en/connector-v2/sink/Databend.md              |  53 ++-
 docs/zh/connector-v2/sink/Databend.md              |  54 ++-
 .../databend/catalog/DatabendCatalog.java          |   2 +-
 .../seatunnel/databend/config/DatabendOptions.java |   6 +
 .../databend/config/DatabendSinkConfig.java        | 229 +++++-----
 .../databend/config/DatabendSinkOptions.java       |  18 +
 .../seatunnel/databend/sink/DatabendSink.java      | 141 ++++++-
 .../sink/DatabendSinkAggregatedCommitInfo.java     |  58 +++
 .../sink/DatabendSinkAggregatedCommitter.java      | 250 +++++++++++
 .../databend/sink/DatabendSinkCommitterInfo.java   |  65 +++
 .../databend/sink/DatabendSinkWriter.java          | 470 ++++++++++++++++++---
 .../databend/sink/DatabendSinkWriterTest.java      | 211 +++++++++
 .../e2e/connector/databend/DatabendCDCSinkIT.java  | 335 +++++++++++++++
 .../resources/databend/fake_to_databend_cdc.conf   |  92 ++++
 .../container/seatunnel/SeaTunnelContainer.java    |   1 +
 15 files changed, 1785 insertions(+), 200 deletions(-)

diff --git a/docs/en/connector-v2/sink/Databend.md 
b/docs/en/connector-v2/sink/Databend.md
index e040d2c355..4999a600c9 100644
--- a/docs/en/connector-v2/sink/Databend.md
+++ b/docs/en/connector-v2/sink/Databend.md
@@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';
 
 ## Key Features
 
-- [ ] [Exactly-Once](../../concept/connector-v2-features.md)
 - [ ] [Support Multi-table Writing](../../concept/connector-v2-features.md)
-- [ ] [CDC](../../concept/connector-v2-features.md)
+- [x] [Exactly-Once](../../concept/connector-v2-features.md)
+- [x] [CDC](../../concept/connector-v2-features.md)
 - [x] [Parallelism](../../concept/connector-v2-features.md)
 
 ## Description
@@ -34,21 +34,23 @@ The Databend sink internally implements bulk data import 
through stage attachmen
 
 ## Sink Options
 
-| Name | Type | Required | Default Value | Description                         
        |
-|------|------|----------|---------------|---------------------------------------------|
-| url | String | Yes | - | Databend JDBC connection URL               |
-| username | String | Yes | - | Databend database username                    |
-| password | String | Yes | - | Databend database password                     
|
-| database | String | No | - | Databend database name, defaults to the 
database name specified in the connection URL |
-| table | String | No | - | Databend table name                       |
-| batch_size | Integer | No | 1000 | Number of records for batch writing       
                    |
-| auto_commit | Boolean | No | true | Whether to auto-commit transactions      
                     |
-| max_retries | Integer | No | 3 | Maximum retry attempts on write failure     
                  |
-| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save 
mode                      |
-| data_save_mode | Enum | No | APPEND_DATA | Data save mode                    
        |
-| custom_sql | String | No | - | Custom write SQL, typically used for complex 
write scenarios              |
+| Name                | Type | Required | Default Value | Description          
                       |
+|---------------------|------|----------|---------------|---------------------------------------------|
+| url                 | String | Yes | - | Databend JDBC connection URL        
       |
+| username            | String | Yes | - | Databend database username          
          |
+| password            | String | Yes | - | Databend database password          
           |
+| database            | String | No | - | Databend database name, defaults to 
the database name specified in the connection URL |
+| table               | String | No | - | Databend table name                  
     |
+| batch_size          | Integer | No | 1000 | Number of records for batch 
writing                           |
+| auto_commit         | Boolean | No | true | Whether to auto-commit 
transactions                           |
+| max_retries         | Integer | No | 3 | Maximum retry attempts on write 
failure                       |
+| schema_save_mode    | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save 
mode                      |
+| data_save_mode      | Enum | No | APPEND_DATA | Data save mode               
             |
+| custom_sql          | String | No | - | Custom write SQL, typically used for 
complex write scenarios              |
 | execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds)   
                   |
-| jdbc_config | Map | No | - | Additional JDBC connection configuration, such 
as connection timeout parameters             |
+| jdbc_config         | Map | No | - | Additional JDBC connection 
configuration, such as connection timeout parameters             |
+| conflict_key        | String | No | - | Conflict key for CDC mode, used to 
determine the primary key for conflict resolution |
+| enable_delete       | Boolean | No | false | Whether to allow delete 
operations in CDC mode |
 
 ### schema_save_mode [Enum]
 
@@ -152,6 +154,25 @@ sink {
 }
 ```
 
+### CDC mode
+
+```hocon
+sink {
+  Databend {
+    url = "jdbc:databend://databend:8000/default?ssl=false"
+    username = "root"
+    password = ""
+    database = "default"
+    table = "sink_table"
+    
+    # Enable CDC mode
+    batch_size = 1
+    conflict_key = "id"
+    enable_delete = true
+  }
+}
+```
+
 ## Related Links
 
 - [Databend Official Website](https://databend.rs/)
diff --git a/docs/zh/connector-v2/sink/Databend.md 
b/docs/zh/connector-v2/sink/Databend.md
index aa4b9dab15..62944b6b18 100644
--- a/docs/zh/connector-v2/sink/Databend.md
+++ b/docs/zh/connector-v2/sink/Databend.md
@@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';
 
 ## 主要特性
 
-- [ ] [精确一次](../../concept/connector-v2-features.md)
 - [ ] [支持多表写入](../../concept/connector-v2-features.md)
-- [ ] [cdc](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
 - [x] [并行度](../../concept/connector-v2-features.md)
 
 ## 描述
@@ -34,21 +34,23 @@ Databend sink 内部通过 stage attachment 实现数据的批量导入。
 
 ## Sink 选项
 
-| 名称 | 类型 | 是否必须 | 默认值 | 描述                                 |
-|------|------|----------|--------|------------------------------------|
-| url | String | 是 | - | Databend JDBC 连接 URL               |
-| username | String | 是 | - | Databend 数据库用户名                    |
-| password | String | 是 | - | Databend 数据库密码                     |
-| database | String | 否 | - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
-| table | String | 否 | - | Databend 表名称                       |
-| batch_size | Integer | 否 | 1000 | 批量写入的记录数                           |
-| auto_commit | Boolean | 否 | true | 是否自动提交事务                           |
-| max_retries | Integer | 否 | 3 | 写入失败时的最大重试次数                       |
-| schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式   
                   |
-| data_save_mode | Enum | 否 | APPEND_DATA | 保存数据的模式                            
|
-| custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景              |
+| 名称                  | 类型 | 是否必须 | 默认值 | 描述                                 |
+|---------------------|------|----------|--------|------------------------------------|
+| url                 | String | 是 | - | Databend JDBC 连接 URL               |
+| username            | String | 是 | - | Databend 数据库用户名                    |
+| password            | String | 是 | - | Databend 数据库密码                     |
+| database            | String | 否 | - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
+| table               | String | 否 | - | Databend 表名称                       |
+| batch_size          | Integer | 否 | 1000 | 批量写入的记录数                          
 |
+| auto_commit         | Boolean | 否 | true | 是否自动提交事务                          
 |
+| max_retries         | Integer | 否 | 3 | 写入失败时的最大重试次数                       |
+| schema_save_mode    | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 
的模式                      |
+| data_save_mode      | Enum | 否 | APPEND_DATA | 保存数据的模式                       
     |
+| custom_sql          | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景              |
 | execute_timeout_sec | Integer | 否 | 300 | 执行SQL的超时时间(秒)                      
|
-| jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等             |
+| jdbc_config         | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等             |
+| conflict_key        | String | 否 | - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
+| enable_delete       | Boolean | 否 | false | cdc 模式下是否允许删除操作 |
 
 ### schema_save_mode [Enum]
 
@@ -152,6 +154,26 @@ sink {
 }
 ```
 
+### CDC mode
+
+```hocon
+sink {
+  Databend {
+    url = "jdbc:databend://databend:8000/default?ssl=false"
+    username = "root"
+    password = ""
+    database = "default"
+    table = "sink_table"
+    
+    # Enable CDC mode
+    batch_size = 1
+    interval = 3
+    conflict_key = "id"
+    enable_delete = true
+  }
+}
+```
+
 ## 相关链接
 
 - [Databend 官方网站](https://databend.rs/)
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
index 29d529bef3..56010f1d69 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
@@ -449,7 +449,7 @@ public class DatabendCatalog implements Catalog {
                 return "TIME";
             case TIMESTAMP:
                 LocalTimeType timeType = (LocalTimeType) dataType;
-                return String.format("TIMESTAMP(%d)");
+                return "TIMESTAMP";
             default:
                 throw new DatabendConnectorException(
                         DatabendConnectorErrorCode.UNSUPPORTED_DATA_TYPE,
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
index e9cf5fec7e..184f30d088 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
@@ -97,4 +97,10 @@ public class DatabendOptions {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether to auto commit for sink");
+
+    public static final Option<String> CONFLICT_KEY =
+            Options.key("conflict_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The conflict key for sink, used in 
upsert mode");
 }
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
index a9176636a7..cf644de087 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
@@ -18,131 +18,154 @@
 package org.apache.seatunnel.connectors.seatunnel.databend.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.AUTO_COMMIT;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.JDBC_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.MAX_RETRIES;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.SSL;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.CUSTOM_SQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.DATA_SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.EXECUTE_TIMEOUT_SEC;
-import static 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.SCHEMA_SAVE_MODE;
-
-@Setter
+@Slf4j
 @Getter
-@ToString
 public class DatabendSinkConfig implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    // common options
-    private String url;
-    private String username;
-    private String password;
-    private Boolean ssl;
-    private String database;
-    private String table;
-    private Boolean autoCommit;
-    private Integer batchSize;
-    private Integer maxRetries;
-    private Map<String, String> jdbcConfig;
-
-    // sink options
-    private Integer executeTimeoutSec;
-    private String customSql;
-    private SchemaSaveMode schemaSaveMode;
-    private DataSaveMode dataSaveMode;
-    private Properties properties;
+    private final String url;
+    private final String username;
+    private final String password;
+    private final String database;
+    private final String table;
+    private final boolean autoCommit;
+    private final int batchSize;
+    private final int executeTimeoutSec;
+    private final int interval;
+    private final String conflictKey;
+    private final boolean enableDelete;
+
+    private DatabendSinkConfig(Builder builder) {
+        this.url = builder.url;
+        this.username = builder.username;
+        this.password = builder.password;
+        this.database = builder.database;
+        this.table = builder.table;
+        this.autoCommit = builder.autoCommit;
+        this.batchSize = builder.batchSize;
+        this.executeTimeoutSec = builder.executeTimeoutSec;
+        this.interval = builder.interval;
+        this.conflictKey = builder.conflictKey;
+        this.enableDelete = builder.enableDelete;
+    }
 
     public static DatabendSinkConfig of(ReadonlyConfig config) {
-        DatabendSinkConfig sinkConfig = new DatabendSinkConfig();
-
-        // common options
-        sinkConfig.setUrl(config.get(URL));
-        sinkConfig.setUsername(config.get(USERNAME));
-        sinkConfig.setPassword(config.get(PASSWORD));
-        sinkConfig.setDatabase(config.get(DATABASE));
-        sinkConfig.setTable(config.get(TABLE));
-        sinkConfig.setAutoCommit(config.get(AUTO_COMMIT));
-        sinkConfig.setBatchSize(config.get(BATCH_SIZE));
-        sinkConfig.setMaxRetries(config.get(MAX_RETRIES));
-        sinkConfig.setJdbcConfig(config.get(JDBC_CONFIG));
-
-        // sink options
-        sinkConfig.setExecuteTimeoutSec(config.get(EXECUTE_TIMEOUT_SEC));
-        sinkConfig.setCustomSql(config.getOptional(CUSTOM_SQL).orElse(null));
-        sinkConfig.setSchemaSaveMode(config.get(SCHEMA_SAVE_MODE));
-        sinkConfig.setDataSaveMode(config.get(DATA_SAVE_MODE));
-        // Create properties for JDBC connection
-        Properties properties = new Properties();
-        if (sinkConfig.getJdbcConfig() != null) {
-            sinkConfig.getJdbcConfig().forEach(properties::setProperty);
+        return new Builder()
+                .withUrl(config.get(DatabendOptions.URL))
+                .withUsername(config.get(DatabendOptions.USERNAME))
+                .withPassword(config.get(DatabendOptions.PASSWORD))
+                .withDatabase(config.get(DatabendOptions.DATABASE))
+                .withTable(config.get(DatabendOptions.TABLE))
+                .withAutoCommit(config.get(DatabendOptions.AUTO_COMMIT))
+                .withBatchSize(config.get(DatabendOptions.BATCH_SIZE))
+                
.withExecuteTimeoutSec(config.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC))
+                .withConflictKey(config.get(DatabendSinkOptions.CONFLICT_KEY))
+                .withAllowDelete(config.get(DatabendSinkOptions.ENABLE_DELETE))
+                .build();
+    }
+
+    public static class Builder {
+        private String url;
+        private String username;
+        private String password;
+        private String database;
+        private String table;
+        private boolean autoCommit = true;
+        private int batchSize = 1000;
+        private int executeTimeoutSec = 300;
+        private int interval = 30;
+        private String conflictKey;
+        private boolean enableDelete = false;
+
+        public Builder withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public Builder withUsername(String username) {
+            this.username = username;
+            return this;
         }
-        if (!properties.containsKey("user")) {
-            properties.setProperty("user", sinkConfig.getUsername());
+
+        public Builder withPassword(String password) {
+            this.password = password;
+            return this;
         }
-        if (!properties.containsKey("password")) {
-            properties.setProperty("password", sinkConfig.getPassword());
+
+        public Builder withDatabase(String database) {
+            this.database = database;
+            return this;
         }
-        if (sinkConfig.getSsl() != null) {
-            properties.setProperty("ssl", sinkConfig.getSsl().toString());
+
+        public Builder withTable(String table) {
+            this.table = table;
+            return this;
         }
-        sinkConfig.setProperties(properties);
 
-        return sinkConfig;
-    }
+        public Builder withAutoCommit(boolean autoCommit) {
+            this.autoCommit = autoCommit;
+            return this;
+        }
 
-    // Change UserName, password, jdbcConfig to properties from 
databendSinkConfig
-    public Properties toProperties() {
-        Properties properties = new Properties();
-        properties.put("user", username);
-        properties.put("password", password);
-        properties.put("ssl", ssl);
-        if (jdbcConfig != null) {
-            jdbcConfig.forEach(properties::put);
+        public Builder withBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
         }
-        return properties;
-    }
-    /** Convert this config to a ReadonlyConfig */
-    public ReadonlyConfig toReadonlyConfig() {
-        Map<String, Object> map = new HashMap<>();
-        map.put(URL.key(), url);
-        map.put(USERNAME.key(), username);
-        map.put(PASSWORD.key(), password);
-        if (ssl != null) {
-            map.put(SSL.key(), ssl);
+
+        public Builder withExecuteTimeoutSec(int executeTimeoutSec) {
+            this.executeTimeoutSec = executeTimeoutSec;
+            return this;
         }
-        map.put(DATABASE.key(), database);
-        map.put(TABLE.key(), table);
-        map.put(AUTO_COMMIT.key(), autoCommit);
-        map.put(BATCH_SIZE.key(), batchSize);
-        map.put(MAX_RETRIES.key(), maxRetries);
-        if (jdbcConfig != null) {
-            map.put(JDBC_CONFIG.key(), jdbcConfig);
+
+        public Builder withInterval(int interval) {
+            this.interval = interval;
+            return this;
         }
-        map.put(EXECUTE_TIMEOUT_SEC.key(), executeTimeoutSec);
-        if (customSql != null) {
-            map.put(CUSTOM_SQL.key(), customSql);
+
+        public Builder withConflictKey(String conflictKey) {
+            this.conflictKey = conflictKey;
+            return this;
         }
-        map.put(SCHEMA_SAVE_MODE.key(), schemaSaveMode);
-        map.put(DATA_SAVE_MODE.key(), dataSaveMode);
 
-        return ReadonlyConfig.fromMap(map);
+        public Builder withAllowDelete(boolean allowDelete) {
+            this.enableDelete = allowDelete;
+            return this;
+        }
+
+        public DatabendSinkConfig build() {
+            return new DatabendSinkConfig(this);
+        }
+    }
+
+    public Properties getProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("user", username);
+        properties.setProperty("password", password);
+        return properties;
+    }
+
+    public String getRawTableName() {
+        long timestamp = System.currentTimeMillis();
+        return table + "_raw_" + timestamp;
+    }
+
+    public String getStreamName() {
+        long timestamp = System.currentTimeMillis();
+        return table + "_stream_" + timestamp;
+    }
+
+    public Properties toProperties() {
+        return getProperties();
+    }
+
+    public boolean isCdcMode() {
+        return conflictKey != null && !conflictKey.isEmpty();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
index 19fa2823a1..3b186693a4 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
@@ -47,4 +47,22 @@ public class DatabendSinkOptions {
                     .intType()
                     .defaultValue(300)
                     .withDescription("The timeout seconds for Databend client 
execution");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("Batch size for CDC merge operations");
+
+    public static final Option<String> CONFLICT_KEY =
+            Options.key("conflict_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Conflict key for CDC merge operations");
+
+    public static final Option<Boolean> ENABLE_DELETE =
+            Options.key("enable_delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to allow delete operations in 
CDC mode");
 }
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
index 342dee7bab..284004a908 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
@@ -17,12 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.databend.sink;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -50,13 +54,20 @@ import java.util.stream.Collectors;
 
 @Slf4j
 public class DatabendSink
-        implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>, 
SupportSaveMode {
+        implements SeaTunnelSink<
+                        SeaTunnelRow,
+                        Void,
+                        DatabendSinkCommitterInfo,
+                        DatabendSinkAggregatedCommitInfo>,
+                SupportSaveMode {
 
     private final CatalogTable catalogTable;
     private final SchemaSaveMode schemaSaveMode;
     private final DataSaveMode dataSaveMode;
     private final String database;
     private final String table;
+    private final String rawTableName;
+    private final String streamName;
     private final String customSql;
     private final boolean autoCommit;
     private final int batchSize;
@@ -64,6 +75,10 @@ public class DatabendSink
     private final DatabendSinkConfig databendSinkConfig;
     private ReadonlyConfig readonlyConfig;
 
+    // CDC infrastructure initialization fields
+    private boolean isCdcInfrastructureInitialized = false;
+    private JobContext jobContext;
+
     public DatabendSink(CatalogTable catalogTable, ReadonlyConfig options) {
         this.catalogTable = catalogTable;
         this.databendSinkConfig = DatabendSinkConfig.of(options);
@@ -82,6 +97,8 @@ public class DatabendSink
         } else {
             this.table = configuredTable;
         }
+        this.rawTableName = databendSinkConfig.getRawTableName();
+        this.streamName = databendSinkConfig.getStreamName();
         this.autoCommit = options.get(DatabendOptions.AUTO_COMMIT);
         this.batchSize = options.get(DatabendOptions.BATCH_SIZE);
         this.executeTimeoutSec = 
options.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC);
@@ -111,6 +128,13 @@ public class DatabendSink
         log.info("Auto commit: {}", autoCommit);
         log.info("Batch size: {}", batchSize);
         log.info("Execute timeout: {} seconds", executeTimeoutSec);
+
+        // CDC mode info
+        if (databendSinkConfig.isCdcMode()) {
+            log.info("CDC mode enabled with conflict key: {}", 
databendSinkConfig.getConflictKey());
+            log.info("Enable delete: {}", databendSinkConfig.isEnableDelete());
+            log.info("Interval: {} seconds", databendSinkConfig.getInterval());
+        }
     }
 
     @Override
@@ -119,7 +143,8 @@ public class DatabendSink
     }
 
     @Override
-    public DatabendSinkWriter createWriter(@NonNull SinkWriter.Context 
context) throws IOException {
+    public SinkWriter<SeaTunnelRow, DatabendSinkCommitterInfo, Void> 
createWriter(
+            @NonNull SinkWriter.Context context) throws IOException {
         try {
             Connection connection = 
DatabendUtil.createConnection(databendSinkConfig);
             connection.setAutoCommit(autoCommit);
@@ -132,6 +157,8 @@ public class DatabendSink
                     customSql,
                     database,
                     table,
+                    rawTableName,
+                    streamName,
                     batchSize,
                     executeTimeoutSec);
         } catch (SQLException e) {
@@ -142,11 +169,6 @@ public class DatabendSink
         }
     }
 
-    @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return Optional.of(catalogTable);
-    }
-
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
         try {
@@ -220,4 +242,109 @@ public class DatabendSink
                 return "STRING"; // Default to STRING for complex types
         }
     }
+
+    @Override
+    public Optional<
+                    SinkAggregatedCommitter<
+                            DatabendSinkCommitterInfo, 
DatabendSinkAggregatedCommitInfo>>
+            createAggregatedCommitter() throws IOException {
+        DatabendSinkAggregatedCommitter committer =
+                new DatabendSinkAggregatedCommitter(
+                        databendSinkConfig, database, table, rawTableName, 
streamName);
+        committer.setCatalogTable(catalogTable);
+        return Optional.of(committer);
+    }
+
+    @Override
+    public Optional<Serializer<DatabendSinkCommitterInfo>> 
getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<DatabendSinkAggregatedCommitInfo>>
+            getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+
+        // Only initialize CDC infrastructure on coordinator node in BATCH mode
+        // jobContext.getJobMode() == JobMode.BATCH
+        if (databendSinkConfig.isCdcMode() && !isCdcInfrastructureInitialized) 
{
+            initializeCdcInfrastructure();
+            isCdcInfrastructureInitialized = true;
+        }
+    }
+
+    /** Initialize CDC infrastructure (raw table and stream) only once on the 
coordinator node */
+    private void initializeCdcInfrastructure() {
+        log.info("Initializing CDC infrastructure for database: {}, table: 
{}", database, table);
+        try (Connection connection = 
DatabendUtil.createConnection(databendSinkConfig)) {
+            // Generate unique names for raw table and stream
+            String rawTableName = this.rawTableName;
+            String streamName = this.streamName;
+
+            // Create raw table
+            createRawTable(connection, rawTableName);
+
+            // Create stream on raw table
+            createStream(connection, database, rawTableName, streamName);
+
+            log.info(
+                    "CDC infrastructure initialized - raw table: {}, stream: 
{}",
+                    rawTableName,
+                    streamName);
+        } catch (SQLException e) {
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                    "Failed to initialize CDC infrastructure: " + 
e.getMessage(),
+                    e);
+        }
+    }
+
+    private String getCurrentTimestamp() {
+        return java.time.LocalDateTime.now()
+                
.format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
+    }
+
+    private void createRawTable(Connection connection, String rawTableName) 
throws SQLException {
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS %s.%s ("
+                                + "  id VARCHAR(255),"
+                                + "  table_name VARCHAR(255),"
+                                + "  raw_data JSON,"
+                                + "  add_time TIMESTAMP,"
+                                + "  action STRING"
+                                + ")",
+                        database, rawTableName);
+
+        log.info("Creating raw table with SQL: {}", createTableSql);
+        try (java.sql.Statement stmt = connection.createStatement()) {
+            stmt.execute(createTableSql);
+            log.info("Raw table {} created successfully", rawTableName);
+        }
+    }
+
+    private void createStream(
+            Connection connection, String database, String rawTableName, 
String streamName)
+            throws SQLException {
+        String createStreamSql =
+                String.format(
+                        "CREATE STREAM IF NOT EXISTS %s.%s ON TABLE %s.%s",
+                        database, streamName, database, rawTableName);
+
+        log.info("Creating stream with SQL: {}", createStreamSql);
+        try (java.sql.Statement stmt = connection.createStatement()) {
+            stmt.execute(createStreamSql);
+            log.info("Stream {} created successfully", streamName);
+        }
+    }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java
new file mode 100644
index 0000000000..f6d0518e1f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class DatabendSinkAggregatedCommitInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final List<DatabendSinkCommitterInfo> commitInfos;
+    private final String rawTableName;
+    private final String streamName;
+
+    public DatabendSinkAggregatedCommitInfo(
+            List<DatabendSinkCommitterInfo> commitInfos, String rawTableName, 
String streamName) {
+        this.commitInfos = commitInfos;
+        this.rawTableName = rawTableName;
+        this.streamName = streamName;
+    }
+
+    public List<DatabendSinkCommitterInfo> getCommitInfos() {
+        return commitInfos;
+    }
+
+    public String getRawTableName() {
+        return rawTableName;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new 
StringBuilder("DatabendSinkAggregatedCommitInfo{");
+        sb.append("commitInfos=").append(commitInfos);
+        sb.append(", rawTableName='").append(rawTableName).append("'");
+        sb.append(", streamName='").append(streamName).append("'");
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
new file mode 100644
index 0000000000..0f55132fa9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import 
org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Aggregated committer for Databend sink that handles CDC (Change Data 
Capture) operations. In CDC
+ * mode, this committer performs merge operations to apply changes to the 
target table. Merge
+ * operations are only performed when the accumulated record count reaches the 
configured batch
+ * size, which helps optimize performance by reducing the frequency of merge 
operations.
+ */
+@Slf4j
+public class DatabendSinkAggregatedCommitter
+        implements SinkAggregatedCommitter<
+                DatabendSinkCommitterInfo, DatabendSinkAggregatedCommitInfo> {
+
+    // Add a unique identifier for each instance
+    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0);
+    private final long instanceId = INSTANCE_COUNTER.getAndIncrement();
+
+    private final DatabendSinkConfig databendSinkConfig;
+    private final String database;
+    private final String table;
+    private final String rawTableName;
+    private final String streamName;
+
+    private Connection connection;
+    private boolean isCdcMode;
+    // Store catalog table to access schema information
+    private CatalogTable catalogTable;
+
+    // Add a setter for catalogTable
+    public void setCatalogTable(CatalogTable catalogTable) {
+        this.catalogTable = catalogTable;
+    }
+
+    public DatabendSinkAggregatedCommitter(
+            DatabendSinkConfig databendSinkConfig,
+            String database,
+            String table,
+            String rawTableName,
+            String streamName) {
+        this.databendSinkConfig = databendSinkConfig;
+        this.database = database;
+        this.table = table;
+        this.rawTableName = rawTableName;
+        this.streamName = streamName;
+        this.isCdcMode = databendSinkConfig.isCdcMode();
+    }
+
+    @Override
+    public void init() {
+        try {
+            log.info("[Instance {}] Initializing 
DatabendSinkAggregatedCommitter", instanceId);
+            log.info("[Instance {}] DatabendSinkConfig: {}", instanceId, 
databendSinkConfig);
+            log.info("[Instance {}] Database: {}", instanceId, database);
+            log.info("[Instance {}] Table: {}", instanceId, table);
+            log.info("[Instance {}] Is CDC mode: {}", instanceId, isCdcMode);
+
+            this.connection = 
DatabendUtil.createConnection(databendSinkConfig);
+            log.info(
+                    "[Instance {}] Databend connection created successfully: 
{}",
+                    instanceId,
+                    connection);
+
+            // CDC infrastructure is now initialized in 
DatabendSink.setJobContext
+            // Just log that we're in CDC mode
+            if (isCdcMode) {
+                log.info("[Instance {}] Running in CDC mode", instanceId);
+            }
+        } catch (SQLException e) {
+            log.error(
+                    "[Instance {}] Failed to initialize 
DatabendSinkAggregatedCommitter: {}",
+                    instanceId,
+                    e.getMessage(),
+                    e);
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.CONNECT_FAILED,
+                    "Failed to initialize DatabendSinkAggregatedCommitter: " + 
e.getMessage(),
+                    e);
+        } catch (Exception e) {
+            log.error(
+                    "[Instance {}] Unexpected error during initialization: {}",
+                    instanceId,
+                    e.getMessage(),
+                    e);
+            throw e;
+        }
+    }
+
+    private String getCurrentTimestamp() {
+        return 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
+    }
+
+    @Override
+    public List<DatabendSinkAggregatedCommitInfo> commit(
+            List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) 
throws IOException {
+        // Perform final merge operation in CDC mode only when necessary
+        if (isCdcMode) {
+            performMerge(aggregatedCommitInfos);
+        }
+
+        // Return empty list as there's no need to retry
+        return new ArrayList<>();
+    }
+
+    private void performMerge(List<DatabendSinkAggregatedCommitInfo> 
aggregatedCommitInfos) {
+        // Merge all the data from raw table to target table
+        String mergeSql = generateMergeSql();
+        log.info("[Instance {}] Executing MERGE INTO statement: {}", 
instanceId, mergeSql);
+
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(mergeSql);
+            log.info("[Instance {}] Merge operation completed successfully", 
instanceId);
+        } catch (SQLException e) {
+            log.error(
+                    "[Instance {}] Failed to execute merge operation: {}",
+                    instanceId,
+                    e.getMessage(),
+                    e);
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                    "Failed to execute merge operation: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    private String generateMergeSql() {
+        StringBuilder sql = new StringBuilder();
+        sql.append(String.format("MERGE INTO %s.%s a ", database, table));
+        sql.append("USING (SELECT ");
+
+        // Add all columns from raw_data
+        if (catalogTable != null && catalogTable.getSeaTunnelRowType() != 
null) {
+            String[] fieldNames = 
catalogTable.getSeaTunnelRowType().getFieldNames();
+            for (int i = 0; i < fieldNames.length; i++) {
+                if (i > 0) {
+                    sql.append(", ");
+                }
+                sql.append(String.format("raw_data:%s as %s", fieldNames[i], 
fieldNames[i]));
+            }
+        } else {
+            // Fallback to generic raw_data if schema is not available
+            sql.append("raw_data");
+        }
+
+        sql.append(", action FROM ")
+                .append(database)
+                .append(".")
+                // In the new approach, we don't have streamName in this class
+                // The stream name should be passed from DatabendSink or 
retrieved differently
+                .append(streamName) // Placeholder, will be replaced properly
+                .append(" QUALIFY ROW_NUMBER() OVER(PARTITION BY ")
+                .append(databendSinkConfig.getConflictKey())
+                .append(" ORDER BY add_time DESC) = 1) b ");
+
+        sql.append("ON a.")
+                .append(databendSinkConfig.getConflictKey())
+                .append(" = b.")
+                .append(databendSinkConfig.getConflictKey())
+                .append(" ");
+
+        sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
+
+        if (databendSinkConfig.isEnableDelete()) {
+            sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
+        }
+
+        sql.append("WHEN NOT MATCHED AND b.action!='delete' THEN INSERT *");
+
+        return sql.toString();
+    }
+
+    @Override
+    public DatabendSinkAggregatedCommitInfo 
combine(List<DatabendSinkCommitterInfo> commitInfos) {
+        // Just combine all commit infos into one aggregated commit info
+        // In the new approach, rawTableName and streamName are not needed here
+        return new DatabendSinkAggregatedCommitInfo(commitInfos, null, null);
+    }
+
+    @Override
+    public void abort(List<DatabendSinkAggregatedCommitInfo> 
aggregatedCommitInfos)
+            throws IOException {
+        // In case of abort, we might want to clean up the raw table and stream
+        log.info("[Instance {}] Aborting Databend sink operations", 
instanceId);
+        try {
+            if (isCdcMode && connection != null && !connection.isClosed()) {
+                // In the new approach, raw table and stream names are not 
stored in this class
+                // Cleanup would need to be handled differently or at the 
DatabendSink level
+                log.info(
+                        "[Instance {}] CDC mode abort - cleanup handled at 
DatabendSink level",
+                        instanceId);
+            }
+        } catch (Exception e) {
+            log.warn(
+                    "[Instance {}] Failed to clean up during abort: {}",
+                    instanceId,
+                    e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (connection != null && !connection.isClosed()) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.CONNECT_FAILED,
+                    "[Instance {}] Failed to close connection in 
DatabendSinkAggregatedCommitter: "
+                            + e.getMessage(),
+                    e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java
new file mode 100644
index 0000000000..e5c88643ff
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import java.io.Serializable;
+
+public class DatabendSinkCommitterInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // CDC related fields
+    private String rawTableName;
+    private String streamName;
+
+    public DatabendSinkCommitterInfo() {
+        // Default constructor
+    }
+
+    public DatabendSinkCommitterInfo(String rawTableName, String streamName) {
+        this.rawTableName = rawTableName;
+        this.streamName = streamName;
+    }
+
+    public String getRawTableName() {
+        return rawTableName;
+    }
+
+    public void setRawTableName(String rawTableName) {
+        this.rawTableName = rawTableName;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public void setStreamName(String streamName) {
+        this.streamName = streamName;
+    }
+
+    @Override
+    public String toString() {
+        return "DatabendSinkCommitterInfo{"
+                + "rawTableName='"
+                + rawTableName
+                + '\''
+                + ", streamName='"
+                + streamName
+                + '\''
+                + '}';
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
index f18a4006b3..dc2da3133e 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.databend.sink;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -25,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import 
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -36,13 +40,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.databend.schema.SchemaChangeMan
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -52,7 +60,8 @@ import java.util.stream.Collectors;
 
 @Slf4j
 public class DatabendSinkWriter
-        implements SinkWriter<SeaTunnelRow, Void, Void>, 
SupportSchemaEvolutionSinkWriter {
+        implements SinkWriter<SeaTunnelRow, DatabendSinkCommitterInfo, Void>,
+                SupportSchemaEvolutionSinkWriter {
 
     private final Connection connection;
     private final Context context;
@@ -69,6 +78,17 @@ public class DatabendSinkWriter
     private int batchCount = 0;
     private DatabendSinkConfig databendSinkConfig;
 
+    // CDC related fields
+    // Note: In CDC mode, rawTableName and streamName are set by 
DatabendSinkAggregatedCommitter
+    // The writer receives these values through the prepareCommit process
+    private boolean isCdcMode = false;
+    private String rawTableName;
+    private String streamName;
+    private String targetTableName;
+    private PreparedStatement cdcPreparedStatement;
+    private String conflictKey;
+    private boolean enableDelete;
+
     public DatabendSinkWriter(
             Context context,
             Connection connection,
@@ -77,6 +97,8 @@ public class DatabendSinkWriter
             String customSql,
             String database,
             String table,
+            String rawTableName,
+            String streamName,
             int batchSize,
             int executeTimeoutSec) {
         this.context = context;
@@ -88,11 +110,25 @@ public class DatabendSinkWriter
         this.tableSchema = catalogTable.getTableSchema();
         this.sinkTablePath = TablePath.of(database, table);
 
+        // CDC mode check
+        this.isCdcMode = databendSinkConfig.isCdcMode();
+        if (databendSinkConfig.isCdcMode()) {
+            this.rawTableName = rawTableName;
+            this.streamName = streamName;
+            log.info("DatabendSinkWriter initialized in CDC mode with raw 
table: {}", rawTableName);
+        } else {
+            log.info("DatabendSinkWriter initialized in traditional mode");
+        }
+        this.conflictKey = databendSinkConfig.getConflictKey();
+        this.enableDelete = databendSinkConfig.isEnableDelete();
+        this.targetTableName = table;
+
         log.info("DatabendSinkWriter constructor - catalogTable: {}", 
catalogTable);
         log.info("DatabendSinkWriter constructor - tableSchema: {}", 
tableSchema);
         log.info(
                 "DatabendSinkWriter constructor - rowType: {}", 
catalogTable.getSeaTunnelRowType());
         log.info("DatabendSinkWriter constructor - target table path: {}", 
sinkTablePath);
+        log.info("DatabendSinkWriter constructor - CDC mode: {}", isCdcMode);
 
         // if custom SQL is provided, use it directly
         if (customSql != null && !customSql.isEmpty()) {
@@ -110,46 +146,131 @@ public class DatabendSinkWriter
                         e);
             }
         } else {
-            // use the catalog table schema to create the target table
-            SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
-            if (rowType == null || rowType.getFieldNames().length == 0) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
-                        "Source table schema is empty or null");
-            }
-
             try {
-                if (!tableExists(database, table)) {
+                if (isCdcMode) {
+                    // In CDC mode, we don't create tables here, it's done in 
AggregatedCommitter
+                    // We'll get the raw table and stream names from the 
committer via prepareCommit
                     log.info(
-                            "Target table {}.{} does not exist, creating with 
source schema",
-                            database,
-                            table);
-                    createTable(database, table, rowType);
+                            "CDC mode enabled, table creation will be handled 
by AggregatedCommitter");
                 } else {
-                    log.info("Target table {}.{} exists, verifying schema", 
database, table);
-                    verifyTableSchema(database, table, rowType);
+                    // Traditional mode
+                    initTraditionalMode(database, table);
                 }
             } catch (SQLException e) {
                 throw new DatabendConnectorException(
                         DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to verify/create target table: " + 
e.getMessage(),
+                        "Failed to initialize sink writer: " + e.getMessage(),
                         e);
             }
+        }
+    }
 
-            this.insertSql = generateInsertSql(database, table, rowType);
-            log.info("Generated insert SQL: {}", insertSql);
-            try {
-                this.schemaChangeManager = new 
SchemaChangeManager(databendSinkConfig);
-                this.preparedStatement = 
connection.prepareStatement(insertSql);
-                this.preparedStatement.setQueryTimeout(executeTimeoutSec);
-                log.info("PreparedStatement created successfully");
-            } catch (SQLException e) {
-                throw new DatabendConnectorException(
-                        DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to prepare statement: " + e.getMessage(),
-                        e);
-            }
+    private String getCurrentTimestamp() {
+        return 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+    }
+
+    private void initializeCdcPreparedStatement() throws SQLException {
+        log.info("Initializing CDC PreparedStatement");
+
+        // In CDC mode, the rawTableName should be set by the 
AggregatedCommitter
+        // If it's not set yet, we can't proceed with CDC operations
+        if (rawTableName == null || rawTableName.isEmpty()) {
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                    "Raw table name not set by AggregatedCommitter. Cannot 
initialize CDC PreparedStatement.");
         }
+
+        // Generate insert SQL for raw table
+        String insertRawSql = 
generateInsertRawSql(sinkTablePath.getDatabaseName());
+
+        // Create the PreparedStatement
+        this.cdcPreparedStatement = connection.prepareStatement(insertRawSql);
+        this.cdcPreparedStatement.setQueryTimeout(executeTimeoutSec);
+
+        log.info("CDC PreparedStatement created successfully with SQL: {}", 
insertRawSql);
+    }
+
+    private void initTraditionalMode(String database, String table) throws 
SQLException {
+        // use the catalog table schema to create the target table
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        if (rowType == null || rowType.getFieldNames().length == 0) {
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
+                    "Source table schema is empty or null");
+        }
+
+        this.insertSql = generateInsertSql(database, table, rowType);
+        log.info("Generated insert SQL: {}", insertSql);
+        try {
+            this.schemaChangeManager = new 
SchemaChangeManager(databendSinkConfig);
+            this.preparedStatement = connection.prepareStatement(insertSql);
+            this.preparedStatement.setQueryTimeout(executeTimeoutSec);
+            log.info("PreparedStatement created successfully");
+        } catch (SQLException e) {
+            throw new DatabendConnectorException(
+                    DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                    "Failed to prepare statement: " + e.getMessage(),
+                    e);
+        }
+    }
+
+    private String generateInsertRawSql(String database) {
+        return String.format(
+                "INSERT INTO %s.%s (id, table_name, raw_data, add_time, 
action) VALUES (?, ?, ?, ?, ?)",
+                database, rawTableName);
+    }
+
+    private void performMerge() {
+        if (batchCount <= 0) {
+            log.debug("No data to merge, skipping");
+            return;
+        }
+
+        String mergeSql = generateMergeSql();
+        log.info("Executing MERGE INTO statement: {}", mergeSql);
+
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(mergeSql);
+            log.info("Merge operation completed successfully");
+            batchCount = 0; // Reset batch count after successful merge
+        } catch (SQLException e) {
+            log.error("Failed to execute merge operation: {}", e.getMessage(), 
e);
+        }
+    }
+
+    String generateMergeSql() {
+        StringBuilder sql = new StringBuilder();
+        sql.append(
+                String.format(
+                        "MERGE INTO %s.%s a ", 
sinkTablePath.getDatabaseName(), targetTableName));
+        sql.append(String.format("USING (SELECT "));
+
+        // Add all columns from raw_data
+        String[] fieldNames = 
catalogTable.getSeaTunnelRowType().getFieldNames();
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (i > 0) sql.append(", ");
+            sql.append(String.format("raw_data:%s as %s", fieldNames[i], 
fieldNames[i]));
+        }
+
+        sql.append(", action FROM ")
+                .append(sinkTablePath.getDatabaseName())
+                .append(".")
+                .append(streamName)
+                .append(" QUALIFY ROW_NUMBER() OVER(PARTITION BY ")
+                .append(conflictKey)
+                .append(" ORDER BY add_time DESC) = 1) b ");
+
+        sql.append("ON a.").append(conflictKey).append(" = 
b.").append(conflictKey).append(" ");
+
+        sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
+
+        if (enableDelete) {
+            sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
+        }
+
+        sql.append("WHEN NOT MATCHED AND b.action!='delete' THEN INSERT *");
+
+        return sql.toString();
     }
 
     @Override
@@ -217,26 +338,12 @@ public class DatabendSinkWriter
                 return;
             }
 
-            if (preparedStatement == null) {
-                log.info("PreparedStatement is null, initializing...");
-                initializePreparedStatement(row);
-                log.info("PreparedStatement initialized successfully");
-            }
-
-            boolean allFieldsNull = true;
-            for (Object field : row.getFields()) {
-                if (field != null) {
-                    allFieldsNull = false;
-                    break;
-                }
-            }
-
-            if (allFieldsNull) {
-                log.warn("All fields in row are null, skipping");
-                return;
+            if (isCdcMode) {
+                processCdcRow(row);
+            } else {
+                processTraditionalRow(row);
             }
 
-            processRow(row);
             batchCount++;
             log.info("Batch count after adding row: {}", batchCount);
 
@@ -247,7 +354,7 @@ public class DatabendSinkWriter
             }
         } catch (Exception e) {
             log.error("Failed to write row: {}", row, e);
-            // tru to execute the remaining batch if any error occurs
+            // try to execute the remaining batch if any error occurs
             try {
                 if (batchCount > 0) {
                     log.info("Attempting to execute remaining batch after 
error");
@@ -263,6 +370,160 @@ public class DatabendSinkWriter
         }
     }
 
+    private void processCdcRow(SeaTunnelRow row) throws SQLException {
+        log.info("Processing CDC row with kind: {}", row.getRowKind());
+
+        String action = mapRowKindToAction(row.getRowKind());
+        if ("update_before".equals(action)) {
+            log.debug("UPDATE_BEFORE operation detected, skipping row");
+            return;
+        }
+
+        if ("delete".equals(action) && !enableDelete) {
+            log.debug("DELETE operation not allowed, skipping row");
+            return;
+        }
+
+        // Ensure cdcPreparedStatement is initialized
+        if (cdcPreparedStatement == null) {
+            log.info("CDC PreparedStatement is null, initializing...");
+            initializeCdcPreparedStatement();
+
+            // If it's still null, we need to throw an exception as we can't 
proceed
+            if (cdcPreparedStatement == null) {
+                throw new DatabendConnectorException(
+                        DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                        "Failed to initialize CDC PreparedStatement. Raw table 
name might not be set by AggregatedCommitter.");
+            }
+
+            log.info("CDC PreparedStatement initialized successfully");
+        }
+
+        // Get conflict key value
+        String conflictKeyValue = getConflictKeyValue(row);
+
+        // Convert row to JSON
+        String jsonData = convertRowToJson(row);
+
+        cdcPreparedStatement.setString(1, conflictKeyValue);
+        cdcPreparedStatement.setString(2, targetTableName);
+        cdcPreparedStatement.setString(3, jsonData);
+        cdcPreparedStatement.setTimestamp(4, 
java.sql.Timestamp.valueOf(LocalDateTime.now()));
+        cdcPreparedStatement.setString(5, action);
+
+        cdcPreparedStatement.addBatch();
+    }
+
+    private void processTraditionalRow(SeaTunnelRow row) throws SQLException {
+        // Ensure preparedStatement is initialized
+        if (preparedStatement == null) {
+            log.info("PreparedStatement is null, initializing...");
+            initializePreparedStatement(row);
+
+            // If it's still null, we need to throw an exception as we can't 
proceed
+            if (preparedStatement == null) {
+                throw new DatabendConnectorException(
+                        DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+                        "Failed to initialize PreparedStatement.");
+            }
+
+            log.info("PreparedStatement initialized successfully");
+        }
+
+        boolean allFieldsNull = true;
+        for (Object field : row.getFields()) {
+            if (field != null) {
+                allFieldsNull = false;
+                break;
+            }
+        }
+
+        if (allFieldsNull) {
+            log.warn("All fields in row are null, skipping");
+            return;
+        }
+
+        processRow(row);
+    }
+
+    private String mapRowKindToAction(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+                return "insert";
+            case UPDATE_AFTER:
+                return "update";
+            case DELETE:
+                return "delete";
+        }
+        return "update_before";
+    }
+
+    /**
+     * Get the value of the conflict key field from the row. This value will 
be used as the ID in
+     * the raw table.
+     */
+    private String getConflictKeyValue(SeaTunnelRow row) {
+        String[] fieldNames = 
catalogTable.getSeaTunnelRowType().getFieldNames();
+        int index = Arrays.asList(fieldNames).indexOf(conflictKey);
+
+        if (index >= 0 && index < row.getFields().length) {
+            Object value = row.getField(index);
+            if (value != null) {
+                return value.toString();
+            }
+        }
+
+        // This should not happen in a proper CDC setup where conflict key 
values are always present
+        // If we reach here, it indicates a data issue
+        throw new IllegalArgumentException(
+                "Conflict key field '" + conflictKey + "' value is null or not 
found in row");
+    }
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private String convertRowToJson(SeaTunnelRow row) {
+        try {
+            ObjectNode jsonNode = objectMapper.createObjectNode();
+            String[] fieldNames = 
catalogTable.getSeaTunnelRowType().getFieldNames();
+            Object[] fields = row.getFields();
+
+            for (int i = 0; i < fieldNames.length; i++) {
+                String fieldName = fieldNames[i];
+                Object value = fields[i];
+
+                if (value == null) {
+                    jsonNode.putNull(fieldName);
+                } else if (value instanceof String) {
+                    jsonNode.put(fieldName, (String) value);
+                } else if (value instanceof Integer) {
+                    jsonNode.put(fieldName, (Integer) value);
+                } else if (value instanceof Long) {
+                    jsonNode.put(fieldName, (Long) value);
+                } else if (value instanceof Float) {
+                    jsonNode.put(fieldName, (Float) value);
+                } else if (value instanceof Double) {
+                    jsonNode.put(fieldName, (Double) value);
+                } else if (value instanceof Boolean) {
+                    jsonNode.put(fieldName, (Boolean) value);
+                } else if (value instanceof BigDecimal) {
+                    jsonNode.put(fieldName, (BigDecimal) value);
+                } else if (value instanceof java.sql.Timestamp) {
+                    jsonNode.put(fieldName, value.toString());
+                } else if (value instanceof java.sql.Date) {
+                    jsonNode.put(fieldName, value.toString());
+                } else if (value instanceof byte[]) {
+                    jsonNode.put(fieldName, 
Base64.getEncoder().encodeToString((byte[]) value));
+                } else {
+                    jsonNode.put(fieldName, value.toString());
+                }
+            }
+
+            return objectMapper.writeValueAsString(jsonNode);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to convert row to JSON", e);
+        }
+    }
+
     private void initializePreparedStatement(SeaTunnelRow row) throws 
SQLException {
         log.info("Initializing PreparedStatement based on row data");
 
@@ -423,16 +684,65 @@ public class DatabendSinkWriter
         log.info("Added row to batch, current batch count: {}", batchCount + 
1);
     }
 
+    private void verifyRawTableData(String rawTableName, String database) 
throws SQLException {
+        try (Statement stmt = connection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                "SELECT COUNT(*), COUNT(DISTINCT raw_data:id) 
FROM "
+                                        + database
+                                        + "."
+                                        + rawTableName)) {
+            if (rs.next()) {
+                log.info(
+                        "Raw table sjh {} has {} total rows, {} unique ids",
+                        rawTableName,
+                        rs.getInt(1),
+                        rs.getInt(2));
+            }
+        }
+
+        try (Statement stmt = connection.createStatement();
+                ResultSet dataRs =
+                        stmt.executeQuery(
+                                "SELECT raw_data, action, add_time FROM "
+                                        + database
+                                        + "."
+                                        + rawTableName
+                                        + " ORDER BY add_time"); ) {
+            while (dataRs.next()) {
+                log.info(
+                        "Raw data : {}, action: {}, time: {}",
+                        dataRs.getString(1),
+                        dataRs.getString(2),
+                        dataRs.getTimestamp(3));
+            }
+        }
+    }
+
     private void executeBatch() {
         if (batchCount > 0) {
             try {
                 log.info("Executing batch of {} records", batchCount);
-                int[] results = preparedStatement.executeBatch();
-                int totalAffected = 0;
-                for (int result : results) {
-                    totalAffected += result;
+                if (isCdcMode) {
+                    int[] results = cdcPreparedStatement.executeBatch();
+                    int totalAffected = 0;
+                    for (int result : results) {
+                        totalAffected += result;
+                    }
+                    log.info(
+                            "CDC batch executed successfully, total affected 
rows: {}",
+                            totalAffected);
+                    verifyRawTableData(rawTableName, 
sinkTablePath.getDatabaseName());
+                } else {
+                    int[] results = preparedStatement.executeBatch();
+                    int totalAffected = 0;
+                    for (int result : results) {
+                        totalAffected += result;
+                    }
+                    log.info(
+                            "Traditional batch executed successfully, total 
affected rows: {}",
+                            totalAffected);
                 }
-                log.info("Batch executed successfully, total affected rows: 
{}", totalAffected);
                 batchCount = 0;
             } catch (SQLException e) {
                 log.error("Failed to execute batch", e);
@@ -447,11 +757,13 @@ public class DatabendSinkWriter
     }
 
     @Override
-    public Optional<Void> prepareCommit() throws IOException {
+    public Optional<DatabendSinkCommitterInfo> prepareCommit() throws 
IOException {
         log.info("Preparing to commit, executing remaining batch");
         executeBatch();
         log.info("Commit prepared successfully");
-        return Optional.empty();
+        // In the new approach, rawTableName and streamName are initialized in 
DatabendSink
+        // We pass null values as they're not needed in the committer info
+        return Optional.of(new DatabendSinkCommitterInfo(null, null));
     }
 
     @Override
@@ -490,12 +802,30 @@ public class DatabendSinkWriter
     public void close() throws IOException {
         log.info("Closing DatabendSinkWriter");
         try {
-            if (preparedStatement != null) {
+            // Execute final batch before closing
+            if (batchCount > 0) {
                 log.info("Executing final batch before closing");
                 executeBatch();
+            }
+
+            // Perform final merge in CDC mode
+            if (isCdcMode) {
+                log.info("Performing final merge before closing");
+                performMerge();
+            }
+
+            // Close prepared statements
+            if (preparedStatement != null) {
                 log.info("Closing PreparedStatement");
                 preparedStatement.close();
             }
+
+            if (cdcPreparedStatement != null) {
+                log.info("Closing CDC PreparedStatement");
+                cdcPreparedStatement.close();
+            }
+
+            // Close connection
             if (connection != null) {
                 if (!connection.getAutoCommit()) {
                     log.info("Committing transaction");
@@ -504,6 +834,7 @@ public class DatabendSinkWriter
                 log.info("Closing connection");
                 connection.close();
             }
+
             log.info("DatabendSinkWriter closed successfully");
         } catch (SQLException e) {
             log.error("Failed to close DatabendSinkWriter", e);
@@ -634,4 +965,29 @@ public class DatabendSinkWriter
                 return "VARCHAR"; // default use VARCHAR
         }
     }
+
+    // Package-private methods for testing
+    String getConflictKey() {
+        return conflictKey;
+    }
+
+    TablePath getSinkTablePath() {
+        return sinkTablePath;
+    }
+
+    String getRawTableName() {
+        return rawTableName;
+    }
+
+    String getStreamName() {
+        return streamName;
+    }
+
+    boolean isEnableDelete() {
+        return enableDelete;
+    }
+
+    CatalogTable getCatalogTable() {
+        return catalogTable;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
new file mode 100644
index 0000000000..9fa07b5c62
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DatabendSinkWriterTest {
+
+    @Test
+    public void testGenerateMergeSql() throws Exception {
+        // Create a mock DatabendSinkWriter
+        DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+        // Set up the real method to test
+        when(sinkWriter.generateMergeSql()).thenCallRealMethod();
+
+        // Use reflection to set private fields
+        setPrivateField(sinkWriter, "conflictKey", "id");
+        setPrivateField(sinkWriter, "sinkTablePath", TablePath.of("test_db", 
"target_table"));
+        setPrivateField(sinkWriter, "streamName", "cdc_stream");
+        setPrivateField(sinkWriter, "enableDelete", true);
+        setPrivateField(sinkWriter, "targetTableName", "target_table");
+
+        // Mock catalogTable
+        org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+                
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "score"},
+                        new 
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.DOUBLE_TYPE
+                        });
+        when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+        setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+        // Call the method
+        String mergeSql = sinkWriter.generateMergeSql();
+
+        // Expected SQL
+        String expectedSql =
+                "MERGE INTO test_db.target_table a "
+                        + "USING (SELECT raw_data:id as id, raw_data:name as 
name, raw_data:score as score, action "
+                        + "FROM test_db.cdc_stream "
+                        + "QUALIFY ROW_NUMBER() OVER(PARTITION BY id ORDER BY 
add_time DESC) = 1) b "
+                        + "ON a.id = b.id "
+                        + "WHEN MATCHED AND b.action = 'update' THEN UPDATE * "
+                        + "WHEN MATCHED AND b.action = 'delete' THEN DELETE "
+                        + "WHEN NOT MATCHED AND b.action!='delete' THEN INSERT 
*";
+
+        assertEquals(expectedSql, mergeSql);
+    }
+
+    @Test
+    public void testGenerateMergeSqlWithoutDelete() throws Exception {
+        // Create a mock DatabendSinkWriter
+        DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+        // Set up the real method to test
+        when(sinkWriter.generateMergeSql()).thenCallRealMethod();
+
+        // Use reflection to set private fields
+        setPrivateField(sinkWriter, "conflictKey", "id");
+        setPrivateField(sinkWriter, "sinkTablePath", TablePath.of("test_db", 
"target_table"));
+        setPrivateField(sinkWriter, "streamName", "cdc_stream");
+        setPrivateField(sinkWriter, "enableDelete", false);
+        setPrivateField(sinkWriter, "targetTableName", "target_table");
+
+        // Mock catalogTable
+        org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+                
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "score"},
+                        new 
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.DOUBLE_TYPE
+                        });
+        when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+        setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+        // Call the method
+        String mergeSql = sinkWriter.generateMergeSql();
+
+        // Expected SQL without DELETE clause
+        String expectedSql =
+                "MERGE INTO test_db.target_table a "
+                        + "USING (SELECT raw_data:id as id, raw_data:name as 
name, raw_data:score as score, action "
+                        + "FROM test_db.cdc_stream "
+                        + "QUALIFY ROW_NUMBER() OVER(PARTITION BY id ORDER BY 
add_time DESC) = 1) b "
+                        + "ON a.id = b.id "
+                        + "WHEN MATCHED AND b.action = 'update' THEN UPDATE * "
+                        + "WHEN NOT MATCHED AND b.action!='delete' THEN INSERT 
*";
+
+        assertEquals(expectedSql, mergeSql);
+    }
+
+    @Test
+    public void testGetConflictKeyValue() throws Exception {
+        // Create a mock DatabendSinkWriter
+        DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+        // Get the method to test
+        Method method =
+                DatabendSinkWriter.class.getDeclaredMethod(
+                        "getConflictKeyValue", SeaTunnelRow.class);
+        method.setAccessible(true);
+
+        // Mock catalogTable
+        org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+                
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "score"},
+                        new 
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.DOUBLE_TYPE
+                        });
+        when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+        setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+        // Create test row
+        Object[] fields = {1, "test", 95.5};
+        SeaTunnelRow row = new SeaTunnelRow(fields);
+
+        // Set conflict key
+        setPrivateField(sinkWriter, "conflictKey", "id");
+
+        // Call the method
+        String conflictKeyValue = (String) method.invoke(sinkWriter, row);
+
+        // Expected value - should be 1
+        assertEquals("1", conflictKeyValue);
+    }
+
+    @Test
+    public void testGetConflictKeyValueWithNullValue() throws Exception {
+        // Create a mock DatabendSinkWriter
+        DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+        // Get the method to test
+        Method method =
+                DatabendSinkWriter.class.getDeclaredMethod(
+                        "getConflictKeyValue", SeaTunnelRow.class);
+        method.setAccessible(true);
+
+        // Mock catalogTable
+        org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+                
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "score"},
+                        new 
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+                            BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.DOUBLE_TYPE
+                        });
+        when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+        setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+        // Create test row with null conflict key value
+        Object[] fields = {null, "test", 95.5};
+        SeaTunnelRow row = new SeaTunnelRow(fields);
+
+        // Set conflict key
+        setPrivateField(sinkWriter, "conflictKey", "id");
+
+        // Call the method - should throw IllegalArgumentException wrapped in
+        // InvocationTargetException
+        InvocationTargetException exception =
+                assertThrows(
+                        InvocationTargetException.class,
+                        () -> {
+                            method.invoke(sinkWriter, row);
+                        });
+
+        // Verify the cause is IllegalArgumentException
+        assertEquals(IllegalArgumentException.class, 
exception.getCause().getClass());
+    }
+
+    // Helper method to set private fields using reflection
+    private void setPrivateField(Object target, String fieldName, Object 
value) throws Exception {
+        Field field = target.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
new file mode 100644
index 0000000000..c837dbeb61
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.databend;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.databend.DatabendContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class DatabendCDCSinkIT extends TestSuiteBase implements TestResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DatabendCDCSinkIT.class);
+    private static final String DATABEND_DOCKER_IMAGE = 
"datafuselabs/databend:nightly";
+    private static final String DATABEND_CONTAINER_HOST = "databend";
+    private static final int PORT = 8000;
+    private static final int LOCAL_PORT = 8000;
+    private static final String DATABASE = "default";
+    private static final String SINK_TABLE = "sink_table";
+    private DatabendContainer container;
+    private GenericContainer<?> minioContainer;
+    private Connection connection;
+
+    @TestTemplate
+    public void testDatabendSinkCDC(TestContainer container) throws Exception {
+        // Run the CDC test job
+        Container.ExecResult execResult =
+                container.executeJob("/databend/fake_to_databend_cdc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+
+        // Wait for the merge operation to complete
+        // Increased wait time to ensure merge operations finish
+        Thread.sleep(10000);
+
+        // Verify the sink results
+        try (Statement statement = connection.createStatement()) {
+
+            // First check how many records we have
+            try (ResultSet countRs =
+                    statement.executeQuery("SELECT COUNT(*) as count FROM 
sink_table")) {
+                if (countRs.next()) {
+                    int count = countRs.getInt("count");
+                    LOG.info("Found {} records in sink_table", count);
+                }
+            }
+
+            // Then get all records for debugging
+            try (ResultSet allRs = statement.executeQuery("SELECT * FROM 
sink_table ORDER BY id")) {
+                LOG.info("All records in sink_table:");
+                while (allRs.next()) {
+                    LOG.info(
+                            "Record: id={}, name={}, position={}, age={}, 
score={}",
+                            allRs.getInt("id"),
+                            allRs.getString("name"),
+                            allRs.getString("position"),
+                            allRs.getInt("age"),
+                            allRs.getDouble("score"));
+                }
+            }
+
+            // Finally check with expected results
+            try (ResultSet resultSet =
+                    statement.executeQuery("SELECT * FROM sink_table ORDER BY 
id")) {
+
+                List<List<Object>> expectedRecords =
+                        Arrays.asList(
+                                Arrays.asList(1, "Alice", "Engineer", 30, 
95.5),
+                                Arrays.asList(3, "Charlie", "Engineer", 35, 
92.5),
+                                Arrays.asList(4, "David", "Designer", 28, 
88.0));
+
+                List<List<Object>> actualRecords = new ArrayList<>();
+
+                while (resultSet.next()) {
+                    List<Object> row = new ArrayList<>();
+                    row.add(resultSet.getInt("id"));
+                    row.add(resultSet.getString("name"));
+                    row.add(resultSet.getString("position"));
+                    row.add(resultSet.getInt("age"));
+                    row.add(resultSet.getDouble("score"));
+                    actualRecords.add(row);
+                }
+
+                LOG.info("Expected records: {}", expectedRecords);
+                LOG.info("Actual records: {}", actualRecords);
+
+                Assertions.assertEquals(
+                        expectedRecords.size(),
+                        actualRecords.size(),
+                        "Record count mismatch. Expected: "
+                                + expectedRecords.size()
+                                + ", Actual: "
+                                + actualRecords.size());
+                for (int i = 0; i < expectedRecords.size(); i++) {
+                    Assertions.assertEquals(
+                            expectedRecords.get(i),
+                            actualRecords.get(i),
+                            "Record at index " + i + " does not match");
+                }
+            }
+        }
+        clearSinkTable();
+    }
+
+    private void clearSinkTable() throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            statement.execute("TRUNCATE TABLE sink_table");
+        }
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.minioContainer =
+                new GenericContainer<>("minio/minio:latest")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("minio")
+                        .withEnv("MINIO_ROOT_USER", "minioadmin")
+                        .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")
+                        .withCommand("server", "/data")
+                        .withExposedPorts(9000);
+
+        this.minioContainer.setWaitStrategy(
+                
Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofSeconds(60)));
+
+        
this.minioContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
9000, 9000)));
+
+        this.minioContainer.start();
+
+        LOG.info("MinIO container starting,wait 5 secs ...");
+        Thread.sleep(5000);
+
+        boolean bucketCreated = createMinIOBucketWithAWSSDK("databend");
+        if (!bucketCreated) {
+            LOG.warn("can't make sure MinIO bucket create success,continue to 
start Databend");
+        }
+        this.container =
+                new DatabendContainer(DATABEND_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(DATABEND_CONTAINER_HOST)
+                        .withUsername("root")
+                        .withPassword("")
+                        .withEnv("STORAGE_TYPE", "s3")
+                        .withEnv("STORAGE_S3_ENDPOINT_URL", 
"http://minio:9000";)
+                        .withEnv("STORAGE_S3_ACCESS_KEY_ID", "minioadmin")
+                        .withEnv("STORAGE_S3_SECRET_ACCESS_KEY", "minioadmin")
+                        .withEnv("STORAGE_S3_BUCKET", "databend")
+                        .withEnv("STORAGE_S3_REGION", "us-east-1")
+                        .withEnv("STORAGE_S3_ENABLE_VIRTUAL_HOST_STYLE", 
"false")
+                        .withEnv("STORAGE_S3_FORCE_PATH_STYLE", "true")
+                        .withUrlParam("ssl", "false");
+
+        this.container.setPortBindings(
+                Lists.newArrayList(
+                        String.format(
+                                "%s:%s", LOCAL_PORT, PORT) // host 8000 map to 
container port 8000
+                        ));
+
+        Startables.deepStart(Stream.of(this.container)).join();
+        LOG.info("Databend container started");
+        Awaitility.given()
+                .ignoreExceptions()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(this::initConnection);
+
+        this.initializeDatabendTable();
+    }
+
+    private void initializeDatabendTable() {
+        try (Statement statement = connection.createStatement(); ) {
+            // Create sink table
+            String createTableSql =
+                    "CREATE TABLE IF NOT EXISTS sink_table ("
+                            + "  id INT, "
+                            + "  name STRING, "
+                            + "  position STRING, "
+                            + "  age INT, "
+                            + "  score DOUBLE"
+                            + ")";
+            statement.execute(createTableSql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Databend table failed!", 
e);
+        }
+    }
+
+    /**
+     * using AWS SDK create MinIO bucket
+     *
+     * @param bucketName bucket
+     * @return success or not
+     */
+    private boolean createMinIOBucketWithAWSSDK(String bucketName) {
+        try {
+            LOG.info("using AWS SDK to create MinIO bucket: {}", bucketName);
+
+            AwsClientBuilder.EndpointConfiguration endpointConfig =
+                    new AwsClientBuilder.EndpointConfiguration(
+                            "http://localhost:9000";, "us-east-1");
+
+            AWSCredentials credentials = new BasicAWSCredentials("minioadmin", 
"minioadmin");
+            AWSCredentialsProvider credentialsProvider =
+                    new AWSStaticCredentialsProvider(credentials);
+
+            AmazonS3 s3Client =
+                    AmazonS3ClientBuilder.standard()
+                            .withEndpointConfiguration(endpointConfig)
+                            .withCredentials(credentialsProvider)
+                            .withPathStyleAccessEnabled(true)
+                            .disableChunkedEncoding()
+                            .build();
+
+            boolean bucketExists = s3Client.doesBucketExistV2(bucketName);
+            if (bucketExists) {
+                LOG.info("bucket {} exist,no need to create", bucketName);
+                return true;
+            }
+
+            s3Client.createBucket(bucketName);
+            LOG.info("create MinIO bucket success: {}", bucketName);
+            return true;
+        } catch (Exception e) {
+            LOG.error("using AWS SDK to create MinIO failed", e);
+            return false;
+        }
+    }
+
+    //    private synchronized Connection getConnection() throws SQLException {
+    //        if (this.connection == null || this.connection.isClosed()) {
+    //            LOG.info("Creating new database connection");
+    //            final Properties info = new Properties();
+    //            info.put("user", "root");
+    //            info.put("password", "");
+    //
+    //            String jdbcUrl =
+    //                    String.format(
+    //                            "jdbc:databend://%s:%d/%s?ssl=false",
+    //                            container.getHost(), 
container.getMappedPort(8000), DATABASE);
+    //
+    //            this.connection = DriverManager.getConnection(jdbcUrl, info);
+    //        }
+    //        return this.connection;
+    //    }
+
+    private void initConnection()
+            throws SQLException, ClassNotFoundException, 
InstantiationException,
+                    IllegalAccessException {
+        final Properties info = new Properties();
+        info.put("user", "root"); // Default Databend user
+        info.put("password", ""); // Default Databend password is empty
+        System.out.println("maped port is: " + container.getMappedPort(8000));
+        System.out.println("mapped host: is: " + container.getHost());
+
+        String jdbcUrl =
+                String.format(
+                        "jdbc:databend://%s:%d/%s?ssl=false",
+                        container.getHost(), container.getMappedPort(8000), 
DATABASE);
+
+        this.connection = DriverManager.getConnection(jdbcUrl, info);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+                LOG.info("Database connection closed");
+
+                this.connection = null;
+            } catch (SQLException e) {
+                LOG.error("Error closing database connection", e);
+            }
+        }
+
+        // Add a longer sleep to ensure all heartbeat threads are properly 
terminated
+        Thread.sleep(10000);
+
+        if (this.container != null) {
+            this.container.stop();
+            LOG.info("Container stopped");
+        }
+
+        if (this.minioContainer != null) {
+            this.minioContainer.stop();
+            LOG.info("MinIO container stopped");
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
new file mode 100644
index 0000000000..9a9eed6f9f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 2
+  job.mode = "BATCH"
+  checkpoint.interval  = 1000
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        position = "string"
+        age = "int"
+        score = "double"
+      }
+    }
+    
+    # CDC data with different row kinds
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "Alice", "Engineer", 30, 95.5]
+      },
+      {
+        kind = INSERT
+        fields = [2, "Bob", "Developer", 25, 85.0]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "Bob", "Developer", 25, 85.0]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "Bob", "Senior Developer", 25, 87.0]
+      },
+      {
+        kind = INSERT
+        fields = [3, "Charlie", "Engineer", 35, 92.5]
+      },
+      {
+        kind = INSERT
+        fields = [4, "David", "Designer", 28, 88.0]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [2, "Bob", "Senior Developer", 25, 87.0]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [2, "Bob", "Tech Lead", 25, 90.0]
+      },
+      {
+        kind = DELETE
+        fields = [2, "Bob", "Tech Lead", 25, 90.0]
+      }
+    ]
+  }
+}
+
+sink {
+  Databend {
+    url = "jdbc:databend://databend:8000/default?ssl=false"
+    username = "root"
+    password = ""
+    database = "default"
+    table = "sink_table"
+    
+    # Enable CDC mode
+    batch_size = 1
+    conflict_key = "id"
+    enable_delete = true
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 354a408745..2c1ef8076a 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -418,6 +418,7 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                 || s.contains(
                         
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
                 || s.startsWith("Log4j2-TF-")
+                || s.startsWith("heartbeat") // Add heartbeat threads as 
system threads
                 || aqsThread.matcher(s).matches()
                 // The renewed background thread of the hdfs client
                 || s.startsWith("LeaseRenewer")

Reply via email to