This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6093c213b [feature][connector-v2][clickhouse] Support write cdc
changelog event in clickhouse sink (#3653)
6093c213b is described below
commit 6093c213bf366caea945d9867da3f0f8bb4787f7
Author: hailin0 <[email protected]>
AuthorDate: Tue Dec 13 11:35:40 2022 +0800
[feature][connector-v2][clickhouse] Support write cdc changelog event in
clickhouse sink (#3653)
* [feature][connector-v2][clickhouse] Support write cdc changelog event in
clickhouse sink
MergeTree Table Engine:
- Support enable `allowExperimentalLightweightDelete` setting
- CDC Events:
- INSERT: INSERT INTO SQL
- UPDATE_BEFORE: DELETE SQL
- UPDATE_AFTER: INSERT INTO SQL
- DELETE: DELETE SQL
- ReplacingMergeTree Engine CDC Events:
- INSERT: INSERT INTO SQL
- UPDATE_BEFORE: ignore
- UPDATE_AFTER: INSERT INTO SQL
- DELETE: DELETE SQL
Other Table Engine:
- CDC Events:
- INSERT: INSERT INTO SQL
- UPDATE_BEFORE: ignore
- UPDATE_AFTER: ALTER TABLE UPDATE SQL
- DELETE: ALTER TABLE DELETE SQL
* update
* fix InsertOrUpdateBatchStatementExecutor
* add check
---
docs/en/connector-v2/sink/Clickhouse.md | 87 +++++++--
.../clickhouse/config/ClickhouseConfig.java | 21 +-
.../seatunnel/clickhouse/config/ReaderOption.java | 69 ++-----
.../seatunnel/clickhouse/shard/ShardMetadata.java | 128 ++-----------
.../clickhouse/sink/ClickhouseSinkFactory.java | 18 +-
.../clickhouse/sink/DistributedEngine.java | 37 +---
.../sink/client/ClickhouseBatchStatement.java | 13 +-
.../clickhouse/sink/client/ClickhouseProxy.java | 42 ++--
.../clickhouse/sink/client/ClickhouseSink.java | 40 +++-
.../sink/client/ClickhouseSinkWriter.java | 120 ++----------
.../clickhouse/sink/client/ShardRouter.java | 10 +-
.../executor/BufferedBatchStatementExecutor.java | 70 +++++++
.../InsertOrUpdateBatchStatementExecutor.java | 138 ++++++++++++++
.../executor/JdbcBatchStatementExecutor.java} | 43 ++---
.../JdbcBatchStatementExecutorBuilder.java | 211 +++++++++++++++++++++
.../sink/client/executor/JdbcRowConverter.java | 129 +++++++++++++
.../ReduceBufferedBatchStatementExecutor.java | 119 ++++++++++++
.../executor/SimpleBatchStatementExecutor.java | 60 ++++++
.../clickhouse/sink/client/executor/SqlUtils.java | 82 ++++++++
.../sink/client/executor/StatementFactory.java | 29 +++
.../clickhouse/sink/file/ClickhouseFileSink.java | 2 +
21 files changed, 1084 insertions(+), 384 deletions(-)
diff --git a/docs/en/connector-v2/sink/Clickhouse.md
b/docs/en/connector-v2/sink/Clickhouse.md
index 4f1f0e16c..90bec5fc9 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -13,6 +13,7 @@ Used to write data to Clickhouse.
The Clickhouse sink plug-in can achieve exactly-once accuracy by implementing
idempotent writing, and needs to cooperate with aggregatingmergetree and other
engines that support deduplication.
- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
:::tip
@@ -22,19 +23,22 @@ Write data to Clickhouse can also be done using JDBC
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| host | string | yes | - |
-| database | string | yes | - |
-| table | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| fields | string | yes | - |
-| clickhouse.* | string | no | |
-| bulk_size | string | no | 20000 |
-| split_mode | string | no | false |
-| sharding_key | string | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|---------------------------------------|---------|----------|---------------|
+| host | string | yes | - |
+| database | string | yes | - |
+| table | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| fields | string | yes | - |
+| clickhouse.* | string | no | |
+| bulk_size | string | no | 20000 |
+| split_mode | string | no | false |
+| sharding_key | string | no | - |
+| primary_key | string | no | - |
+| support_upsert | boolean | no | false |
+| allow_experimental_lightweight_delete | boolean | no | false |
+| common-options | | no | - |
### host [string]
@@ -82,39 +86,90 @@ When use split_mode, which node to send data to is a
problem, the default is ran
'sharding_key' parameter can be used to specify the field for the sharding
algorithm. This option only
worked when 'split_mode' is true.
+### primary_key [string]
+
+Mark the primary key column of the clickhouse table; INSERT/UPDATE/DELETE
statements are executed against the clickhouse table based on this primary key
+
+### support_upsert [boolean]
+
+Support upserting rows by querying on the primary key
+
+### allow_experimental_lightweight_delete [boolean]
+
+Allow experimental lightweight delete on the `*MergeTree` table engines
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
## Examples
+Simple
+
```hocon
sink {
+ Clickhouse {
+ host = "localhost:8123"
+ database = "default"
+ table = "fake_all"
+ username = "default"
+ password = ""
+ }
+}
+```
+
+Split mode
+```hocon
+sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "default"
password = ""
+
+ # split mode options
split_mode = true
sharding_key = "age"
}
-
}
```
+CDC(Change data capture)
+
```hocon
sink {
+ Clickhouse {
+ host = "localhost:8123"
+ database = "default"
+ table = "fake_all"
+ username = "default"
+ password = ""
+
+ # cdc options
+ primary_key = "id"
+ support_upsert = true
+ }
+}
+```
+CDC(Change data capture) for *MergeTree engine
+
+```hocon
+sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "default"
password = ""
+
+ # cdc options
+ primary_key = "id"
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
}
-
}
```
@@ -132,3 +187,5 @@ sink {
- [Improve] Clickhouse Sink support nest type and array
type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
- [Improve] Clickhouse Sink support geo
type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
+
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events
([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 3f48df990..9f93eb36c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -81,8 +81,25 @@ public class ClickhouseConfig {
/**
* When split_mode is true, the sharding_key use for split
*/
- public static final Option<String> SHARDING_KEY =
Options.key("sharding_key").stringType()
- .noDefaultValue().withDescription("When split_mode is true, the
sharding_key use for split");
+ public static final Option<String> SHARDING_KEY =
Options.key("sharding_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("When split_mode is true, the sharding_key use for
split");
+
+ public static final Option<String> PRIMARY_KEY = Options.key("primary_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Mark the primary key column from clickhouse table,
and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table");
+
+ public static final Option<Boolean> SUPPORT_UPSERT =
Options.key("support_upsert")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Support upsert row by query primary key");
+
+ public static final Option<Boolean> ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE
= Options.key("allow_experimental_lightweight_delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Allow experimental lightweight delete based on
`*MergeTree` table engine");
/**
* ClickhouseFile sink connector used clickhouse-local program's path
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
index 084f54bcc..59f711741 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java
@@ -20,75 +20,28 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+@Builder
+@Getter
public class ReaderOption implements Serializable {
private ShardMetadata shardMetadata;
private List<String> fields;
-
+ private String[] primaryKeys;
+ private boolean allowExperimentalLightweightDelete;
+ private boolean supportUpsert;
+ private String tableEngine;
private Map<String, String> tableSchema;
+ @Setter
private SeaTunnelRowType seaTunnelRowType;
private Properties properties;
private int bulkSize;
-
- public ReaderOption(ShardMetadata shardMetadata,
- Properties properties, List<String> fields,
Map<String, String> tableSchema, int bulkSize) {
- this.shardMetadata = shardMetadata;
- this.properties = properties;
- this.fields = fields;
- this.tableSchema = tableSchema;
- this.bulkSize = bulkSize;
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public void setProperties(Properties properties) {
- this.properties = properties;
- }
-
- public ShardMetadata getShardMetadata() {
- return shardMetadata;
- }
-
- public void setShardMetadata(ShardMetadata shardMetadata) {
- this.shardMetadata = shardMetadata;
- }
-
- public SeaTunnelRowType getSeaTunnelRowType() {
- return seaTunnelRowType;
- }
-
- public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- public Map<String, String> getTableSchema() {
- return tableSchema;
- }
-
- public void setTableSchema(Map<String, String> tableSchema) {
- this.tableSchema = tableSchema;
- }
-
- public List<String> getFields() {
- return fields;
- }
-
- public void setFields(List<String> fields) {
- this.fields = fields;
- }
-
- public int getBulkSize() {
- return bulkSize;
- }
-
- public void setBulkSize(int bulkSize) {
- this.bulkSize = bulkSize;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
index c40344b2a..7a8e98624 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -17,9 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
import java.io.Serializable;
-import java.util.Objects;
+@Getter
+@EqualsAndHashCode
+@AllArgsConstructor
public class ShardMetadata implements Serializable {
private static final long serialVersionUID = -1L;
@@ -28,6 +34,7 @@ public class ShardMetadata implements Serializable {
private String shardKeyType;
private String database;
private String table;
+ private String tableEngine;
private boolean splitMode;
private Shard defaultShard;
private String username;
@@ -37,123 +44,10 @@ public class ShardMetadata implements Serializable {
String shardKeyType,
String database,
String table,
- boolean splitMode,
- Shard defaultShard,
- String username,
- String password) {
- this.shardKey = shardKey;
- this.shardKeyType = shardKeyType;
- this.database = database;
- this.table = table;
- this.splitMode = splitMode;
- this.defaultShard = defaultShard;
- this.username = username;
- this.password = password;
- }
-
- public ShardMetadata(String shardKey,
- String shardKeyType,
- String database,
- String table,
+ String tableEngine,
boolean splitMode,
Shard defaultShard) {
- this.shardKey = shardKey;
- this.shardKeyType = shardKeyType;
- this.database = database;
- this.table = table;
- this.splitMode = splitMode;
- this.defaultShard = defaultShard;
- }
-
- public String getShardKey() {
- return shardKey;
- }
-
- public void setShardKey(String shardKey) {
- this.shardKey = shardKey;
- }
-
- public String getShardKeyType() {
- return shardKeyType;
- }
-
- public void setShardKeyType(String shardKeyType) {
- this.shardKeyType = shardKeyType;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public void setDatabase(String database) {
- this.database = database;
- }
-
- public String getTable() {
- return table;
- }
-
- public void setTable(String table) {
- this.table = table;
- }
-
- public boolean getSplitMode() {
- return splitMode;
- }
-
- public void setSplitMode(boolean splitMode) {
- this.splitMode = splitMode;
- }
-
- public Shard getDefaultShard() {
- return defaultShard;
- }
-
- public void setDefaultShard(Shard defaultShard) {
- this.defaultShard = defaultShard;
- }
-
- public boolean isSplitMode() {
- return splitMode;
- }
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ShardMetadata that = (ShardMetadata) o;
- return splitMode == that.splitMode
- && Objects.equals(shardKey, that.shardKey)
- && Objects.equals(shardKeyType, that.shardKeyType)
- && Objects.equals(database, that.database)
- && Objects.equals(table, that.table)
- && Objects.equals(defaultShard, that.defaultShard)
- && Objects.equals(username, that.username)
- && Objects.equals(password, that.password);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(shardKey, shardKeyType, database, table,
splitMode, defaultShard, username, password);
+ this(shardKey, shardKeyType, database, table, tableEngine,
+ splitMode, defaultShard, null, null);
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 3a9dedf44..03d71f136 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -17,14 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
@@ -43,8 +46,17 @@ public class ClickhouseSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(HOST, DATABASE, TABLE)
- .optional(CLICKHOUSE_PREFIX, BULK_SIZE, SPLIT_MODE, FIELDS,
SHARDING_KEY)
- .bundled(USERNAME, PASSWORD).build();
+ return OptionRule.builder()
+ .required(HOST, DATABASE, TABLE)
+ .optional(CLICKHOUSE_PREFIX,
+ BULK_SIZE,
+ SPLIT_MODE,
+ FIELDS,
+ SHARDING_KEY,
+ PRIMARY_KEY,
+ SUPPORT_UPSERT,
+ ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE)
+ .bundled(USERNAME, PASSWORD)
+ .build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
index 6a15d5919..067f09fdb 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
@@ -17,42 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.io.Serializable;
+@AllArgsConstructor
+@Getter
public class DistributedEngine implements Serializable {
private static final long serialVersionUID = -1L;
private String clusterName;
private String database;
private String table;
-
- public DistributedEngine(String clusterName, String database, String
table) {
- this.clusterName = clusterName;
- this.database = database;
- this.table = table;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public void setDatabase(String database) {
- this.database = database;
- }
-
- public String getTable() {
- return table;
- }
-
- public void setTable(String table) {
- this.table = table;
- }
+ private String tableEngine;
+ private String tableDDL;
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
index ae525acee..180cb4023 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java
@@ -17,23 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
-import java.sql.PreparedStatement;
-
public class ClickhouseBatchStatement {
private final ClickHouseConnectionImpl clickHouseConnection;
- private final PreparedStatement preparedStatement;
+ private final JdbcBatchStatementExecutor jdbcBatchStatementExecutor;
private final IntHolder intHolder;
public ClickhouseBatchStatement(ClickHouseConnectionImpl
clickHouseConnection,
- PreparedStatement preparedStatement,
+ JdbcBatchStatementExecutor
jdbcBatchStatementExecutor,
IntHolder intHolder) {
this.clickHouseConnection = clickHouseConnection;
- this.preparedStatement = preparedStatement;
+ this.jdbcBatchStatementExecutor = jdbcBatchStatementExecutor;
this.intHolder = intHolder;
}
@@ -41,8 +40,8 @@ public class ClickhouseBatchStatement {
return clickHouseConnection;
}
- public PreparedStatement getPreparedStatement() {
- return preparedStatement;
+ public JdbcBatchStatementExecutor getJdbcBatchStatementExecutor() {
+ return jdbcBatchStatementExecutor;
}
public IntHolder getIntHolder() {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index 67b4db7aa..def00f912 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -66,11 +66,6 @@ public class ClickhouseProxy {
return
c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
}
- public DistributedEngine getClickhouseDistributedTable(String database,
String table) {
- ClickHouseRequest<?> request = getClickhouseConnection();
- return getClickhouseDistributedTable(request, database, table);
- }
-
public DistributedEngine
getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database,
String table) {
String sql = String.format("select engine_full from system.tables
where database = '%s' and name = '%s' and engine = 'Distributed'", database,
table);
@@ -82,7 +77,30 @@ public class ClickhouseProxy {
String engineFull = record.getValue(0).asString();
List<String> infos =
Arrays.stream(engineFull.substring(12).split(","))
.map(s -> s.replace("'",
"").trim()).collect(Collectors.toList());
- return new DistributedEngine(infos.get(0), infos.get(1),
infos.get(2).replace("\\)", "").trim());
+
+ String clusterName = infos.get(0);
+ String localDatabase = infos.get(1);
+ String localTable = infos.get(2).replace("\\)", "").trim();
+
+ String localTableSQL = String.format("select
engine,create_table_query from system.tables where database = '%s' and name =
'%s'",
+ localDatabase, localTable);
+ String localTableDDL;
+ String localTableEngine;
+ try (ClickHouseResponse localTableResponse =
clickhouseRequest.query(localTableSQL).executeAndWait()) {
+ List<ClickHouseRecord> localTableRecords =
localTableResponse.stream().collect(Collectors.toList());
+ if (localTableRecords.isEmpty()) {
+ throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get table from clickhouse, resultSet is empty");
+ }
+ localTableEngine =
localTableRecords.get(0).getValue(0).asString();
+ localTableDDL =
localTableRecords.get(0).getValue(1).asString();
+ localTableDDL = localizationEngine(localTableEngine,
localTableDDL);
+ }
+
+ return new DistributedEngine(clusterName,
+ localDatabase,
+ localTable,
+ localTableEngine,
+ localTableDDL);
}
throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get distributed table from clickhouse, resultSet is empty");
} catch (ClickHouseException e) {
@@ -163,17 +181,7 @@ public class ClickhouseProxy {
DistributedEngine distributedEngine = null;
if ("Distributed".equals(engine)) {
distributedEngine =
getClickhouseDistributedTable(clickhouseRequest, database, table);
- String localTableSQL = String.format("select
engine,create_table_query from system.tables where database = '%s' and name =
'%s'",
- distributedEngine.getDatabase(),
distributedEngine.getTable());
- try (ClickHouseResponse rs =
clickhouseRequest.query(localTableSQL).executeAndWait()) {
- List<ClickHouseRecord> localTableRecords =
rs.stream().collect(Collectors.toList());
- if (localTableRecords.isEmpty()) {
- throw new
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot
get table from clickhouse, resultSet is empty");
- }
- String localEngine =
localTableRecords.get(0).getValue(0).asString();
- String createLocalTableDDL =
localTableRecords.get(0).getValue(1).asString();
- createTableDDL = localizationEngine(localEngine,
createLocalTableDDL);
- }
+ createTableDDL = distributedEngine.getTableDDL();
}
return new ClickhouseTable(
database,
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index b87c8307e..61149daad 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,14 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
@@ -64,6 +67,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -126,9 +130,9 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
String shardKey = null;
String shardKeyType = null;
+ ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE.key()),
+ config.getString(TABLE.key()));
if (config.getBoolean(SPLIT_MODE.key())) {
- ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE.key()),
- config.getString(TABLE.key()));
if (!"Distributed".equals(table.getEngine())) {
throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only
support table which engine is " +
"'Distributed' engine at now");
@@ -146,6 +150,7 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
shardKeyType,
config.getString(DATABASE.key()),
config.getString(TABLE.key()),
+ table.getEngine(),
config.getBoolean(SPLIT_MODE.key()),
new Shard(1, 1, nodes.get(0)),
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
} else {
@@ -154,6 +159,7 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
shardKeyType,
config.getString(DATABASE.key()),
config.getString(TABLE.key()),
+ table.getEngine(),
config.getBoolean(SPLIT_MODE.key()),
new Shard(1, 1, nodes.get(0)));
}
@@ -171,7 +177,35 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
fields.addAll(tableSchema.keySet());
}
proxy.close();
- this.option = new ReaderOption(metadata, clickhouseProperties, fields,
tableSchema, config.getInt(BULK_SIZE.key()));
+
+ String[] primaryKeys = null;
+ if (config.hasPath(PRIMARY_KEY.key())) {
+ String primaryKey = config.getString(PRIMARY_KEY.key());
+ if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
+ throw new
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "sharding_key and primary_key must be consistent to ensure
correct processing of cdc events");
+ }
+ primaryKeys = new String[]{primaryKey};
+ }
+ boolean supportUpsert = SUPPORT_UPSERT.defaultValue();
+ if (config.hasPath(SUPPORT_UPSERT.key())) {
+ supportUpsert = config.getBoolean(SUPPORT_UPSERT.key());
+ }
+ boolean allowExperimentalLightweightDelete =
ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue();
+ if (config.hasPath(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
+ allowExperimentalLightweightDelete =
config.getBoolean(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
+ }
+ this.option = ReaderOption.builder()
+ .shardMetadata(metadata)
+ .properties(clickhouseProperties)
+ .fields(fields)
+ .tableEngine(table.getEngine())
+ .tableSchema(tableSchema)
+ .bulkSize(config.getInt(BULK_SIZE.key()))
+ .primaryKeys(primaryKeys)
+ .supportUpsert(supportUpsert)
+
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index b103ecccc..deb015453 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -24,36 +24,21 @@ import
org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
-import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutorBuilder;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
-import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
@Slf4j
public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow,
CKCommitInfo, ClickhouseSinkState> {
@@ -62,22 +47,14 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
private final ReaderOption option;
private final ShardRouter shardRouter;
private final transient ClickhouseProxy proxy;
- private final String prepareSql;
private final Map<Shard, ClickhouseBatchStatement> statementMap;
- private final Map<String, ClickhouseFieldInjectFunction>
fieldInjectFunctionMap;
- private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION
= new StringInjectFunction();
-
- private static final Pattern NULLABLE =
Pattern.compile("Nullable\\((.*)\\)");
- private static final Pattern LOW_CARDINALITY =
Pattern.compile("LowCardinality\\((.*)\\)");
ClickhouseSinkWriter(ReaderOption option, Context context) {
this.option = option;
this.context = context;
this.proxy = new
ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode());
- this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
this.shardRouter = new ShardRouter(proxy, option.getShardMetadata());
- this.prepareSql = initPrepareSQL();
this.statementMap = initStatementMap();
}
@@ -90,7 +67,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
shardKey = element.getField(i);
}
ClickhouseBatchStatement statement =
statementMap.get(shardRouter.getShard(shardKey));
- PreparedStatement clickHouseStatement =
statement.getPreparedStatement();
+ JdbcBatchStatementExecutor clickHouseStatement =
statement.getJdbcBatchStatementExecutor();
IntHolder sizeHolder = statement.getIntHolder();
// add into batch
addIntoBatch(element, clickHouseStatement);
@@ -117,7 +94,7 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
this.proxy.close();
for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
try (ClickHouseConnectionImpl needClosedConnection =
batchStatement.getClickHouseConnection();
- PreparedStatement needClosedStatement =
batchStatement.getPreparedStatement()) {
+ JdbcBatchStatementExecutor needClosedStatement =
batchStatement.getJdbcBatchStatementExecutor()) {
IntHolder intHolder = batchStatement.getIntHolder();
if (intHolder.getValue() > 0) {
flush(needClosedStatement);
@@ -129,29 +106,15 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
}
}
- private void addIntoBatch(SeaTunnelRow row, PreparedStatement
clickHouseStatement) {
+ private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor
clickHouseStatement) {
try {
- for (int i = 0; i < option.getFields().size(); i++) {
- String fieldName = option.getFields().get(i);
- Object fieldValue =
row.getField(option.getSeaTunnelRowType().indexOf(fieldName));
- if (fieldValue == null) {
- // field does not exist in row
- // todo: do we need to transform to default value of each
type
- clickHouseStatement.setObject(i + 1, null);
- continue;
- }
- String fieldType = option.getTableSchema().get(fieldName);
- fieldInjectFunctionMap
- .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
- .injectFields(clickHouseStatement, i + 1, fieldValue);
- }
- clickHouseStatement.addBatch();
+ clickHouseStatement.addToBatch(row);
} catch (SQLException e) {
throw new
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Add row
data into batch error", e);
}
}
- private void flush(PreparedStatement clickHouseStatement) {
+ private void flush(JdbcBatchStatementExecutor clickHouseStatement) {
try {
clickHouseStatement.executeBatch();
} catch (Exception e) {
@@ -165,10 +128,21 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
try {
ClickHouseConnectionImpl clickhouseConnection = new
ClickHouseConnectionImpl(s.getJdbcUrl(),
this.option.getProperties());
- PreparedStatement preparedStatement =
clickhouseConnection.prepareStatement(prepareSql);
+
+ JdbcBatchStatementExecutor jdbcBatchStatementExecutor = new
JdbcBatchStatementExecutorBuilder()
+ .setTable(shardRouter.getShardTable())
+ .setTableEngine(shardRouter.getShardTableEngine())
+ .setRowType(option.getSeaTunnelRowType())
+ .setPrimaryKeys(option.getPrimaryKeys())
+ .setClickhouseTableSchema(option.getTableSchema())
+ .setProjectionFields(option.getFields().toArray(new
String[0]))
+
.setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
+ .setSupportUpsert(option.isSupportUpsert())
+ .build();
+
jdbcBatchStatementExecutor.prepareStatements(clickhouseConnection);
IntHolder intHolder = new IntHolder();
ClickhouseBatchStatement batchStatement =
- new ClickhouseBatchStatement(clickhouseConnection,
preparedStatement, intHolder);
+ new ClickhouseBatchStatement(clickhouseConnection,
jdbcBatchStatementExecutor, intHolder);
result.put(s, batchStatement);
} catch (SQLException e) {
throw new
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Clickhouse
prepare statement error: " + e.getMessage(), e);
@@ -176,58 +150,4 @@ public class ClickhouseSinkWriter implements
SinkWriter<SeaTunnelRow, CKCommitIn
});
return result;
}
-
- private String initPrepareSQL() {
- String[] placeholder = new String[option.getFields().size()];
- Arrays.fill(placeholder, "?");
-
- return String.format("INSERT INTO %s (%s) VALUES (%s)",
- shardRouter.getShardTable(),
- String.join(",", option.getFields()),
- String.join(",", placeholder));
- }
-
- private Map<String, ClickhouseFieldInjectFunction>
initFieldInjectFunctionMap() {
- Map<String, ClickhouseFieldInjectFunction> result = new
HashMap<>(Common.COLLECTION_SIZE);
- List<ClickhouseFieldInjectFunction> clickhouseFieldInjectFunctions;
- ClickhouseFieldInjectFunction defaultFunction = new
StringInjectFunction();
- // get field type
- for (String field : this.option.getFields()) {
- clickhouseFieldInjectFunctions = Lists.newArrayList(
- new ArrayInjectFunction(),
- new MapInjectFunction(),
- new BigDecimalInjectFunction(),
- new DateInjectFunction(),
- new DateTimeInjectFunction(),
- new LongInjectFunction(),
- new DoubleInjectFunction(),
- new FloatInjectFunction(),
- new IntInjectFunction(),
- new StringInjectFunction()
- );
- ClickhouseFieldInjectFunction function = defaultFunction;
- String fieldType = this.option.getTableSchema().get(field);
- for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction :
clickhouseFieldInjectFunctions) {
- if
(clickhouseFieldInjectFunction.isCurrentFieldType(unwrapCommonPrefix(fieldType)))
{
- function = clickhouseFieldInjectFunction;
- break;
- }
- }
- result.put(fieldType, function);
- }
- return result;
- }
-
- private String unwrapCommonPrefix(String fieldType) {
- Matcher nullMatcher = NULLABLE.matcher(fieldType);
- Matcher lowMatcher = LOW_CARDINALITY.matcher(fieldType);
- if (nullMatcher.matches()) {
- return nullMatcher.group(1);
- } else if (lowMatcher.matches()) {
- return lowMatcher.group(1);
- } else {
- return fieldType;
- }
- }
-
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 71e6430fc..b31fb15ae 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -40,7 +40,9 @@ public class ShardRouter implements Serializable {
private static final long serialVersionUID = -1L;
private String shardTable;
+ private String shardTableEngine;
private final String table;
+ private final String tableEngine;
private int shardWeightCount;
private final TreeMap<Integer, Shard> shards;
private final String shardKey;
@@ -54,8 +56,9 @@ public class ShardRouter implements Serializable {
this.shards = new TreeMap<>();
this.shardKey = shardMetadata.getShardKey();
this.shardKeyType = shardMetadata.getShardKeyType();
- this.splitMode = shardMetadata.getSplitMode();
+ this.splitMode = shardMetadata.isSplitMode();
this.table = shardMetadata.getTable();
+ this.tableEngine = shardMetadata.getTableEngine();
if (StringUtils.isNotEmpty(shardKey) &&
StringUtils.isEmpty(shardKeyType)) {
throw new
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SHARD_KEY_NOT_FOUND,
"Shard key " + shardKey + " not found in table " + table);
}
@@ -63,6 +66,7 @@ public class ShardRouter implements Serializable {
if (splitMode) {
DistributedEngine localTable =
proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(),
table);
this.shardTable = localTable.getTable();
+ this.shardTableEngine = localTable.getTableEngine();
List<Shard> shardList = proxy.getClusterShardList(connection,
localTable.getClusterName(),
localTable.getDatabase(),
shardMetadata.getDefaultShard().getNode().getPort(),
shardMetadata.getUsername(), shardMetadata.getPassword());
@@ -81,6 +85,10 @@ public class ShardRouter implements Serializable {
return splitMode ? shardTable : table;
}
+ public String getShardTableEngine() {
+ return splitMode ? shardTableEngine : tableEngine;
+ }
+
public Shard getShard(Object shardValue) {
if (!splitMode) {
return shards.firstEntry().getValue();
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
new file mode 100644
index 000000000..7b5a4d249
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} decorator that buffers rows in memory and only
+ * replays them into the delegate executor when {@link #executeBatch()} is invoked.
+ * Every accepted row is first passed through {@code valueTransform} (e.g. identity or
+ * primary-key projection) before being buffered.
+ */
+@RequiredArgsConstructor
+public class BufferedBatchStatementExecutor implements JdbcBatchStatementExecutor {
+    /** Delegate that performs the real JDBC batching. */
+    @NonNull
+    private final JdbcBatchStatementExecutor statementExecutor;
+    /** Applied to each row before buffering. */
+    @NonNull
+    private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
+    /** Rows accepted since the last successful flush. */
+    private final List<SeaTunnelRow> buffer = new ArrayList<>();
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        statementExecutor.prepareStatements(connection);
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        buffer.add(valueTransform.apply(record));
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        if (!buffer.isEmpty()) {
+            for (SeaTunnelRow row : buffer) {
+                statementExecutor.addToBatch(row);
+            }
+            statementExecutor.executeBatch();
+            // Clear only after a successful flush so a failed batch can be retried.
+            buffer.clear();
+        }
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        // Flush any rows still buffered; executeBatch() is a no-op on an empty buffer.
+        executeBatch();
+        // statementExecutor is @NonNull (constructor-injected), so no null check is needed.
+        statementExecutor.closeStatements();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
new file mode 100644
index 000000000..a824772af
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that routes every row to either an INSERT statement
+ * or an UPDATE statement and submits them as JDBC batches.
+ *
+ * <p>Two modes:
+ * <ul>
+ *   <li>Upsert mode ({@code existStmtFactory != null}): a key-existence query decides per
+ *       row whether to insert or update.</li>
+ *   <li>Row-kind mode: INSERT events are inserted, UPDATE_AFTER events are updated; other
+ *       row kinds are rejected.</li>
+ * </ul>
+ *
+ * <p>Batches are flushed eagerly whenever the target statement flips between insert and
+ * update, so the relative order of changes for the same key is preserved on the server.
+ */
+@RequiredArgsConstructor
+public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
+    // Nullable: factory for the key-existence SELECT; non-null enables upsert mode.
+    private final StatementFactory existStmtFactory;
+    @NonNull
+    private final StatementFactory insertStmtFactory;
+    @NonNull
+    private final StatementFactory updateStmtFactory;
+    // Projects a full row down to its primary-key fields; only used in upsert mode.
+    private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
+    // Binds the projected key row into existStatement; only used in upsert mode.
+    private final JdbcRowConverter keyRowConverter;
+    @NonNull
+    private final JdbcRowConverter valueRowConverter;
+    private transient PreparedStatement existStatement;
+    private transient PreparedStatement insertStatement;
+    private transient PreparedStatement updateStatement;
+    // null until the first row; afterwards true if the previous row went to the insert
+    // batch, false if it went to the update batch.
+    private transient Boolean preChangeFlag;
+    // true once all buffered rows have been submitted; guards double-flush on close.
+    private transient boolean submitted;
+
+    /** Convenience constructor for row-kind mode (no existence query, no key converter). */
+    public InsertOrUpdateBatchStatementExecutor(StatementFactory insertStmtFactory,
+                                                StatementFactory updateStmtFactory,
+                                                JdbcRowConverter rowConverter) {
+        this(null, insertStmtFactory, updateStmtFactory,
+            null, null, rowConverter);
+    }
+
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        if (upsertMode()) {
+            existStatement = existStmtFactory.createStatement(connection);
+        }
+        insertStatement = insertStmtFactory.createStatement(connection);
+        updateStatement = updateStmtFactory.createStatement(connection);
+    }
+
+    private boolean upsertMode() {
+        return existStmtFactory != null;
+    }
+
+    /** Returns true when {@code record} should go to the insert batch, false for update. */
+    private boolean hasInsert(SeaTunnelRow record) throws SQLException {
+        if (upsertMode()) {
+            return !exist(keyExtractor.apply(record));
+        }
+        switch (record.getRowKind()) {
+            case INSERT:
+                return true;
+            case UPDATE_AFTER:
+                return false;
+            default:
+                // todo: other row kinds (UPDATE_BEFORE/DELETE) are expected to be handled
+                // by a wrapping executor before reaching this point
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
+    public void addToBatch(SeaTunnelRow record) throws SQLException {
+        boolean currentChangeFlag = hasInsert(record);
+        if (currentChangeFlag) {
+            // Target flipped update -> insert: flush pending updates first to keep order.
+            if (preChangeFlag != null && !preChangeFlag) {
+                updateStatement.executeBatch();
+                updateStatement.clearBatch();
+            }
+            valueRowConverter.toExternal(record, insertStatement);
+            insertStatement.addBatch();
+        } else {
+            // Target flipped insert -> update: flush pending inserts first to keep order.
+            if (preChangeFlag != null && preChangeFlag) {
+                insertStatement.executeBatch();
+                insertStatement.clearBatch();
+            }
+            valueRowConverter.toExternal(record, updateStatement);
+            updateStatement.addBatch();
+        }
+        preChangeFlag = currentChangeFlag;
+        submitted = false;
+    }
+
+    @Override
+    public void executeBatch() throws SQLException {
+        // Only the most recently targeted statement can hold unflushed rows; the other
+        // was flushed when the target last flipped (see addToBatch).
+        if (preChangeFlag != null) {
+            if (preChangeFlag) {
+                insertStatement.executeBatch();
+                insertStatement.clearBatch();
+            } else {
+                updateStatement.executeBatch();
+                updateStatement.clearBatch();
+            }
+        }
+        submitted = true;
+    }
+
+    @Override
+    public void closeStatements() throws SQLException {
+        if (!submitted) {
+            executeBatch();
+        }
+        for (PreparedStatement statement : Arrays.asList(existStatement, insertStatement, updateStatement)) {
+            if (statement != null) {
+                statement.close();
+            }
+        }
+    }
+
+    /** Binds {@code pk} into the existence query and returns true if a row matches. */
+    private boolean exist(SeaTunnelRow pk) throws SQLException {
+        keyRowConverter.toExternal(pk, existStatement);
+        try (ResultSet resultSet = existStatement.executeQuery()) {
+            return resultSet.next();
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
similarity index 50%
copy from
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
copy to
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
index 6a15d5919..961a11ad5 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutor.java
@@ -15,44 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
-import java.io.Serializable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-public class DistributedEngine implements Serializable {
+import java.sql.Connection;
+import java.sql.SQLException;
- private static final long serialVersionUID = -1L;
- private String clusterName;
- private String database;
- private String table;
+public interface JdbcBatchStatementExecutor extends AutoCloseable{
- public DistributedEngine(String clusterName, String database, String
table) {
- this.clusterName = clusterName;
- this.database = database;
- this.table = table;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
+ void prepareStatements(Connection connection) throws SQLException;
- public String getDatabase() {
- return database;
- }
+ void addToBatch(SeaTunnelRow record) throws SQLException;
- public void setDatabase(String database) {
- this.database = database;
- }
+ void executeBatch() throws SQLException;
- public String getTable() {
- return table;
- }
+ void closeStatements() throws SQLException;
- public void setTable(String table) {
- this.table = table;
+ @Override
+ default void close() throws SQLException {
+ closeStatements();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
new file mode 100644
index 000000000..cf2508841
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+@Setter
+@Accessors(chain = true)
+public class JdbcBatchStatementExecutorBuilder {
+ private static final String MERGE_TREE_ENGINE_SUFFIX = "MergeTree";
+ private static final String REPLACING_MERGE_TREE_ENGINE_SUFFIX =
"ReplacingMergeTree";
+ private String table;
+ private String tableEngine;
+ private SeaTunnelRowType rowType;
+ private String[] primaryKeys;
+ private String[] projectionFields;
+ private Map<String, String> clickhouseTableSchema;
+ private boolean supportUpsert;
+ private boolean allowExperimentalLightweightDelete;
+ private String[] orderByKeys;
+
+ private boolean supportMergeTreeEngineExperimentalLightweightDelete() {
+ return tableEngine.endsWith(MERGE_TREE_ENGINE_SUFFIX)
+ && allowExperimentalLightweightDelete;
+ }
+
+ private boolean supportReplacingMergeTreeTableUpsert() {
+ return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
+ && Objects.equals(primaryKeys, orderByKeys);
+ }
+
+ private String[] getDefaultProjectionFields() {
+ List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
+ return clickhouseTableSchema.keySet()
+ .stream()
+ .filter(field -> fieldNames.contains(field))
+ .toArray(value -> new String[0]);
+ }
+
+ public JdbcBatchStatementExecutor build() {
+ Objects.requireNonNull(table);
+ Objects.requireNonNull(tableEngine);
+ Objects.requireNonNull(rowType);
+ Objects.requireNonNull(clickhouseTableSchema);
+ if (projectionFields == null) {
+ projectionFields = getDefaultProjectionFields();
+ }
+
+ JdbcRowConverter valueRowConverter = new JdbcRowConverter(
+ rowType, clickhouseTableSchema, projectionFields);
+ if (primaryKeys == null || primaryKeys.length == 0) {
+ // INSERT: writer all events when primary-keys is empty
+ return createInsertBufferedExecutor(table, rowType,
valueRowConverter);
+ }
+
+ int[] pkFields = Arrays.stream(primaryKeys)
+ .mapToInt(Arrays.asList(rowType.getFieldNames())::indexOf)
+ .toArray();
+ SeaTunnelDataType[] pkTypes = getKeyTypes(pkFields, rowType);
+ JdbcRowConverter pkRowConverter = new JdbcRowConverter(
+ new SeaTunnelRowType(primaryKeys, pkTypes), clickhouseTableSchema,
primaryKeys);
+ Function<SeaTunnelRow, SeaTunnelRow> pkExtractor =
createKeyExtractor(pkFields);
+
+ if (supportMergeTreeEngineExperimentalLightweightDelete()) {
+ boolean convertUpdateBeforeEventToDeleteAction;
+ // DELETE: delete sql
+ JdbcBatchStatementExecutor deleteExecutor = createDeleteExecutor(
+ table, primaryKeys, pkRowConverter);
+ JdbcBatchStatementExecutor updateExecutor;
+ if (supportReplacingMergeTreeTableUpsert()) {
+ // ReplacingMergeTree Update Row: upsert row by
order-by-keys(update_after event)
+ updateExecutor = createInsertExecutor(table, rowType,
valueRowConverter);
+ convertUpdateBeforeEventToDeleteAction = false;
+ } else {
+ // *MergeTree Update Row:
+ // 1. delete(update_before event) + insert or update by query
primary-keys(update_after event)
+ // 2. delete(update_before event) + insert(update_after event)
+ updateExecutor = supportUpsert ?
+ createUpsertExecutor(table, rowType, primaryKeys,
pkExtractor, pkRowConverter, valueRowConverter)
+ : createInsertExecutor(table, rowType, valueRowConverter);
+ convertUpdateBeforeEventToDeleteAction = true;
+ }
+ return new ReduceBufferedBatchStatementExecutor(updateExecutor,
deleteExecutor, pkExtractor,
+ Function.identity(), !convertUpdateBeforeEventToDeleteAction);
+ }
+
+ // DELETE: alter table delete sql
+ JdbcBatchStatementExecutor deleteExecutor =
createAlterTableDeleteExecutor(
+ table, primaryKeys, pkRowConverter);
+ JdbcBatchStatementExecutor updateExecutor;
+ if (supportReplacingMergeTreeTableUpsert()) {
+ updateExecutor = createInsertExecutor(table, rowType,
valueRowConverter);
+ } else {
+ // Other-Engine Update Row:
+ // 1. insert or update by query primary-keys(insert/update_after
event)
+ // 2. insert(insert event) + alter table update(update_after event)
+ updateExecutor = supportUpsert ?
+ createUpsertExecutor(table, rowType, primaryKeys, pkExtractor,
pkRowConverter, valueRowConverter)
+ : createInsertOrUpdateExecutor(table, rowType, primaryKeys,
valueRowConverter);
+ }
+ return new ReduceBufferedBatchStatementExecutor(
+ updateExecutor, deleteExecutor, pkExtractor,
+ Function.identity(), true);
+ }
+
+ private static JdbcBatchStatementExecutor
createInsertBufferedExecutor(String table,
+
SeaTunnelRowType rowType,
+
JdbcRowConverter rowConverter) {
+ return new BufferedBatchStatementExecutor(
+ createInsertExecutor(table, rowType, rowConverter),
Function.identity());
+ }
+
+ private static JdbcBatchStatementExecutor
createInsertOrUpdateExecutor(String table,
+
SeaTunnelRowType rowType,
+
String[] pkNames,
+
JdbcRowConverter rowConverter) {
+ return new InsertOrUpdateBatchStatementExecutor(
+ connection ->
connection.prepareStatement(SqlUtils.getInsertIntoStatement(table,
rowType.getFieldNames())),
+ connection ->
connection.prepareStatement(SqlUtils.getAlterTableUpdateStatement(table,
rowType.getFieldNames(), pkNames)),
+ rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor createUpsertExecutor(String
table,
+
SeaTunnelRowType rowType,
+ String[]
pkNames,
+
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
+
JdbcRowConverter keyConverter,
+
JdbcRowConverter valueConverter) {
+ return new InsertOrUpdateBatchStatementExecutor(
+ connection ->
connection.prepareStatement(SqlUtils.getRowExistsStatement(table, pkNames)),
+ connection ->
connection.prepareStatement(SqlUtils.getInsertIntoStatement(table,
rowType.getFieldNames())),
+ connection ->
connection.prepareStatement(SqlUtils.getAlterTableUpdateStatement(table,
rowType.getFieldNames(), pkNames)),
+ keyExtractor,
+ keyConverter,
+ valueConverter);
+ }
+
+ private static JdbcBatchStatementExecutor createInsertExecutor(String
table,
+
SeaTunnelRowType rowType,
+
JdbcRowConverter rowConverter) {
+ String insertSQL = SqlUtils.getInsertIntoStatement(table,
rowType.getFieldNames());
+ return createSimpleExecutor(insertSQL, rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor createDeleteExecutor(String
table,
+ String[]
primaryKeys,
+
JdbcRowConverter rowConverter) {
+ String deleteSQL = SqlUtils.getDeleteStatement(table, primaryKeys);
+ return createSimpleExecutor(deleteSQL, rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor
createAlterTableDeleteExecutor(String table,
+
String[] primaryKeys,
+
JdbcRowConverter rowConverter) {
+ String alterTableDeleteSQL =
SqlUtils.getAlterTableDeleteStatement(table, primaryKeys);
+ return createSimpleExecutor(alterTableDeleteSQL, rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor createSimpleExecutor(String sql,
+
JdbcRowConverter rowConverter) {
+ return new SimpleBatchStatementExecutor(
+ connection -> connection.prepareStatement(sql),
+ rowConverter);
+ }
+
+ private static SeaTunnelDataType[] getKeyTypes(int[] pkFields,
SeaTunnelRowType rowType) {
+ return Arrays.stream(pkFields)
+ .mapToObj((IntFunction<SeaTunnelDataType>) index ->
rowType.getFieldType(index))
+ .toArray(length -> new SeaTunnelDataType[length]);
+ }
+
+ private static Function<SeaTunnelRow, SeaTunnelRow>
createKeyExtractor(int[] pkFields) {
+ return row -> {
+ Object[] fields = new Object[pkFields.length];
+ for (int i = 0; i < pkFields.length; i++) {
+ fields[i] = row.getField(pkFields[i]);
+ }
+ SeaTunnelRow newRow = new SeaTunnelRow(fields);
+ newRow.setTableId(row.getTableId());
+ newRow.setRowKind(row.getRowKind());
+ return row;
+ };
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
new file mode 100644
index 000000000..ec0ce2d80
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcRowConverter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class JdbcRowConverter implements Serializable {
+ private static final Pattern NULLABLE =
Pattern.compile("Nullable\\((.*)\\)");
+ private static final Pattern LOW_CARDINALITY =
Pattern.compile("LowCardinality\\((.*)\\)");
+ private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION
= new StringInjectFunction();
+
+ private final String[] projectionFields;
+ private final Map<String, ClickhouseFieldInjectFunction>
fieldInjectFunctionMap;
+ private final Map<String, Function<SeaTunnelRow, Object>> fieldGetterMap;
+
+ public JdbcRowConverter(@NonNull SeaTunnelRowType rowType,
+ @NonNull Map<String, String> clickhouseTableSchema,
+ @NonNull String[] projectionFields) {
+ this.projectionFields = projectionFields;
+ this.fieldInjectFunctionMap = createFieldInjectFunctionMap(
+ projectionFields, clickhouseTableSchema);
+ this.fieldGetterMap = createFieldGetterMap(
+ projectionFields, rowType);
+ }
+
+ public PreparedStatement toExternal(SeaTunnelRow row,
+ PreparedStatement statement) throws
SQLException {
+ for (int i = 0; i < projectionFields.length; i++) {
+ String fieldName = projectionFields[i];
+ Object fieldValue = fieldGetterMap.get(fieldName).apply(row);
+ if (fieldValue == null) {
+ // field does not exist in row
+ // todo: do we need to transform to default value of each type
+ statement.setObject(i + 1, null);
+ continue;
+ }
+ fieldInjectFunctionMap.getOrDefault(fieldName,
DEFAULT_INJECT_FUNCTION)
+ .injectFields(statement, i + 1, fieldValue);
+ }
+ return statement;
+ }
+
+ private Map<String, ClickhouseFieldInjectFunction>
createFieldInjectFunctionMap(String[] fields,
+
Map<String, String> clickhouseTableSchema) {
+ Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap =
new HashMap<>();
+ for (String field : fields) {
+ String fieldType = clickhouseTableSchema.get(field);
+ ClickhouseFieldInjectFunction injectFunction = Arrays.asList(
+ new ArrayInjectFunction(),
+ new MapInjectFunction(),
+ new BigDecimalInjectFunction(),
+ new DateInjectFunction(),
+ new DateTimeInjectFunction(),
+ new LongInjectFunction(),
+ new DoubleInjectFunction(),
+ new FloatInjectFunction(),
+ new IntInjectFunction(),
+ new StringInjectFunction())
+ .stream()
+ .filter(f ->
f.isCurrentFieldType(unwrapCommonPrefix(fieldType)))
+ .findFirst()
+ .orElse(new StringInjectFunction());
+ fieldInjectFunctionMap.put(field, injectFunction);
+ }
+ return fieldInjectFunctionMap;
+ }
+
+ private Map<String, Function<SeaTunnelRow, Object>>
createFieldGetterMap(String[] fields,
+
SeaTunnelRowType rowType) {
+ Map<String, Function<SeaTunnelRow, Object>> fieldGetterMap = new
HashMap<>();
+ for (int i = 0; i < fields.length; i++) {
+ String fieldName = fields[i];
+ int fieldIndex = rowType.indexOf(fieldName);
+ fieldGetterMap.put(fieldName, row -> row.getField(fieldIndex));
+ }
+ return fieldGetterMap;
+ }
+
+ private String unwrapCommonPrefix(String fieldType) {
+ Matcher nullMatcher = NULLABLE.matcher(fieldType);
+ Matcher lowMatcher = LOW_CARDINALITY.matcher(fieldType);
+ if (nullMatcher.matches()) {
+ return nullMatcher.group(1);
+ } else if (lowMatcher.matches()) {
+ return lowMatcher.group(1);
+ } else {
+ return fieldType;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
new file mode 100644
index 000000000..2dab2abea
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/ReduceBufferedBatchStatementExecutor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
/**
 * Batch executor that reduces CDC changes by primary key before flushing.
 *
 * <p>Each incoming row is collapsed into a {@code LinkedHashMap} keyed by its
 * primary-key projection, so only the latest change per key survives a batch.
 * On flush, consecutive runs of the same operation kind (upsert vs. delete) are
 * grouped into one JDBC batch each, flushing the previous run whenever the kind
 * flips so the database observes operations in buffered order.
 */
@AllArgsConstructor
@RequiredArgsConstructor
public class ReduceBufferedBatchStatementExecutor implements JdbcBatchStatementExecutor {
    @NonNull
    private final JdbcBatchStatementExecutor insertOrUpdateExecutor;
    @NonNull
    private final JdbcBatchStatementExecutor deleteExecutor;
    // Projects a row down to its primary-key fields; used as the buffer key.
    @NonNull
    private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
    // Transforms the row before it is handed to the insert/update executor.
    @NonNull
    private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
    // When true, UPDATE_BEFORE events are dropped entirely (the commit notes this
    // is used where the matching UPDATE_AFTER upsert supersedes the delete,
    // e.g. ReplacingMergeTree engines).
    private boolean ignoreUpdateBefore;
    // Pair.left = change flag (true = upsert, false = delete); Pair.right = transformed row.
    @NonNull
    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> buffer = new LinkedHashMap<>();

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        insertOrUpdateExecutor.prepareStatements(connection);
        deleteExecutor.prepareStatements(connection);
    }

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        if (RowKind.UPDATE_BEFORE.equals(record.getRowKind()) && ignoreUpdateBefore) {
            return;
        }

        // Reduce by key: a later change for the same primary key overwrites the
        // buffered one.
        // NOTE(review): LinkedHashMap.put keeps an existing key at its ORIGINAL
        // insertion position, so a re-inserted key replays at its first insertion
        // point relative to other keys -- confirm this ordering is acceptable for
        // mixed delete/upsert sequences.
        SeaTunnelRow key = keyExtractor.apply(record);
        boolean changeFlag = changeFlag(record.getRowKind());
        SeaTunnelRow value = valueTransform.apply(record);
        buffer.put(key, Pair.of(changeFlag, value));
    }

    @Override
    public void executeBatch() throws SQLException {
        // Walk the buffer in insertion order; whenever the operation kind flips
        // (upsert <-> delete), flush the previous run before starting the next,
        // preserving relative ordering between the two statement streams.
        Boolean preChangeFlag = null;
        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = buffer.entrySet();
        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : entrySet) {
            Boolean currentChangeFlag = entry.getValue().getKey();
            if (currentChangeFlag) {
                if (preChangeFlag != null && !preChangeFlag) {
                    deleteExecutor.executeBatch();
                }
                // Upserts receive the transformed full row.
                insertOrUpdateExecutor.addToBatch(entry.getValue().getValue());
            } else {
                if (preChangeFlag != null && preChangeFlag) {
                    insertOrUpdateExecutor.executeBatch();
                }
                // Deletes receive only the key projection.
                deleteExecutor.addToBatch(entry.getKey());
            }
            preChangeFlag = currentChangeFlag;
        }

        // Flush the trailing run, if any.
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertOrUpdateExecutor.executeBatch();
            } else {
                deleteExecutor.executeBatch();
            }
        }
        buffer.clear();
    }

    @Override
    public void closeStatements() throws SQLException {
        // Flush anything still buffered before releasing the underlying statements.
        if (!buffer.isEmpty()) {
            executeBatch();
        }
        insertOrUpdateExecutor.closeStatements();
        deleteExecutor.closeStatements();
    }

    // Maps a CDC row kind to the buffer flag: true = upsert, false = delete.
    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException("Unsupported rowKind: " + rowKind);
        }
    }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
new file mode 100644
index 000000000..44b84dcb6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SimpleBatchStatementExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+@RequiredArgsConstructor
+public class SimpleBatchStatementExecutor implements
JdbcBatchStatementExecutor {
+ @NonNull
+ private final StatementFactory statementFactory;
+ @NonNull
+ private final JdbcRowConverter converter;
+ private transient PreparedStatement statement;
+
+ @Override
+ public void prepareStatements(Connection connection) throws SQLException {
+ statement = statementFactory.createStatement(connection);
+ }
+
+ @Override
+ public void addToBatch(SeaTunnelRow record) throws SQLException {
+ converter.toExternal(record, statement);
+ statement.addBatch();
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ statement.executeBatch();
+ statement.clearBatch();
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {
+ if (statement != null) {
+ statement.close();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
new file mode 100644
index 000000000..6582a1747
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import static java.lang.String.format;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
/**
 * SQL statement builders for the ClickHouse sink executors.
 *
 * <p>Column identifiers are wrapped in backquotes so reserved words and unusual
 * names stay valid. Table names are passed through unquoted because they may be
 * qualified ({@code db.table}); quoting the whole string would produce a single
 * invalid identifier.
 */
public class SqlUtils {

    private SqlUtils() {
        // utility class, no instances
    }

    /**
     * Wraps a single identifier (column name) in ClickHouse backquotes.
     *
     * <p>Fix: the original body was {@code "" + identifier + ""} -- a no-op that
     * never quoted anything.
     */
    public static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    /** Builds {@code INSERT INTO table (cols...) VALUES (?...)} for the given fields. */
    public static String getInsertIntoStatement(String tableName,
                                                String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(SqlUtils::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(fieldName -> "?")
                .collect(Collectors.joining(", "));
        return format("INSERT INTO %s (%s) VALUES (%s)",
                tableName, columns, placeholders);
    }

    /**
     * Builds a lightweight DELETE, preceded by the SET enabling the experimental
     * feature.
     *
     * <p>NOTE(review): this emits two statements separated by {@code ;} in one
     * string -- verify the ClickHouse JDBC driver in use accepts multi-statement
     * prepared statements.
     */
    public static String getDeleteStatement(String tableName,
                                            String[] conditionFields) {
        return format("" +
                "SET allow_experimental_lightweight_delete = true;" +
                "DELETE FROM %s WHERE %s", tableName, buildConditionClause(conditionFields));
    }

    /** Builds {@code ALTER TABLE table UPDATE col = ? ... WHERE key = ? ...}. */
    public static String getAlterTableUpdateStatement(String tableName,
                                                      String[] fieldNames,
                                                      String[] conditionFields) {
        String setClause = Arrays.stream(fieldNames)
                .map(fieldName -> format("%s = ?", quoteIdentifier(fieldName)))
                .collect(Collectors.joining(", "));
        return format("ALTER TABLE %s UPDATE %s WHERE %s",
                tableName, setClause, buildConditionClause(conditionFields));
    }

    /** Builds {@code ALTER TABLE table DELETE WHERE key = ? ...}. */
    public static String getAlterTableDeleteStatement(String tableName,
                                                      String[] conditionFields) {
        return format("ALTER TABLE %s DELETE WHERE %s",
                tableName, buildConditionClause(conditionFields));
    }

    /** Builds an existence probe: {@code SELECT 1 FROM table WHERE key = ? ...}. */
    public static String getRowExistsStatement(String tableName,
                                               String[] conditionFields) {
        return format("SELECT 1 FROM %s WHERE %s",
                tableName, buildConditionClause(conditionFields));
    }

    /** Joins {@code `field` = ?} terms with AND, shared by every WHERE builder. */
    private static String buildConditionClause(String[] conditionFields) {
        return Arrays.stream(conditionFields)
                .map(fieldName -> format("%s = ?", quoteIdentifier(fieldName)))
                .collect(Collectors.joining(" AND "));
    }
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
new file mode 100644
index 000000000..3093b39db
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/StatementFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
/**
 * Factory for the {@link PreparedStatement} a batch executor writes into.
 *
 * <p>Keeps the SQL-to-statement binding out of the executors so the same
 * executor class can serve insert, update and delete statements.
 */
@FunctionalInterface
public interface StatementFactory {

    /**
     * Creates a new prepared statement on the given connection.
     *
     * @param connection open JDBC connection to prepare the statement on;
     *                   the caller retains ownership of the connection
     * @return the prepared statement (caller is responsible for closing it)
     * @throws SQLException if statement preparation fails
     */
    PreparedStatement createStatement(Connection connection) throws SQLException;

}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 7aa5010f2..1da905450 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -105,6 +105,7 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
Map<String, String> tableSchema =
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
+ ClickhouseTable table =
proxy.getClickhouseTable(config.getString(DATABASE.key()),
config.getString(TABLE.key()));
String shardKey = null;
String shardKeyType = null;
if (config.hasPath(SHARDING_KEY.key())) {
@@ -116,6 +117,7 @@ public class ClickhouseFileSink implements
SeaTunnelSink<SeaTunnelRow, Clickhous
shardKeyType,
config.getString(DATABASE.key()),
config.getString(TABLE.key()),
+ table.getEngine(),
false, // we don't need to set splitMode in clickhouse file mode.
new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
List<String> fields;