This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6093c213b [feature][connector-v2][clickhouse] Support write cdc 
changelog event in clickhouse sink (#3653)
6093c213b is described below

commit 6093c213bf366caea945d9867da3f0f8bb4787f7
Author: hailin0 <[email protected]>
AuthorDate: Tue Dec 13 11:35:40 2022 +0800

    [feature][connector-v2][clickhouse] Support write cdc changelog event in 
clickhouse sink (#3653)
    
    * [feature][connector-v2][clickhouse] Support write cdc changelog event in 
clickhouse sink
    
    MergeTree Table Engine:
    - Support enable `allowExperimentalLightweightDelete` setting
    - CDC Events:
      - INSERT: INSERT INTO SQL
      - UPDATE_BEFORE: DELETE SQL
      - UPDATE_AFTER: INSERT INTO SQL
      - DELETE: DELETE SQL
    
    - ReplacingMergeTree Engine CDC Events:
      - INSERT: INSERT INTO SQL
      - UPDATE_BEFORE: ignore
      - UPDATE_AFTER: INSERT INTO SQL
      - DELETE: DELETE SQL
    
    Other Table Engine:
    - CDC Events:
      - INSERT: INSERT INTO SQL
      - UPDATE_BEFORE: ignore
      - UPDATE_AFTER: ALTER TABLE UPDATE SQL
      - DELETE: ALTER TABLE DELETE SQL
    
    * update
    
    * fix InsertOrUpdateBatchStatementExecutor
    
    * add check
---
 docs/en/connector-v2/sink/Clickhouse.md            |  87 +++++++--
 .../clickhouse/config/ClickhouseConfig.java        |  21 +-
 .../seatunnel/clickhouse/config/ReaderOption.java  |  69 ++-----
 .../seatunnel/clickhouse/shard/ShardMetadata.java  | 128 ++-----------
 .../clickhouse/sink/ClickhouseSinkFactory.java     |  18 +-
 .../clickhouse/sink/DistributedEngine.java         |  37 +---
 .../sink/client/ClickhouseBatchStatement.java      |  13 +-
 .../clickhouse/sink/client/ClickhouseProxy.java    |  42 ++--
 .../clickhouse/sink/client/ClickhouseSink.java     |  40 +++-
 .../sink/client/ClickhouseSinkWriter.java          | 120 ++----------
 .../clickhouse/sink/client/ShardRouter.java        |  10 +-
 .../executor/BufferedBatchStatementExecutor.java   |  70 +++++++
 .../InsertOrUpdateBatchStatementExecutor.java      | 138 ++++++++++++++
 .../executor/JdbcBatchStatementExecutor.java}      |  43 ++---
 .../JdbcBatchStatementExecutorBuilder.java         | 211 +++++++++++++++++++++
 .../sink/client/executor/JdbcRowConverter.java     | 129 +++++++++++++
 .../ReduceBufferedBatchStatementExecutor.java      | 119 ++++++++++++
 .../executor/SimpleBatchStatementExecutor.java     |  60 ++++++
 .../clickhouse/sink/client/executor/SqlUtils.java  |  82 ++++++++
 .../sink/client/executor/StatementFactory.java     |  29 +++
 .../clickhouse/sink/file/ClickhouseFileSink.java   |   2 +
 21 files changed, 1084 insertions(+), 384 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md 
b/docs/en/connector-v2/sink/Clickhouse.md
index 4f1f0e16c..90bec5fc9 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -13,6 +13,7 @@ Used to write data to Clickhouse.
 The Clickhouse sink plug-in can achieve accuracy once by implementing 
idempotent writing, and needs to cooperate with aggregatingmergetree and other 
engines that support deduplication.
 
 - [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
 
 :::tip
 
@@ -22,19 +23,22 @@ Write data to Clickhouse can also be done using JDBC
 
 ## Options
 
-| name           | type   | required | default value |
-|----------------|--------|----------|---------------|
-| host           | string | yes      | -             |
-| database       | string | yes      | -             |
-| table          | string | yes      | -             |
-| username       | string | yes      | -             |
-| password       | string | yes      | -             |
-| fields         | string | yes      | -             |
-| clickhouse.*   | string | no       |               |
-| bulk_size      | string | no       | 20000         |
-| split_mode     | string | no       | false         |
-| sharding_key   | string | no       | -             |
-| common-options |        | no       | -             |
+| name                                  | type    | required | default value |
+|---------------------------------------|---------|----------|---------------|
+| host                                  | string  | yes      | -             |
+| database                              | string  | yes      | -             |
+| table                                 | string  | yes      | -             |
+| username                              | string  | yes      | -             |
+| password                              | string  | yes      | -             |
+| fields                                | string  | yes      | -             |
+| clickhouse.*                          | string  | no       |               |
+| bulk_size                             | string  | no       | 20000         |
+| split_mode                            | string  | no       | false         |
+| sharding_key                          | string  | no       | -             |
+| primary_key                           | string  | no       | -             |
+| support_upsert                        | boolean | no       | false         |
+| allow_experimental_lightweight_delete | boolean | no       | false         |
+| common-options                        |         | no       | -             |
 
 ### host [string]
 
@@ -82,39 +86,90 @@ When use split_mode, which node to send data to is a 
problem, the default is ran
 'sharding_key' parameter can be used to specify the field for the sharding 
algorithm. This option only
 worked when 'split_mode' is true.
 
+### primary_key [string]
+
+Mark the primary key column of the ClickHouse table; INSERT/UPDATE/DELETE 
statements are executed against the table based on this primary key
+
+### support_upsert [boolean]
+
+Support upserting rows by querying on the primary key
+
+### allow_experimental_lightweight_delete [boolean]
+
+Allow experimental lightweight deletes on tables that use a `*MergeTree` table engine
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
 
 ## Examples
 
+Simple
+
 ```hocon
 sink {
+  Clickhouse {
+    host = "localhost:8123"
+    database = "default"
+    table = "fake_all"
+    username = "default"
+    password = ""
+  }
+}
+```
+
+Split mode
 
+```hocon
+sink {
   Clickhouse {
     host = "localhost:8123"
     database = "default"
     table = "fake_all"
     username = "default"
     password = ""
+    
+    # split mode options
     split_mode = true
     sharding_key = "age"
   }
-  
 }
 ```
 
+CDC(Change data capture)
+
 ```hocon
 sink {
+  Clickhouse {
+    host = "localhost:8123"
+    database = "default"
+    table = "fake_all"
+    username = "default"
+    password = ""
+    
+    # cdc options
+    primary_key = "id"
+    support_upsert = true
+  }
+}
+```
 
+CDC(Change data capture) for *MergeTree engine
+
+```hocon
+sink {
   Clickhouse {
     host = "localhost:8123"
     database = "default"
     table = "fake_all"
     username = "default"
     password = ""
+    
+    # cdc options
+    primary_key = "id"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
   }
-  
 }
 ```
 
@@ -132,3 +187,5 @@ sink {
 - [Improve] Clickhouse Sink support nest type and array 
type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
 
 - [Improve] Clickhouse Sink support geo 
type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
+
+- [Feature] Support writing CDC DELETE/UPDATE/INSERT events 
([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 3f48df990..9f93eb36c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -81,8 +81,25 @@ public class ClickhouseConfig {
     /**
      * When split_mode is true, the sharding_key use for split
      */
-    public static final Option<String> SHARDING_KEY = 
Options.key("sharding_key").stringType()
-        .noDefaultValue().withDescription("When split_mode is true, the 
sharding_key use for split");
+    public static final Option<String> SHARDING_KEY = 
Options.key("sharding_key")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("When split_mode is true, the sharding_key use for 
split");
+
+    public static final Option<String> PRIMARY_KEY = Options.key("primary_key")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Mark the primary key column from clickhouse table, 
and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table");
+
+    public static final Option<Boolean> SUPPORT_UPSERT = 
Options.key("support_upsert")
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("Support upsert row by query primary key");
+
+    public static final Option<Boolean> ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE 
= Options.key("allow_experimental_lightweight_delete")
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("Allow experimental lightweight delete based on 
`*MergeTree` table engine");
 
     /**
      * ClickhouseFile sink connector used clickhouse-local program's path
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
index 084f54bcc..59f711741 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -20,75 +20,28 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+@Builder
+@Getter
 public class ReaderOption implements Serializable {
 
     private ShardMetadata shardMetadata;
     private List<String> fields;
-
+    private String[] primaryKeys;
+    private boolean allowExperimentalLightweightDelete;
+    private boolean supportUpsert;
+    private String tableEngine;
     private Map<String, String> tableSchema;
+    @Setter
     private SeaTunnelRowType seaTunnelRowType;
     private Properties properties;
     private int bulkSize;
-
-    public ReaderOption(ShardMetadata shardMetadata,
-                        Properties properties, List<String> fields, 
Map<String, String> tableSchema, int bulkSize) {
-        this.shardMetadata = shardMetadata;
-        this.properties = properties;
-        this.fields = fields;
-        this.tableSchema = tableSchema;
-        this.bulkSize = bulkSize;
-    }
-
-    public Properties getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Properties properties) {
-        this.properties = properties;
-    }
-
-    public ShardMetadata getShardMetadata() {
-        return shardMetadata;
-    }
-
-    public void setShardMetadata(ShardMetadata shardMetadata) {
-        this.shardMetadata = shardMetadata;
-    }
-
-    public SeaTunnelRowType getSeaTunnelRowType() {
-        return seaTunnelRowType;
-    }
-
-    public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    public Map<String, String> getTableSchema() {
-        return tableSchema;
-    }
-
-    public void setTableSchema(Map<String, String> tableSchema) {
-        this.tableSchema = tableSchema;
-    }
-
-    public List<String> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<String> fields) {
-        this.fields = fields;
-    }
-
-    public int getBulkSize() {
-        return bulkSize;
-    }
-
-    public void setBulkSize(int bulkSize) {
-        this.bulkSize = bulkSize;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
index c40344b2a..7a8e98624 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -17,9 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
 
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
 import java.io.Serializable;
-import java.util.Objects;
 
+@Getter
+@EqualsAndHashCode
+@AllArgsConstructor
 public class ShardMetadata implements Serializable {
 
     private static final long serialVersionUID = -1L;
@@ -28,6 +34,7 @@ public class ShardMetadata implements Serializable {
     private String shardKeyType;
     private String database;
     private String table;
+    private String tableEngine;
     private boolean splitMode;
     private Shard defaultShard;
     private String username;
@@ -37,123 +44,10 @@ public class ShardMetadata implements Serializable {
                          String shardKeyType,
                          String database,
                          String table,
-                         boolean splitMode,
-                         Shard defaultShard,
-                         String username,
-                         String password) {
-        this.shardKey = shardKey;
-        this.shardKeyType = shardKeyType;
-        this.database = database;
-        this.table = table;
-        this.splitMode = splitMode;
-        this.defaultShard = defaultShard;
-        this.username = username;
-        this.password = password;
-    }
-
-    public ShardMetadata(String shardKey,
-                         String shardKeyType,
-                         String database,
-                         String table,
+                         String tableEngine,
                          boolean splitMode,
                          Shard defaultShard) {
-        this.shardKey = shardKey;
-        this.shardKeyType = shardKeyType;
-        this.database = database;
-        this.table = table;
-        this.splitMode = splitMode;
-        this.defaultShard = defaultShard;
-    }
-
-    public String getShardKey() {
-        return shardKey;
-    }
-
-    public void setShardKey(String shardKey) {
-        this.shardKey = shardKey;
-    }
-
-    public String getShardKeyType() {
-        return shardKeyType;
-    }
-
-    public void setShardKeyType(String shardKeyType) {
-        this.shardKeyType = shardKeyType;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(String database) {
-        this.database = database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public boolean getSplitMode() {
-        return splitMode;
-    }
-
-    public void setSplitMode(boolean splitMode) {
-        this.splitMode = splitMode;
-    }
-
-    public Shard getDefaultShard() {
-        return defaultShard;
-    }
-
-    public void setDefaultShard(Shard defaultShard) {
-        this.defaultShard = defaultShard;
-    }
-
-    public boolean isSplitMode() {
-        return splitMode;
-    }
-
-    public String getUsername() {
-        return username;
-    }
-
-    public void setUsername(String username) {
-        this.username = username;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        ShardMetadata that = (ShardMetadata) o;
-        return splitMode == that.splitMode
-                && Objects.equals(shardKey, that.shardKey)
-                && Objects.equals(shardKeyType, that.shardKeyType)
-                && Objects.equals(database, that.database)
-                && Objects.equals(table, that.table)
-                && Objects.equals(defaultShard, that.defaultShard)
-                && Objects.equals(username, that.username)
-                && Objects.equals(password, that.password);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(shardKey, shardKeyType, database, table, 
splitMode, defaultShard, username, password);
+        this(shardKey, shardKeyType, database, table, tableEngine,
+            splitMode, defaultShard, null, null);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 3a9dedf44..03d71f136 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -17,14 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
@@ -43,8 +46,17 @@ public class ClickhouseSinkFactory implements 
TableSinkFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(HOST, DATABASE, TABLE)
-            .optional(CLICKHOUSE_PREFIX, BULK_SIZE, SPLIT_MODE, FIELDS, 
SHARDING_KEY)
-            .bundled(USERNAME, PASSWORD).build();
+        return OptionRule.builder()
+            .required(HOST, DATABASE, TABLE)
+            .optional(CLICKHOUSE_PREFIX,
+                BULK_SIZE,
+                SPLIT_MODE,
+                FIELDS,
+                SHARDING_KEY,
+                PRIMARY_KEY,
+                SUPPORT_UPSERT,
+                ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
+            .bundled(USERNAME, PASSWORD)
+            .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
index 6a15d5919..067f09fdb 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
@@ -17,42 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import java.io.Serializable;
 
+@AllArgsConstructor
+@Getter
 public class DistributedEngine implements Serializable {
 
     private static final long serialVersionUID = -1L;
     private String clusterName;
     private String database;
     private String table;
-
-    public DistributedEngine(String clusterName, String database, String 
table) {
-        this.clusterName = clusterName;
-        this.database = database;
-        this.table = table;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(String database) {
-        this.database = database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
+    private String tableEngine;
+    private String tableDDL;
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
index ae525acee..180cb4023 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -17,23 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
 
 import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
 
-import java.sql.PreparedStatement;
-
 public class ClickhouseBatchStatement {
 
     private final ClickHouseConnectionImpl clickHouseConnection;
-    private final PreparedStatement preparedStatement;
+    private final JdbcBatchStatementExecutor jdbcBatchStatementExecutor;
     private final IntHolder intHolder;
 
     public ClickhouseBatchStatement(ClickHouseConnectionImpl 
clickHouseConnection,
-                                    PreparedStatement preparedStatement,
+                                    JdbcBatchStatementExecutor 
jdbcBatchStatementExecutor,
                                     IntHolder intHolder) {
         this.clickHouseConnection = clickHouseConnection;
-        this.preparedStatement = preparedStatement;
+        this.jdbcBatchStatementExecutor = jdbcBatchStatementExecutor;
         this.intHolder = intHolder;
     }
 
@@ -41,8 +40,8 @@ public class ClickhouseBatchStatement {
         return clickHouseConnection;
     }
 
-    public PreparedStatement getPreparedStatement() {
-        return preparedStatement;
+    public JdbcBatchStatementExecutor getJdbcBatchStatementExecutor() {
+        return jdbcBatchStatementExecutor;
     }
 
     public IntHolder getIntHolder() {
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index 67b4db7aa..def00f912 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -66,11 +66,6 @@ public class ClickhouseProxy {
         return 
c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
     }
 
-    public DistributedEngine getClickhouseDistributedTable(String database, 
String table) {
-        ClickHouseRequest<?> request = getClickhouseConnection();
-        return getClickhouseDistributedTable(request, database, table);
-    }
-
     public DistributedEngine 
getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database,
                                                            String table) {
         String sql = String.format("select engine_full from system.tables 
where database = '%s' and name = '%s' and engine = 'Distributed'", database, 
table);
@@ -82,7 +77,30 @@ public class ClickhouseProxy {
                 String engineFull = record.getValue(0).asString();
                 List<String> infos = 
Arrays.stream(engineFull.substring(12).split(","))
                     .map(s -> s.replace("'", 
"").trim()).collect(Collectors.toList());
-                return new DistributedEngine(infos.get(0), infos.get(1), 
infos.get(2).replace("\\)", "").trim());
+
+                String clusterName = infos.get(0);
+                String localDatabase = infos.get(1);
+                String localTable = infos.get(2).replace("\\)", "").trim();
+
+                String localTableSQL = String.format("select 
engine,create_table_query from system.tables where database = '%s' and name = 
'%s'",
+                    localDatabase, localTable);
+                String localTableDDL;
+                String localTableEngine;
+                try (ClickHouseResponse localTableResponse = 
clickhouseRequest.query(localTableSQL).executeAndWait()) {
+                    List<ClickHouseRecord> localTableRecords = 
localTableResponse.stream().collect(Collectors.toList());
+                    if (localTableRecords.isEmpty()) {
+                        throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get table from clickhouse, resultSet is empty");
+                    }
+                    localTableEngine = 
localTableRecords.get(0).getValue(0).asString();
+                    localTableDDL = 
localTableRecords.get(0).getValue(1).asString();
+                    localTableDDL = localizationEngine(localTableEngine, 
localTableDDL);
+                }
+
+                return new DistributedEngine(clusterName,
+                    localDatabase,
+                    localTable,
+                    localTableEngine,
+                    localTableDDL);
             }
             throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get distributed table from clickhouse, resultSet is empty");
         } catch (ClickHouseException e) {
@@ -163,17 +181,7 @@ public class ClickhouseProxy {
             DistributedEngine distributedEngine = null;
             if ("Distributed".equals(engine)) {
                 distributedEngine = 
getClickhouseDistributedTable(clickhouseRequest, database, table);
-                String localTableSQL = String.format("select 
engine,create_table_query from system.tables where database = '%s' and name = 
'%s'",
-                    distributedEngine.getDatabase(), 
distributedEngine.getTable());
-                try (ClickHouseResponse rs = 
clickhouseRequest.query(localTableSQL).executeAndWait()) {
-                    List<ClickHouseRecord> localTableRecords = 
rs.stream().collect(Collectors.toList());
-                    if (localTableRecords.isEmpty()) {
-                        throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get table from clickhouse, resultSet is empty");
-                    }
-                    String localEngine = 
localTableRecords.get(0).getValue(0).asString();
-                    String createLocalTableDDL = 
localTableRecords.get(0).getValue(1).asString();
-                    createTableDDL = localizationEngine(localEngine, 
createLocalTableDDL);
-                }
+                createTableDDL = distributedEngine.getTableDDL();
             }
             return new ClickhouseTable(
                 database,
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index b87c8307e..61149daad 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,14 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
@@ -64,6 +67,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -126,9 +130,9 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
         String shardKey = null;
         String shardKeyType = null;
+        ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE.key()),
+            config.getString(TABLE.key()));
         if (config.getBoolean(SPLIT_MODE.key())) {
-            ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE.key()),
-                config.getString(TABLE.key()));
             if (!"Distributed".equals(table.getEngine())) {
                 throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only 
support table which engine is " +
                     "'Distributed' engine at now");
@@ -146,6 +150,7 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
                 shardKeyType,
                 config.getString(DATABASE.key()),
                 config.getString(TABLE.key()),
+                table.getEngine(),
                 config.getBoolean(SPLIT_MODE.key()),
                 new Shard(1, 1, nodes.get(0)), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
         } else {
@@ -154,6 +159,7 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
                 shardKeyType,
                 config.getString(DATABASE.key()),
                 config.getString(TABLE.key()),
+                table.getEngine(),
                 config.getBoolean(SPLIT_MODE.key()),
                 new Shard(1, 1, nodes.get(0)));
         }
@@ -171,7 +177,35 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
             fields.addAll(tableSchema.keySet());
         }
         proxy.close();
-        this.option = new ReaderOption(metadata, clickhouseProperties, fields, 
tableSchema, config.getInt(BULK_SIZE.key()));
+
+        String[] primaryKeys = null;
+        if (config.hasPath(PRIMARY_KEY.key())) {
+            String primaryKey = config.getString(PRIMARY_KEY.key());
+            if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
+                throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "sharding_key and primary_key must be consistent to ensure 
correct processing of cdc events");
+            }
+            primaryKeys = new String[]{primaryKey};
+        }
+        boolean supportUpsert = SUPPORT_UPSERT.defaultValue();
+        if (config.hasPath(SUPPORT_UPSERT.key())) {
+            supportUpsert = config.getBoolean(SUPPORT_UPSERT.key());
+        }
+        boolean allowExperimentalLightweightDelete = 
ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue();
+        if (config.hasPath(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
+            allowExperimentalLightweightDelete = 
config.getBoolean(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
+        }
+        this.option = ReaderOption.builder()
+            .shardMetadata(metadata)
+            .properties(clickhouseProperties)
+            .fields(fields)
+            .tableEngine(table.getEngine())
+            .tableSchema(tableSchema)
+            .bulkSize(config.getInt(BULK_SIZE.key()))
+            .primaryKeys(primaryKeys)
+            .supportUpsert(supportUpsert)
+            
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+            .build();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index b103ecccc..deb015453 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -24,36 +24,21 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
 
 import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
-import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 @Slf4j
 public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, 
CKCommitInfo, ClickhouseSinkState> {
@@ -62,22 +47,14 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
     private final ReaderOption option;
     private final ShardRouter shardRouter;
     private final transient ClickhouseProxy proxy;
-    private final String prepareSql;
     private final Map<Shard, ClickhouseBatchStatement> statementMap;
-    private final Map<String, ClickhouseFieldInjectFunction> 
fieldInjectFunctionMap;
-    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION 
= new StringInjectFunction();
-
-    private static final Pattern NULLABLE = 
Pattern.compile("Nullable\\((.*)\\)");
-    private static final Pattern LOW_CARDINALITY = 
Pattern.compile("LowCardinality\\((.*)\\)");
 
     ClickhouseSinkWriter(ReaderOption option, Context context) {
         this.option = option;
         this.context = context;
 
         this.proxy = new 
ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode());
-        this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
         this.shardRouter = new ShardRouter(proxy, option.getShardMetadata());
-        this.prepareSql = initPrepareSQL();
         this.statementMap = initStatementMap();
     }
 
@@ -90,7 +67,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
             shardKey = element.getField(i);
         }
         ClickhouseBatchStatement statement = 
statementMap.get(shardRouter.getShard(shardKey));
-        PreparedStatement clickHouseStatement = 
statement.getPreparedStatement();
+        JdbcBatchStatementExecutor clickHouseStatement = 
statement.getJdbcBatchStatementExecutor();
         IntHolder sizeHolder = statement.getIntHolder();
         // add into batch
         addIntoBatch(element, clickHouseStatement);
@@ -117,7 +94,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
         this.proxy.close();
         for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
             try (ClickHouseConnectionImpl needClosedConnection = 
batchStatement.getClickHouseConnection();
-                 PreparedStatement needClosedStatement = 
batchStatement.getPreparedStatement()) {
+                 JdbcBatchStatementExecutor needClosedStatement = 
batchStatement.getJdbcBatchStatementExecutor()) {
                 IntHolder intHolder = batchStatement.getIntHolder();
                 if (intHolder.getValue() > 0) {
                     flush(needClosedStatement);
@@ -129,29 +106,15 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
         }
     }
 
-    private void addIntoBatch(SeaTunnelRow row, PreparedStatement 
clickHouseStatement) {
+    private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor 
clickHouseStatement) {
         try {
-            for (int i = 0; i < option.getFields().size(); i++) {
-                String fieldName = option.getFields().get(i);
-                Object fieldValue = 
row.getField(option.getSeaTunnelRowType().indexOf(fieldName));
-                if (fieldValue == null) {
-                    // field does not exist in row
-                    // todo: do we need to transform to default value of each 
type
-                    clickHouseStatement.setObject(i + 1, null);
-                    continue;
-                }
-                String fieldType = option.getTableSchema().get(fieldName);
-                fieldInjectFunctionMap
-                    .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
-                    .injectFields(clickHouseStatement, i + 1, fieldValue);
-            }
-            clickHouseStatement.addBatch();
+            clickHouseStatement.addToBatch(row);
         } catch (SQLException e) {
             throw new 
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Add row 
data into batch error", e);
         }
     }
 
-    private void flush(PreparedStatement clickHouseStatement) {
+    private void flush(JdbcBatchStatementExecutor clickHouseStatement) {
         try {
             clickHouseStatement.executeBatch();
         } catch (Exception e) {
@@ -165,10 +128,21 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
             try {
                 ClickHouseConnectionImpl clickhouseConnection = new 
ClickHouseConnectionImpl(s.getJdbcUrl(),
                     this.option.getProperties());
-                PreparedStatement preparedStatement = 
clickhouseConnection.prepareStatement(prepareSql);
+
+                JdbcBatchStatementExecutor jdbcBatchStatementExecutor = new 
JdbcBatchStatementExecutorBuilder()
+                    .setTable(shardRouter.getShardTable())
+                    .setTableEngine(shardRouter.getShardTableEngine())
+                    .setRowType(option.getSeaTunnelRowType())
+                    .setPrimaryKeys(option.getPrimaryKeys())
+                    .setClickhouseTableSchema(option.getTableSchema())
+                    .setProjectionFields(option.getFields().toArray(new 
String[0]))
+                    
.setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
+                    .setSupportUpsert(option.isSupportUpsert())
+                    .build();
+                
jdbcBatchStatementExecutor.prepareStatements(clickhouseConnection);
                 IntHolder intHolder = new IntHolder();
                 ClickhouseBatchStatement batchStatement =
-                    new ClickhouseBatchStatement(clickhouseConnection, 
preparedStatement, intHolder);
+                    new ClickhouseBatchStatement(clickhouseConnection, 
jdbcBatchStatementExecutor, intHolder);
                 result.put(s, batchStatement);
             } catch (SQLException e) {
                 throw new 
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Clickhouse 
prepare statement error: " + e.getMessage(), e);
@@ -176,58 +150,4 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
         });
         return result;
     }
-
-    private String initPrepareSQL() {
-        String[] placeholder = new String[option.getFields().size()];
-        Arrays.fill(placeholder, "?");
-
-        return String.format("INSERT INTO %s (%s) VALUES (%s)",
-            shardRouter.getShardTable(),
-            String.join(",", option.getFields()),
-            String.join(",", placeholder));
-    }
-
-    private Map<String, ClickhouseFieldInjectFunction> 
initFieldInjectFunctionMap() {
-        Map<String, ClickhouseFieldInjectFunction> result = new 
HashMap<>(Common.COLLECTION_SIZE);
-        List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions;
-        ClickhouseFieldInjectFunction defaultFunction = new 
StringInjectFunction();
-        // get field type
-        for (String field : this.option.getFields()) {
-            clickhouseFieldInjectFunctions = Lists.newArrayList(
-                new ArrayInjectFunction(),
-                new MapInjectFunction(),
-                new BigDecimalInjectFunction(),
-                new DateInjectFunction(),
-                new DateTimeInjectFunction(),
-                new LongInjectFunction(),
-                new DoubleInjectFunction(),
-                new FloatInjectFunction(),
-                new IntInjectFunction(),
-                new StringInjectFunction()
-            );
-            ClickhouseFieldInjectFunction function = defaultFunction;
-            String fieldType = this.option.getTableSchema().get(field);
-            for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : 
clickhouseFieldInjectFunctions) {
-                if 
(clickhouseFieldInjectFunction.isCurrentFieldType(unwrapCommonPrefix(fieldType)))
 {
-                    function = clickhouseFieldInjectFunction;
-                    break;
-                }
-            }
-            result.put(fieldType, function);
-        }
-        return result;
-    }
-
-    private String unwrapCommonPrefix(String fieldType) {
-        Matcher nullMatcher = NULLABLE.matcher(fieldType);
-        Matcher lowMatcher = LOW_CARDINALITY.matcher(fieldType);
-        if (nullMatcher.matches()) {
-            return nullMatcher.group(1);
-        } else if (lowMatcher.matches()) {
-            return lowMatcher.group(1);
-        } else {
-            return fieldType;
-        }
-    }
-
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 71e6430fc..b31fb15ae 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -40,7 +40,9 @@ public class ShardRouter implements Serializable {
     private static final long serialVersionUID = -1L;
 
     private String shardTable;
+    private String shardTableEngine;
     private final String table;
+    private final String tableEngine;
     private int shardWeightCount;
     private final TreeMap<Integer, Shard> shards;
     private final String shardKey;
@@ -54,8 +56,9 @@ public class ShardRouter implements Serializable {
         this.shards = new TreeMap<>();
         this.shardKey = shardMetadata.getShardKey();
         this.shardKeyType = shardMetadata.getShardKeyType();
-        this.splitMode = shardMetadata.getSplitMode();
+        this.splitMode = shardMetadata.isSplitMode();
         this.table = shardMetadata.getTable();
+        this.tableEngine = shardMetadata.getTableEngine();
         if (StringUtils.isNotEmpty(shardKey) && 
StringUtils.isEmpty(shardKeyType)) {
             throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SHARD_KEY_NOT_FOUND, 
"Shard key " + shardKey + " not found in table " + table);
         }
@@ -63,6 +66,7 @@ public class ShardRouter implements Serializable {
         if (splitMode) {
             DistributedEngine localTable = 
proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), 
table);
             this.shardTable = localTable.getTable();
+            this.shardTableEngine = localTable.getTableEngine();
             List<Shard> shardList = proxy.getClusterShardList(connection, 
localTable.getClusterName(),
                 localTable.getDatabase(), 
shardMetadata.getDefaultShard().getNode().getPort(),
                 shardMetadata.getUsername(), shardMetadata.getPassword());
@@ -81,6 +85,10 @@ public class ShardRouter implements Serializable {
         return splitMode ? shardTable : table;
     }
 
+    public String getShardTableEngine() {
+        return splitMode ? shardTableEngine : tableEngine;
+    }
+
     public Shard getShard(Object shardValue) {
         if (!splitMode) {
             return shards.firstEntry().getValue();
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
new file mode 100644
index 000000000..7b5a4d249
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+@RequiredArgsConstructor
+public class BufferedBatchStatementExecutor implements 
JdbcBatchStatementExecutor {
+    @NonNull
+    private final JdbcBatchStatementExecutor statementExecutor;
+    @NonNull
+    private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
+    @NonNull
+    private final List<SeaTunnelRow> buffer = new ArrayList<>();
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        statementExecutor.prepareStatements(connection);
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        buffer.add(valueTransform.apply(record));
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        if (!buffer.isEmpty()) {
+            for (SeaTunnelRow row : buffer) {
+                statementExecutor.addToBatch(row);
+            }
+            statementExecutor.executeBatch();
+            buffer.clear();
+        }
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        if (!buffer.isEmpty()) {
+            executeBatch();
+        }
+        if (statementExecutor != null) {
+            statementExecutor.closeStatements();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
new file mode 100644
index 000000000..a824772af
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+@RequiredArgsConstructor
+public class InsertOrUpdateBatchStatementExecutor implements 
JdbcBatchStatementExecutor {
+    private final StatementFactory existStmtFactory;
+    @NonNull
+    private final StatementFactory insertStmtFactory;
+    @NonNull
+    private final StatementFactory updateStmtFactory;
+    private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
+    private final JdbcRowConverter keyRowConverter;
+    @NonNull
+    private final JdbcRowConverter valueRowConverter;
+    private transient PreparedStatement existStatement;
+    private transient PreparedStatement insertStatement;
+    private transient PreparedStatement updateStatement;
+    private transient Boolean preChangeFlag;
+    private transient boolean submitted;
+
+    public InsertOrUpdateBatchStatementExecutor(StatementFactory 
insertStmtFactory,
+                                                StatementFactory 
updateStmtFactory,
+                                                JdbcRowConverter rowConverter) 
{
+        this(null, insertStmtFactory, updateStmtFactory,
+            null, null, rowConverter);
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        if (upsertMode()) {
+            existStatement = existStmtFactory.createStatement(connection);
+        }
+        insertStatement = insertStmtFactory.createStatement(connection);
+        updateStatement = updateStmtFactory.createStatement(connection);
+    }
+
+    private boolean upsertMode() {
+        return existStmtFactory != null;
+    }
+
+    private boolean hasInsert(SeaTunnelRow record) throws SQLException {
+        if (upsertMode()) {
+            return !exist(keyExtractor.apply(record));
+        }
+        switch (record.getRowKind()) {
+            case INSERT:
+                return true;
+            case UPDATE_AFTER:
+                return false;
+            default:
+                // todo
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        boolean currentChangeFlag = hasInsert(record);
+        if (currentChangeFlag) {
+            if (preChangeFlag != null && !preChangeFlag) {
+                updateStatement.executeBatch();
+                updateStatement.clearBatch();
+            }
+            valueRowConverter.toExternal(record, insertStatement);
+            insertStatement.addBatch();
+        } else {
+            if (preChangeFlag != null && preChangeFlag) {
+                insertStatement.executeBatch();
+                insertStatement.clearBatch();
+            }
+            valueRowConverter.toExternal(record, updateStatement);
+            updateStatement.addBatch();
+        }
+        preChangeFlag = currentChangeFlag;
+        submitted = false;
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        if (preChangeFlag != null) {
+            if (preChangeFlag) {
+                insertStatement.executeBatch();
+                insertStatement.clearBatch();
+            } else {
+                updateStatement.executeBatch();
+                updateStatement.clearBatch();
+            }
+        }
+        submitted = true;
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        if (!submitted) {
+            executeBatch();
+        }
+        for (PreparedStatement statement : Arrays.asList(existStatement, 
insertStatement, updateStatement)) {
+            if (statement != null) {
+                statement.close();
+            }
+        }
+    }
+
+    private boolean exist(SeaTunnelRow pk) throws SQLException {
+        keyRowConverter.toExternal(pk, existStatement);
+        try (ResultSet resultSet = existStatement.executeQuery()) {
+            return resultSet.next();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
similarity index 50%
copy from 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
copy to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
index 6a15d5919..961a11ad5 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
@@ -15,44 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
 
-import java.io.Serializable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
-public class DistributedEngine implements Serializable {
+import java.sql.Connection;
+import java.sql.SQLException;
 
-    private static final long serialVersionUID = -1L;
-    private String clusterName;
-    private String database;
-    private String table;
+public interface JdbcBatchStatementExecutor extends AutoCloseable{
 
-    public DistributedEngine(String clusterName, String database, String 
table) {
-        this.clusterName = clusterName;
-        this.database = database;
-        this.table = table;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
+    void prepareStatements(Connection connection) throws SQLException;
 
-    public String getDatabase() {
-        return database;
-    }
+    void addToBatch(SeaTunnelRow record) throws SQLException;
 
-    public void setDatabase(String database) {
-        this.database = database;
-    }
+    void executeBatch() throws SQLException;
 
-    public String getTable() {
-        return table;
-    }
+    void closeStatements() throws SQLException;
 
-    public void setTable(String table) {
-        this.table = table;
+    @Override
+    default void close() throws SQLException {
+        closeStatements();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
new file mode 100644
index 000000000..cf2508841
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+@Setter
+@Accessors(chain = true)
+public class JdbcBatchStatementExecutorBuilder {
+    private static final String MERGE_TREE_ENGINE_SUFFIX = "MergeTree";
+    private static final String REPLACING_MERGE_TREE_ENGINE_SUFFIX = 
"ReplacingMergeTree";
+    private String table;
+    private String tableEngine;
+    private SeaTunnelRowType rowType;
+    private String[] primaryKeys;
+    private String[] projectionFields;
+    private Map<String, String> clickhouseTableSchema;
+    private boolean supportUpsert;
+    private boolean allowExperimentalLightweightDelete;
+    private String[] orderByKeys;
+
+    /**
+     * Tells whether deletes may be issued as lightweight {@code DELETE} statements: the
+     * table engine name has to end with {@code MergeTree} and the
+     * {@code allowExperimentalLightweightDelete} option has to be enabled.
+     */
+    private boolean supportMergeTreeEngineExperimentalLightweightDelete() {
+        boolean mergeTreeFamily = tableEngine.endsWith(MERGE_TREE_ENGINE_SUFFIX);
+        return mergeTreeFamily && allowExperimentalLightweightDelete;
+    }
+
+    /**
+     * Tells whether updates can be written as plain inserts: the table engine name has to
+     * end with {@code ReplacingMergeTree} and the configured primary keys have to list the
+     * same columns, in the same order, as the configured order-by keys.
+     */
+    private boolean supportReplacingMergeTreeTableUpsert() {
+        // Arrays.equals compares element by element. Objects.equals on two arrays only
+        // compares references, so separately built key arrays could never match.
+        return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
+            && Arrays.equals(primaryKeys, orderByKeys);
+    }
+
+    /**
+     * Derives the default projection: every column of the ClickHouse table schema that is
+     * also a field of the incoming row type, in the iteration order of the schema's key set.
+     *
+     * @return the matching column names; empty when no column matches
+     */
+    private String[] getDefaultProjectionFields() {
+        List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
+        return clickhouseTableSchema.keySet()
+            .stream()
+            .filter(fieldNames::contains)
+            // The generator must honour the requested length; handing back a zero-length
+            // array breaks the Stream#toArray contract as soon as one column matches.
+            .toArray(String[]::new);
+    }
+
+    /**
+     * Assembles the batch executor that matches the configured table engine and options.
+     *
+     * <ul>
+     *   <li>No primary keys: every event is written through a buffered {@code INSERT INTO}.</li>
+     *   <li>{@code *MergeTree} engine with lightweight delete enabled: deletes use the
+     *       {@code DELETE} statement; {@code UPDATE_BEFORE} events are turned into deletes
+     *       unless the ReplacingMergeTree upsert path applies, in which case they are ignored.</li>
+     *   <li>Otherwise: deletes use {@code ALTER TABLE ... DELETE} and {@code UPDATE_BEFORE}
+     *       events are ignored.</li>
+     * </ul>
+     *
+     * <p>When {@code projectionFields} is unset it is filled in with the default projection
+     * as a side effect of this call.
+     *
+     * @return the executor rows should be fed into
+     * @throws NullPointerException if table, tableEngine, rowType or clickhouseTableSchema is unset
+     * @throws IllegalArgumentException if a primary key is not a field of the row type
+     */
+    public JdbcBatchStatementExecutor build() {
+        Objects.requireNonNull(table, "table");
+        Objects.requireNonNull(tableEngine, "tableEngine");
+        Objects.requireNonNull(rowType, "rowType");
+        Objects.requireNonNull(clickhouseTableSchema, "clickhouseTableSchema");
+        if (projectionFields == null) {
+            projectionFields = getDefaultProjectionFields();
+        }
+
+        JdbcRowConverter valueRowConverter = new JdbcRowConverter(
+            rowType, clickhouseTableSchema, projectionFields);
+        if (primaryKeys == null || primaryKeys.length == 0) {
+            // INSERT: write all events when primary-keys is empty
+            return createInsertBufferedExecutor(table, rowType, valueRowConverter);
+        }
+
+        List<String> rowFieldNames = Arrays.asList(rowType.getFieldNames());
+        int[] pkFields = new int[primaryKeys.length];
+        for (int i = 0; i < primaryKeys.length; i++) {
+            int index = rowFieldNames.indexOf(primaryKeys[i]);
+            if (index < 0) {
+                // Fail fast and name the offending key instead of surfacing an index error later.
+                throw new IllegalArgumentException(
+                    "Primary key '" + primaryKeys[i] + "' is not a field of the row type: " + rowFieldNames);
+            }
+            pkFields[i] = index;
+        }
+        SeaTunnelDataType[] pkTypes = getKeyTypes(pkFields, rowType);
+        JdbcRowConverter pkRowConverter = new JdbcRowConverter(
+            new SeaTunnelRowType(primaryKeys, pkTypes), clickhouseTableSchema, primaryKeys);
+        Function<SeaTunnelRow, SeaTunnelRow> pkExtractor = createKeyExtractor(pkFields);
+
+        if (supportMergeTreeEngineExperimentalLightweightDelete()) {
+            boolean convertUpdateBeforeEventToDeleteAction;
+            // DELETE: delete sql
+            JdbcBatchStatementExecutor deleteExecutor = createDeleteExecutor(
+                table, primaryKeys, pkRowConverter);
+            JdbcBatchStatementExecutor updateExecutor;
+            if (supportReplacingMergeTreeTableUpsert()) {
+                // ReplacingMergeTree Update Row: upsert row by order-by-keys(update_after event)
+                updateExecutor = createInsertExecutor(table, rowType, valueRowConverter);
+                convertUpdateBeforeEventToDeleteAction = false;
+            } else {
+                // *MergeTree Update Row:
+                // 1. delete(update_before event) + insert or update by query primary-keys(update_after event)
+                // 2. delete(update_before event) + insert(update_after event)
+                updateExecutor = supportUpsert ?
+                    createUpsertExecutor(table, rowType, primaryKeys, pkExtractor, pkRowConverter, valueRowConverter)
+                    : createInsertExecutor(table, rowType, valueRowConverter);
+                convertUpdateBeforeEventToDeleteAction = true;
+            }
+            // The last argument is the "ignore UPDATE_BEFORE" switch of the reducing executor.
+            return new ReduceBufferedBatchStatementExecutor(updateExecutor, deleteExecutor, pkExtractor,
+                Function.identity(), !convertUpdateBeforeEventToDeleteAction);
+        }
+
+        // DELETE: alter table delete sql
+        JdbcBatchStatementExecutor deleteExecutor = createAlterTableDeleteExecutor(
+            table, primaryKeys, pkRowConverter);
+        JdbcBatchStatementExecutor updateExecutor;
+        if (supportReplacingMergeTreeTableUpsert()) {
+            updateExecutor = createInsertExecutor(table, rowType, valueRowConverter);
+        } else {
+            // Other-Engine Update Row:
+            // 1. insert or update by query primary-keys(insert/update_after event)
+            // 2. insert(insert event) + alter table update(update_after event)
+            updateExecutor = supportUpsert ?
+                createUpsertExecutor(table, rowType, primaryKeys, pkExtractor, pkRowConverter, valueRowConverter)
+                : createInsertOrUpdateExecutor(table, rowType, primaryKeys, valueRowConverter);
+        }
+        return new ReduceBufferedBatchStatementExecutor(
+            updateExecutor, deleteExecutor, pkExtractor,
+            Function.identity(), true);
+    }
+
+    /**
+     * Creates a plain insert executor and wraps it in a {@link BufferedBatchStatementExecutor}
+     * configured with the identity function.
+     */
+    private static JdbcBatchStatementExecutor createInsertBufferedExecutor(String table,
+                                                                           SeaTunnelRowType rowType,
+                                                                           JdbcRowConverter rowConverter) {
+        JdbcBatchStatementExecutor insertExecutor = createInsertExecutor(table, rowType, rowConverter);
+        return new BufferedBatchStatementExecutor(insertExecutor, Function.identity());
+    }
+
+    /**
+     * Creates an {@link InsertOrUpdateBatchStatementExecutor} backed by an {@code INSERT INTO}
+     * statement and an {@code ALTER TABLE ... UPDATE} statement keyed by {@code pkNames}.
+     * Both SQL texts are built when the statement is prepared on a connection.
+     */
+    private static JdbcBatchStatementExecutor createInsertOrUpdateExecutor(String table,
+                                                                           SeaTunnelRowType rowType,
+                                                                           String[] pkNames,
+                                                                           JdbcRowConverter rowConverter) {
+        return new InsertOrUpdateBatchStatementExecutor(
+            conn -> {
+                String insertSql = SqlUtils.getInsertIntoStatement(table, rowType.getFieldNames());
+                return conn.prepareStatement(insertSql);
+            },
+            conn -> {
+                String updateSql = SqlUtils.getAlterTableUpdateStatement(table, rowType.getFieldNames(), pkNames);
+                return conn.prepareStatement(updateSql);
+            },
+            rowConverter);
+    }
+
+    /**
+     * Creates an {@link InsertOrUpdateBatchStatementExecutor} that is additionally given a
+     * row-exists query on {@code pkNames}, next to the {@code INSERT INTO} and
+     * {@code ALTER TABLE ... UPDATE} statements, plus the key extractor and both converters.
+     * All SQL texts are built when the statement is prepared on a connection.
+     */
+    private static JdbcBatchStatementExecutor createUpsertExecutor(String table,
+                                                                   SeaTunnelRowType rowType,
+                                                                   String[] pkNames,
+                                                                   Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
+                                                                   JdbcRowConverter keyConverter,
+                                                                   JdbcRowConverter valueConverter) {
+        return new InsertOrUpdateBatchStatementExecutor(
+            conn -> {
+                String existsSql = SqlUtils.getRowExistsStatement(table, pkNames);
+                return conn.prepareStatement(existsSql);
+            },
+            conn -> {
+                String insertSql = SqlUtils.getInsertIntoStatement(table, rowType.getFieldNames());
+                return conn.prepareStatement(insertSql);
+            },
+            conn -> {
+                String updateSql = SqlUtils.getAlterTableUpdateStatement(table, rowType.getFieldNames(), pkNames);
+                return conn.prepareStatement(updateSql);
+            },
+            keyExtractor,
+            keyConverter,
+            valueConverter);
+    }
+
+    /**
+     * Creates a simple executor for an {@code INSERT INTO} statement covering every field of
+     * the row type.
+     */
+    private static JdbcBatchStatementExecutor createInsertExecutor(String table,
+                                                                   SeaTunnelRowType rowType,
+                                                                   JdbcRowConverter rowConverter) {
+        return createSimpleExecutor(
+            SqlUtils.getInsertIntoStatement(table, rowType.getFieldNames()),
+            rowConverter);
+    }
+
+    /**
+     * Creates a simple executor for the {@code DELETE} statement that filters on the given
+     * primary keys.
+     */
+    private static JdbcBatchStatementExecutor createDeleteExecutor(String table,
+                                                                   String[] primaryKeys,
+                                                                   JdbcRowConverter rowConverter) {
+        return createSimpleExecutor(
+            SqlUtils.getDeleteStatement(table, primaryKeys),
+            rowConverter);
+    }
+
+    /**
+     * Creates a simple executor for the {@code ALTER TABLE ... DELETE} statement that filters
+     * on the given primary keys.
+     */
+    private static JdbcBatchStatementExecutor createAlterTableDeleteExecutor(String table,
+                                                                             String[] primaryKeys,
+                                                                             JdbcRowConverter rowConverter) {
+        return createSimpleExecutor(
+            SqlUtils.getAlterTableDeleteStatement(table, primaryKeys),
+            rowConverter);
+    }
+
+    /**
+     * Creates a {@link SimpleBatchStatementExecutor} that prepares the given SQL text on the
+     * connection it is handed and binds rows through {@code rowConverter}.
+     */
+    private static JdbcBatchStatementExecutor createSimpleExecutor(String sql,
+                                                                   JdbcRowConverter rowConverter) {
+        StatementFactory sqlStatementFactory = conn -> conn.prepareStatement(sql);
+        return new SimpleBatchStatementExecutor(sqlStatementFactory, rowConverter);
+    }
+
+    /**
+     * Looks up the data type of each primary-key column.
+     *
+     * @param pkFields positions of the key columns within {@code rowType}
+     * @param rowType the row type the positions refer to
+     * @return the key column types, in the order of {@code pkFields}
+     */
+    private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) {
+        SeaTunnelDataType[] keyTypes = new SeaTunnelDataType[pkFields.length];
+        for (int position = 0; position < pkFields.length; position++) {
+            keyTypes[position] = rowType.getFieldType(pkFields[position]);
+        }
+        return keyTypes;
+    }
+
+    /**
+     * Builds a function that projects a row onto its primary-key columns.
+     *
+     * @param pkFields positions of the primary-key columns within the incoming row
+     * @return a function producing a new row that holds only the key values and carries the
+     *     table id and row kind of the source row
+     */
+    private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
+        return row -> {
+            Object[] fields = new Object[pkFields.length];
+            for (int i = 0; i < pkFields.length; i++) {
+                fields[i] = row.getField(pkFields[i]);
+            }
+            SeaTunnelRow newRow = new SeaTunnelRow(fields);
+            newRow.setTableId(row.getTableId());
+            newRow.setRowKind(row.getRowKind());
+            // Hand back the projected key row, not the full input row: returning the input
+            // discarded the projection and made the extractor an identity function.
+            return newRow;
+        };
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
new file mode 100644
index 000000000..ec0ce2d80
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Binds the projected fields of a {@link SeaTunnelRow} to the parameters of a
+ * {@link PreparedStatement}, choosing an inject function per field from the ClickHouse
+ * column type.
+ */
+public class JdbcRowConverter implements Serializable {
+    private static final Pattern NULLABLE = Pattern.compile("Nullable\\((.*)\\)");
+    private static final Pattern LOW_CARDINALITY = Pattern.compile("LowCardinality\\((.*)\\)");
+    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();
+
+    private final String[] projectionFields;
+    private final Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
+    private final Map<String, Function<SeaTunnelRow, Object>> fieldGetterMap;
+
+    /**
+     * @param rowType type of the rows that will be converted
+     * @param clickhouseTableSchema ClickHouse column name to column type text
+     * @param projectionFields fields to bind, in statement parameter order
+     */
+    public JdbcRowConverter(@NonNull SeaTunnelRowType rowType,
+                            @NonNull Map<String, String> clickhouseTableSchema,
+                            @NonNull String[] projectionFields) {
+        this.projectionFields = projectionFields;
+        this.fieldInjectFunctionMap = createFieldInjectFunctionMap(projectionFields, clickhouseTableSchema);
+        this.fieldGetterMap = createFieldGetterMap(projectionFields, rowType);
+    }
+
+    /**
+     * Binds every projected field of {@code row} to {@code statement}; the n-th projected
+     * field goes to parameter n (1-based). Null values are bound with {@code setObject}.
+     *
+     * @return the statement that was passed in
+     */
+    public PreparedStatement toExternal(SeaTunnelRow row,
+                                        PreparedStatement statement) throws SQLException {
+        int parameterIndex = 0;
+        for (String fieldName : projectionFields) {
+            parameterIndex++;
+            Object fieldValue = fieldGetterMap.get(fieldName).apply(row);
+            if (fieldValue != null) {
+                ClickhouseFieldInjectFunction injector =
+                    fieldInjectFunctionMap.getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION);
+                injector.injectFields(statement, parameterIndex, fieldValue);
+            } else {
+                // field does not exist in row
+                // todo: do we need to transform to default value of each type
+                statement.setObject(parameterIndex, null);
+            }
+        }
+        return statement;
+    }
+
+    /**
+     * Picks, for each field, the first candidate inject function that accepts the field's
+     * unwrapped ClickHouse type; a fresh set of candidates is created per field.
+     */
+    private Map<String, ClickhouseFieldInjectFunction> createFieldInjectFunctionMap(String[] fields,
+                                                                                    Map<String, String> clickhouseTableSchema) {
+        Map<String, ClickhouseFieldInjectFunction> injectors = new HashMap<>();
+        for (String field : fields) {
+            String unwrappedType = unwrapCommonPrefix(clickhouseTableSchema.get(field));
+            ClickhouseFieldInjectFunction chosen = Arrays.asList(
+                new ArrayInjectFunction(),
+                new MapInjectFunction(),
+                new BigDecimalInjectFunction(),
+                new DateInjectFunction(),
+                new DateTimeInjectFunction(),
+                new LongInjectFunction(),
+                new DoubleInjectFunction(),
+                new FloatInjectFunction(),
+                new IntInjectFunction(),
+                new StringInjectFunction())
+                .stream()
+                .filter(candidate -> candidate.isCurrentFieldType(unwrappedType))
+                .findFirst()
+                .orElse(new StringInjectFunction());
+            injectors.put(field, chosen);
+        }
+        return injectors;
+    }
+
+    /**
+     * Resolves each field's position in {@code rowType} once and stores a getter that reads
+     * that position from a row.
+     */
+    private Map<String, Function<SeaTunnelRow, Object>> createFieldGetterMap(String[] fields,
+                                                                             SeaTunnelRowType rowType) {
+        Map<String, Function<SeaTunnelRow, Object>> getters = new HashMap<>();
+        for (String fieldName : fields) {
+            int fieldIndex = rowType.indexOf(fieldName);
+            getters.put(fieldName, row -> row.getField(fieldIndex));
+        }
+        return getters;
+    }
+
+    /**
+     * Strips one outer {@code Nullable(...)} or, failing that, one outer
+     * {@code LowCardinality(...)} wrapper from a type name; other names are returned as is.
+     */
+    private String unwrapCommonPrefix(String fieldType) {
+        Matcher nullableMatcher = NULLABLE.matcher(fieldType);
+        if (nullableMatcher.matches()) {
+            return nullableMatcher.group(1);
+        }
+        Matcher lowCardinalityMatcher = LOW_CARDINALITY.matcher(fieldType);
+        if (lowCardinalityMatcher.matches()) {
+            return lowCardinalityMatcher.group(1);
+        }
+        return fieldType;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
new file mode 100644
index 000000000..2dab2abea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Buffers change events per extracted key and, on flush, routes each buffered entry to
+ * either the insert/update executor or the delete executor.
+ *
+ * <p>The buffer is a {@link LinkedHashMap}: a later event with an equal key replaces the
+ * buffered value for that key, so only the latest event per key is written.
+ */
+@AllArgsConstructor
+@RequiredArgsConstructor
+public class ReduceBufferedBatchStatementExecutor implements 
JdbcBatchStatementExecutor {
+    // Receives rows whose change flag is true (INSERT / UPDATE_AFTER).
+    @NonNull
+    private final JdbcBatchStatementExecutor insertOrUpdateExecutor;
+    // Receives the keys of rows whose change flag is false (DELETE / UPDATE_BEFORE).
+    @NonNull
+    private final JdbcBatchStatementExecutor deleteExecutor;
+    // Maps an incoming row to the key used to index the buffer.
+    @NonNull
+    private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
+    // Applied to an incoming row to obtain the value stored in the buffer.
+    @NonNull
+    private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
+    // When true, UPDATE_BEFORE events are dropped in addToBatch. Not final, so only the
+    // all-args constructor sets it; otherwise it keeps the default false.
+    private boolean ignoreUpdateBefore;
+    @NonNull
+    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> 
buffer = new LinkedHashMap<>();
+
+    /** Prepares the statements of both delegate executors on the given connection. */
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        insertOrUpdateExecutor.prepareStatements(connection);
+        deleteExecutor.prepareStatements(connection);
+    }
+
+    /**
+     * Buffers the record under its extracted key together with its change flag; nothing is
+     * sent to the delegates until {@link #executeBatch()} runs.
+     */
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        if (RowKind.UPDATE_BEFORE.equals(record.getRowKind()) && 
ignoreUpdateBefore) {
+            return;
+        }
+
+        SeaTunnelRow key = keyExtractor.apply(record);
+        boolean changeFlag = changeFlag(record.getRowKind());
+        SeaTunnelRow value = valueTransform.apply(record);
+        buffer.put(key, Pair.of(changeFlag, value));
+    }
+
+    /**
+     * Walks the buffer in iteration order and feeds each entry to the matching delegate.
+     * Whenever the change flag switches between consecutive entries, the delegate holding
+     * the previous run is executed first, so the two kinds of statements are not reordered
+     * across a switch. The last pending run is executed at the end and the buffer cleared.
+     */
+    @Override
+    public void executeBatch() throws SQLException {
+        Boolean preChangeFlag = null;
+        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = 
buffer.entrySet();
+        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : 
entrySet) {
+            Boolean currentChangeFlag = entry.getValue().getKey();
+            if (currentChangeFlag) {
+                // Switching from deletes to inserts/updates: flush the pending deletes first.
+                if (preChangeFlag != null && !preChangeFlag) {
+                    deleteExecutor.executeBatch();
+                }
+                insertOrUpdateExecutor.addToBatch(entry.getValue().getValue());
+            } else {
+                // Switching from inserts/updates to deletes: flush the pending writes first.
+                if (preChangeFlag != null && preChangeFlag) {
+                    insertOrUpdateExecutor.executeBatch();
+                }
+                // Deletes are bound from the buffer key, not from the buffered value.
+                deleteExecutor.addToBatch(entry.getKey());
+            }
+            preChangeFlag = currentChangeFlag;
+        }
+
+        // Flush whichever delegate holds the trailing run; null means the buffer was empty.
+        if (preChangeFlag != null) {
+            if (preChangeFlag) {
+                insertOrUpdateExecutor.executeBatch();
+            } else {
+                deleteExecutor.executeBatch();
+            }
+        }
+        buffer.clear();
+    }
+
+    /**
+     * Flushes any buffered entries, then closes the statements of both delegates.
+     * NOTE(review): if the flush or the first close throws, the remaining statements are
+     * not closed here - confirm the caller tolerates that.
+     */
+    @Override
+    public void closeStatements() throws SQLException {
+        if (!buffer.isEmpty()) {
+            executeBatch();
+        }
+        insertOrUpdateExecutor.closeStatements();
+        deleteExecutor.closeStatements();
+    }
+
+    /**
+     * Maps a row kind to the change flag: true for INSERT and UPDATE_AFTER, false for
+     * DELETE and UPDATE_BEFORE.
+     *
+     * @throws UnsupportedOperationException for any other row kind
+     */
+    private boolean changeFlag(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return true;
+            case DELETE:
+            case UPDATE_BEFORE:
+                return false;
+            default:
+                throw new UnsupportedOperationException("Unsupported rowKind: 
" + rowKind);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
new file mode 100644
index 000000000..44b84dcb6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Batch executor around a single {@link PreparedStatement}: rows are bound through the
+ * converter, added to the JDBC batch and sent when {@link #executeBatch()} is called.
+ */
+@RequiredArgsConstructor
+public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor {
+    @NonNull
+    private final StatementFactory statementFactory;
+    @NonNull
+    private final JdbcRowConverter converter;
+    // Created by prepareStatements; null until then.
+    private transient PreparedStatement statement;
+
+    /** Creates the statement on the given connection through the statement factory. */
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        this.statement = statementFactory.createStatement(connection);
+    }
+
+    /** Binds the record to the statement and appends it to the JDBC batch. */
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        converter.toExternal(record, statement);
+        statement.addBatch();
+    }
+
+    /** Executes the accumulated JDBC batch and clears it afterwards. */
+    @Override
+    public void executeBatch() throws SQLException {
+        statement.executeBatch();
+        statement.clearBatch();
+    }
+
+    /** Closes the statement if one has been created. */
+    @Override
+    public void closeStatements() throws SQLException {
+        if (statement == null) {
+            return;
+        }
+        statement.close();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
new file mode 100644
index 000000000..6582a1747
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import static java.lang.String.format;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Builds the parameterized SQL texts used by the ClickHouse batch executors. Every value
+ * position is rendered as a {@code ?} placeholder.
+ */
+public class SqlUtils {
+    /**
+     * Returns the identifier as text; no quote characters are added at present.
+     */
+    public static String quoteIdentifier(String identifier) {
+        return "" + identifier + "";
+    }
+
+    /**
+     * Builds {@code INSERT INTO <table> (<columns>) VALUES (?, ...)} with one placeholder
+     * per field name.
+     */
+    public static String getInsertIntoStatement(String tableName,
+                                                String[] fieldNames) {
+        String columnList = Arrays.stream(fieldNames)
+            .map(SqlUtils::quoteIdentifier)
+            .collect(Collectors.joining(", "));
+        String markerList = Arrays.stream(fieldNames)
+            .map(ignored -> "?")
+            .collect(Collectors.joining(", "));
+        return String.format("INSERT INTO %s (%s) VALUES (%s)",
+            tableName, columnList, markerList);
+    }
+
+    /**
+     * Builds a {@code DELETE FROM} statement filtered on the condition fields, prefixed with
+     * the statement that enables {@code allow_experimental_lightweight_delete}.
+     */
+    public static String getDeleteStatement(String tableName,
+                                            String[] conditionFields) {
+        String whereClause = placeholderAssignments(conditionFields, " AND ");
+        String template = "SET allow_experimental_lightweight_delete = true;"
+            + "DELETE FROM %s WHERE %s";
+        return String.format(template, quoteIdentifier(tableName), whereClause);
+    }
+
+    /**
+     * Builds {@code ALTER TABLE <table> UPDATE <field = ?, ...> WHERE <condition = ? AND ...>}.
+     */
+    public static String getAlterTableUpdateStatement(String tableName,
+                                                      String[] fieldNames,
+                                                      String[] conditionFields) {
+        String assignments = placeholderAssignments(fieldNames, ", ");
+        String whereClause = placeholderAssignments(conditionFields, " AND ");
+        return String.format("ALTER TABLE %s UPDATE %s WHERE %s",
+            tableName, assignments, whereClause);
+    }
+
+    /**
+     * Builds {@code ALTER TABLE <table> DELETE WHERE <condition = ? AND ...>}.
+     */
+    public static String getAlterTableDeleteStatement(String tableName,
+                                                      String[] conditionFields) {
+        String whereClause = placeholderAssignments(conditionFields, " AND ");
+        return String.format("ALTER TABLE %s DELETE WHERE %s",
+            tableName, whereClause);
+    }
+
+    /**
+     * Builds {@code SELECT 1 FROM <table> WHERE <condition = ? AND ...>}.
+     */
+    public static String getRowExistsStatement(String tableName,
+                                               String[] conditionFields) {
+        String whereClause = placeholderAssignments(conditionFields, " AND ");
+        return String.format("SELECT 1 FROM %s WHERE %s",
+            quoteIdentifier(tableName), whereClause);
+    }
+
+    /** Renders each field as {@code <field> = ?} and joins the parts with the separator. */
+    private static String placeholderAssignments(String[] fields, String separator) {
+        return Arrays.stream(fields)
+            .map(field -> String.format("%s = ?", quoteIdentifier(field)))
+            .collect(Collectors.joining(separator));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
new file mode 100644
index 000000000..3093b39db
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Creates a {@link PreparedStatement} from a JDBC connection; it declares
+ * {@link SQLException} so implementations can call {@code Connection#prepareStatement}
+ * directly from a lambda.
+ */
+@FunctionalInterface
+public interface StatementFactory {
+
+    /**
+     * Creates a prepared statement on the given connection.
+     *
+     * @param connection the JDBC connection to create the statement on
+     * @return the created statement
+     * @throws SQLException if the statement cannot be created
+     */
+    PreparedStatement createStatement(Connection connection) throws 
SQLException;
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 7aa5010f2..1da905450 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -105,6 +105,7 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
 
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
         Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
+        ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE.key()), 
config.getString(TABLE.key()));
         String shardKey = null;
         String shardKeyType = null;
         if (config.hasPath(SHARDING_KEY.key())) {
@@ -116,6 +117,7 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
             shardKeyType,
             config.getString(DATABASE.key()),
             config.getString(TABLE.key()),
+            table.getEngine(),
             false, // we don't need to set splitMode in clickhouse file mode.
             new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), 
config.getString(PASSWORD.key()));
         List<String> fields;


Reply via email to