This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 073ebeeddf [Feature][Connector-V2] Support CDC mode for databend sink connector (#9661)
073ebeeddf is described below
commit 073ebeeddf3b25617121561b746c65befb71e262
Author: Jeremy <[email protected]>
AuthorDate: Mon Sep 22 23:01:18 2025 +0800
[Feature][Connector-V2] Support CDC mode for databend sink connector (#9661)
---
docs/en/connector-v2/sink/Databend.md | 53 ++-
docs/zh/connector-v2/sink/Databend.md | 54 ++-
.../databend/catalog/DatabendCatalog.java | 2 +-
.../seatunnel/databend/config/DatabendOptions.java | 6 +
.../databend/config/DatabendSinkConfig.java | 229 +++++-----
.../databend/config/DatabendSinkOptions.java | 18 +
.../seatunnel/databend/sink/DatabendSink.java | 141 ++++++-
.../sink/DatabendSinkAggregatedCommitInfo.java | 58 +++
.../sink/DatabendSinkAggregatedCommitter.java | 250 +++++++++++
.../databend/sink/DatabendSinkCommitterInfo.java | 65 +++
.../databend/sink/DatabendSinkWriter.java | 470 ++++++++++++++++++---
.../databend/sink/DatabendSinkWriterTest.java | 211 +++++++++
.../e2e/connector/databend/DatabendCDCSinkIT.java | 335 +++++++++++++++
.../resources/databend/fake_to_databend_cdc.conf | 92 ++++
.../container/seatunnel/SeaTunnelContainer.java | 1 +
15 files changed, 1785 insertions(+), 200 deletions(-)
diff --git a/docs/en/connector-v2/sink/Databend.md b/docs/en/connector-v2/sink/Databend.md
index e040d2c355..4999a600c9 100644
--- a/docs/en/connector-v2/sink/Databend.md
+++ b/docs/en/connector-v2/sink/Databend.md
@@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';
## Key Features
-- [ ] [Exactly-Once](../../concept/connector-v2-features.md)
- [ ] [Support Multi-table Writing](../../concept/connector-v2-features.md)
-- [ ] [CDC](../../concept/connector-v2-features.md)
+- [x] [Exactly-Once](../../concept/connector-v2-features.md)
+- [x] [CDC](../../concept/connector-v2-features.md)
- [x] [Parallelism](../../concept/connector-v2-features.md)
## Description
@@ -34,21 +34,23 @@ The Databend sink internally implements bulk data import through stage attachmen
## Sink Options
-| Name | Type | Required | Default Value | Description |
-|------|------|----------|---------------|---------------------------------------------|
-| url | String | Yes | - | Databend JDBC connection URL |
-| username | String | Yes | - | Databend database username |
-| password | String | Yes | - | Databend database password |
-| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
-| table | String | No | - | Databend table name |
-| batch_size | Integer | No | 1000 | Number of records for batch writing |
-| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
-| max_retries | Integer | No | 3 | Maximum retry attempts on write failure |
-| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
-| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
-| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
+| Name | Type | Required | Default Value | Description |
+|---------------------|------|----------|---------------|---------------------------------------------|
+| url | String | Yes | - | Databend JDBC connection URL |
+| username | String | Yes | - | Databend database username |
+| password | String | Yes | - | Databend database password |
+| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
+| table | String | No | - | Databend table name |
+| batch_size | Integer | No | 1000 | Number of records for batch writing |
+| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
+| max_retries | Integer | No | 3 | Maximum retry attempts on write failure |
+| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
+| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
+| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds) |
-| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
+| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
+| conflict_key | String | No | - | Conflict key for CDC mode, used to determine the primary key for conflict resolution |
+| enable_delete | Boolean | No | false | Whether to allow delete operations in CDC mode |
### schema_save_mode [Enum]
@@ -152,6 +154,25 @@ sink {
}
```
+### CDC mode
+
+```hocon
+sink {
+ Databend {
+ url = "jdbc:databend://databend:8000/default?ssl=false"
+ username = "root"
+ password = ""
+ database = "default"
+ table = "sink_table"
+
+ # Enable CDC mode
+ batch_size = 1
+ conflict_key = "id"
+ enable_delete = true
+ }
+}
+```
+
## Related Links
- [Databend Official Website](https://databend.rs/)
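
To connect the new `conflict_key` / `enable_delete` options to runtime behavior: in CDC mode the writer tags each incoming change with an action string derived from its `RowKind`, and `enable_delete` decides whether `delete` actions are applied during the merge. A minimal sketch of that mapping (not part of the patch; it mirrors `mapRowKindToAction` in the `DatabendSinkWriter` changes further down in this diff):

```java
// Sketch only: mirrors mapRowKindToAction from DatabendSinkWriter below.
import org.apache.seatunnel.api.table.type.RowKind;

public class RowKindActionSketch {
    static String mapRowKindToAction(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
                return "insert";
            case UPDATE_AFTER:
                return "update";
            case DELETE:
                return "delete"; // applied during merge only when enable_delete = true
        }
        return "update_before"; // UPDATE_BEFORE rows are skipped by the writer
    }

    public static void main(String[] args) {
        System.out.println(mapRowKindToAction(RowKind.UPDATE_AFTER)); // prints "update"
    }
}
```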
diff --git a/docs/zh/connector-v2/sink/Databend.md b/docs/zh/connector-v2/sink/Databend.md
index aa4b9dab15..62944b6b18 100644
--- a/docs/zh/connector-v2/sink/Databend.md
+++ b/docs/zh/connector-v2/sink/Databend.md
@@ -12,9 +12,9 @@ import ChangeLog from '../changelog/connector-databend.md';
## 主要特性
-- [ ] [精确一次](../../concept/connector-v2-features.md)
- [ ] [支持多表写入](../../concept/connector-v2-features.md)
-- [ ] [cdc](../../concept/connector-v2-features.md)
+- [x] [精确一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
## 描述
@@ -34,21 +34,23 @@ Databend sink 内部通过 stage attachment 实现数据的批量导入。
## Sink 选项
-| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
-|------|------|----------|--------|------------------------------------|
-| url | String | 是 | - | Databend JDBC 连接 URL |
-| username | String | 是 | - | Databend 数据库用户名 |
-| password | String | 是 | - | Databend 数据库密码 |
-| database | String | 否 | - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
-| table | String | 否 | - | Databend 表名称 |
-| batch_size | Integer | 否 | 1000 | 批量写入的记录数 |
-| auto_commit | Boolean | 否 | true | 是否自动提交事务 |
-| max_retries | Integer | 否 | 3 | 写入失败时的最大重试次数 |
-| schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式 |
-| data_save_mode | Enum | 否 | APPEND_DATA | 保存数据的模式 |
-| custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景 |
+| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
+|---------------------|------|----------|--------|------------------------------------|
+| url | String | 是 | - | Databend JDBC 连接 URL |
+| username | String | 是 | - | Databend 数据库用户名 |
+| password | String | 是 | - | Databend 数据库密码 |
+| database | String | 否 | - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
+| table | String | 否 | - | Databend 表名称 |
+| batch_size | Integer | 否 | 1000 | 批量写入的记录数 |
+| auto_commit | Boolean | 否 | true | 是否自动提交事务 |
+| max_retries | Integer | 否 | 3 | 写入失败时的最大重试次数 |
+| schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式 |
+| data_save_mode | Enum | 否 | APPEND_DATA | 保存数据的模式 |
+| custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景 |
| execute_timeout_sec | Integer | 否 | 300 | 执行SQL的超时时间(秒) |
-| jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等 |
+| jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等 |
+| conflict_key | String | 否 | - | cdc 模式下的冲突键,用于确定冲突解决的主键 |
+| enable_delete | Boolean | 否 | false | cdc 模式下是否允许删除操作 |
### schema_save_mode [Enum]
@@ -152,6 +154,26 @@ sink {
}
```
+### CDC mode
+
+```hocon
+sink {
+ Databend {
+ url = "jdbc:databend://databend:8000/default?ssl=false"
+ username = "root"
+ password = ""
+ database = "default"
+ table = "sink_table"
+
+ # Enable CDC mode
+ batch_size = 1
+ interval = 3
+ conflict_key = "id"
+ enable_delete = true
+ }
+}
+```
+
## 相关链接
- [Databend 官方网站](https://databend.rs/)
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
index 29d529bef3..56010f1d69 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/catalog/DatabendCatalog.java
@@ -449,7 +449,7 @@ public class DatabendCatalog implements Catalog {
return "TIME";
case TIMESTAMP:
LocalTimeType timeType = (LocalTimeType) dataType;
- return String.format("TIMESTAMP(%d)");
+ return "TIMESTAMP";
default:
throw new DatabendConnectorException(
DatabendConnectorErrorCode.UNSUPPORTED_DATA_TYPE,
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
index e9cf5fec7e..184f30d088 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendOptions.java
@@ -97,4 +97,10 @@ public class DatabendOptions {
.booleanType()
.defaultValue(true)
.withDescription("Whether to auto commit for sink");
+
+ public static final Option<String> CONFLICT_KEY =
+ Options.key("conflict_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The conflict key for sink, used in
upsert mode");
}
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
index a9176636a7..cf644de087 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkConfig.java
@@ -18,131 +18,154 @@
package org.apache.seatunnel.connectors.seatunnel.databend.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.DataSaveMode;
-import org.apache.seatunnel.api.sink.SchemaSaveMode;
import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.AUTO_COMMIT;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.BATCH_SIZE;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.JDBC_CONFIG;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.MAX_RETRIES;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.SSL;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.TABLE;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.URL;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions.USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.CUSTOM_SQL;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.DATA_SAVE_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.EXECUTE_TIMEOUT_SEC;
-import static org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkOptions.SCHEMA_SAVE_MODE;
-
-@Setter
+@Slf4j
@Getter
-@ToString
public class DatabendSinkConfig implements Serializable {
private static final long serialVersionUID = 1L;
- // common options
- private String url;
- private String username;
- private String password;
- private Boolean ssl;
- private String database;
- private String table;
- private Boolean autoCommit;
- private Integer batchSize;
- private Integer maxRetries;
- private Map<String, String> jdbcConfig;
-
- // sink options
- private Integer executeTimeoutSec;
- private String customSql;
- private SchemaSaveMode schemaSaveMode;
- private DataSaveMode dataSaveMode;
- private Properties properties;
+ private final String url;
+ private final String username;
+ private final String password;
+ private final String database;
+ private final String table;
+ private final boolean autoCommit;
+ private final int batchSize;
+ private final int executeTimeoutSec;
+ private final int interval;
+ private final String conflictKey;
+ private final boolean enableDelete;
+
+ private DatabendSinkConfig(Builder builder) {
+ this.url = builder.url;
+ this.username = builder.username;
+ this.password = builder.password;
+ this.database = builder.database;
+ this.table = builder.table;
+ this.autoCommit = builder.autoCommit;
+ this.batchSize = builder.batchSize;
+ this.executeTimeoutSec = builder.executeTimeoutSec;
+ this.interval = builder.interval;
+ this.conflictKey = builder.conflictKey;
+ this.enableDelete = builder.enableDelete;
+ }
public static DatabendSinkConfig of(ReadonlyConfig config) {
- DatabendSinkConfig sinkConfig = new DatabendSinkConfig();
-
- // common options
- sinkConfig.setUrl(config.get(URL));
- sinkConfig.setUsername(config.get(USERNAME));
- sinkConfig.setPassword(config.get(PASSWORD));
- sinkConfig.setDatabase(config.get(DATABASE));
- sinkConfig.setTable(config.get(TABLE));
- sinkConfig.setAutoCommit(config.get(AUTO_COMMIT));
- sinkConfig.setBatchSize(config.get(BATCH_SIZE));
- sinkConfig.setMaxRetries(config.get(MAX_RETRIES));
- sinkConfig.setJdbcConfig(config.get(JDBC_CONFIG));
-
- // sink options
- sinkConfig.setExecuteTimeoutSec(config.get(EXECUTE_TIMEOUT_SEC));
- sinkConfig.setCustomSql(config.getOptional(CUSTOM_SQL).orElse(null));
- sinkConfig.setSchemaSaveMode(config.get(SCHEMA_SAVE_MODE));
- sinkConfig.setDataSaveMode(config.get(DATA_SAVE_MODE));
- // Create properties for JDBC connection
- Properties properties = new Properties();
- if (sinkConfig.getJdbcConfig() != null) {
- sinkConfig.getJdbcConfig().forEach(properties::setProperty);
+ return new Builder()
+ .withUrl(config.get(DatabendOptions.URL))
+ .withUsername(config.get(DatabendOptions.USERNAME))
+ .withPassword(config.get(DatabendOptions.PASSWORD))
+ .withDatabase(config.get(DatabendOptions.DATABASE))
+ .withTable(config.get(DatabendOptions.TABLE))
+ .withAutoCommit(config.get(DatabendOptions.AUTO_COMMIT))
+ .withBatchSize(config.get(DatabendOptions.BATCH_SIZE))
+ .withExecuteTimeoutSec(config.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC))
+ .withConflictKey(config.get(DatabendSinkOptions.CONFLICT_KEY))
+ .withAllowDelete(config.get(DatabendSinkOptions.ENABLE_DELETE))
+ .build();
+ }
+
+ public static class Builder {
+ private String url;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+ private boolean autoCommit = true;
+ private int batchSize = 1000;
+ private int executeTimeoutSec = 300;
+ private int interval = 30;
+ private String conflictKey;
+ private boolean enableDelete = false;
+
+ public Builder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public Builder withUsername(String username) {
+ this.username = username;
+ return this;
}
- if (!properties.containsKey("user")) {
- properties.setProperty("user", sinkConfig.getUsername());
+
+ public Builder withPassword(String password) {
+ this.password = password;
+ return this;
}
- if (!properties.containsKey("password")) {
- properties.setProperty("password", sinkConfig.getPassword());
+
+ public Builder withDatabase(String database) {
+ this.database = database;
+ return this;
}
- if (sinkConfig.getSsl() != null) {
- properties.setProperty("ssl", sinkConfig.getSsl().toString());
+
+ public Builder withTable(String table) {
+ this.table = table;
+ return this;
}
- sinkConfig.setProperties(properties);
- return sinkConfig;
- }
+ public Builder withAutoCommit(boolean autoCommit) {
+ this.autoCommit = autoCommit;
+ return this;
+ }
- // Change UserName, password, jdbcConfig to properties from databendSinkConfig
- public Properties toProperties() {
- Properties properties = new Properties();
- properties.put("user", username);
- properties.put("password", password);
- properties.put("ssl", ssl);
- if (jdbcConfig != null) {
- jdbcConfig.forEach(properties::put);
+ public Builder withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
}
- return properties;
- }
- /** Convert this config to a ReadonlyConfig */
- public ReadonlyConfig toReadonlyConfig() {
- Map<String, Object> map = new HashMap<>();
- map.put(URL.key(), url);
- map.put(USERNAME.key(), username);
- map.put(PASSWORD.key(), password);
- if (ssl != null) {
- map.put(SSL.key(), ssl);
+
+ public Builder withExecuteTimeoutSec(int executeTimeoutSec) {
+ this.executeTimeoutSec = executeTimeoutSec;
+ return this;
}
- map.put(DATABASE.key(), database);
- map.put(TABLE.key(), table);
- map.put(AUTO_COMMIT.key(), autoCommit);
- map.put(BATCH_SIZE.key(), batchSize);
- map.put(MAX_RETRIES.key(), maxRetries);
- if (jdbcConfig != null) {
- map.put(JDBC_CONFIG.key(), jdbcConfig);
+
+ public Builder withInterval(int interval) {
+ this.interval = interval;
+ return this;
}
- map.put(EXECUTE_TIMEOUT_SEC.key(), executeTimeoutSec);
- if (customSql != null) {
- map.put(CUSTOM_SQL.key(), customSql);
+
+ public Builder withConflictKey(String conflictKey) {
+ this.conflictKey = conflictKey;
+ return this;
}
- map.put(SCHEMA_SAVE_MODE.key(), schemaSaveMode);
- map.put(DATA_SAVE_MODE.key(), dataSaveMode);
- return ReadonlyConfig.fromMap(map);
+ public Builder withAllowDelete(boolean allowDelete) {
+ this.enableDelete = allowDelete;
+ return this;
+ }
+
+ public DatabendSinkConfig build() {
+ return new DatabendSinkConfig(this);
+ }
+ }
+
+ public Properties getProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("user", username);
+ properties.setProperty("password", password);
+ return properties;
+ }
+
+ public String getRawTableName() {
+ long timestamp = System.currentTimeMillis();
+ return table + "_raw_" + timestamp;
+ }
+
+ public String getStreamName() {
+ long timestamp = System.currentTimeMillis();
+ return table + "_stream_" + timestamp;
+ }
+
+ public Properties toProperties() {
+ return getProperties();
+ }
+
+ public boolean isCdcMode() {
+ return conflictKey != null && !conflictKey.isEmpty();
}
}
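
A quick sketch (not part of the patch) of how the reworked config above behaves: a non-empty `conflict_key` is what switches the sink into CDC mode, and the raw-table/stream names are derived from the current timestamp, which is why `DatabendSink` captures them once in its constructor. Builder values below are taken from the CDC example in the docs; the class name is illustrative.

```java
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkConfig;

public class DatabendSinkConfigSketch {
    public static void main(String[] args) {
        DatabendSinkConfig config =
                new DatabendSinkConfig.Builder()
                        .withUrl("jdbc:databend://databend:8000/default?ssl=false")
                        .withUsername("root")
                        .withPassword("")
                        .withDatabase("default")
                        .withTable("sink_table")
                        .withBatchSize(1)
                        .withConflictKey("id") // non-empty conflict_key enables CDC mode
                        .withAllowDelete(true)
                        .build();

        System.out.println(config.isCdcMode()); // true
        // Each call derives a fresh timestamp-suffixed name, e.g.
        // "sink_table_raw_<millis>" / "sink_table_stream_<millis>",
        // so DatabendSink stores the results once in its constructor.
        System.out.println(config.getRawTableName());
        System.out.println(config.getStreamName());
    }
}
```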
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
index 19fa2823a1..3b186693a4 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/config/DatabendSinkOptions.java
@@ -47,4 +47,22 @@ public class DatabendSinkOptions {
.intType()
.defaultValue(300)
.withDescription("The timeout seconds for Databend client
execution");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Batch size for CDC merge operations");
+
+ public static final Option<String> CONFLICT_KEY =
+ Options.key("conflict_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Conflict key for CDC merge operations");
+
+ public static final Option<Boolean> ENABLE_DELETE =
+ Options.key("enable_delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to allow delete operations in
CDC mode");
}
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
index 342dee7bab..284004a908 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSink.java
@@ -17,12 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -50,13 +54,20 @@ import java.util.stream.Collectors;
@Slf4j
public class DatabendSink
- implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>, SupportSaveMode {
+ implements SeaTunnelSink<
+ SeaTunnelRow,
+ Void,
+ DatabendSinkCommitterInfo,
+ DatabendSinkAggregatedCommitInfo>,
+ SupportSaveMode {
private final CatalogTable catalogTable;
private final SchemaSaveMode schemaSaveMode;
private final DataSaveMode dataSaveMode;
private final String database;
private final String table;
+ private final String rawTableName;
+ private final String streamName;
private final String customSql;
private final boolean autoCommit;
private final int batchSize;
@@ -64,6 +75,10 @@ public class DatabendSink
private final DatabendSinkConfig databendSinkConfig;
private ReadonlyConfig readonlyConfig;
+ // CDC infrastructure initialization fields
+ private boolean isCdcInfrastructureInitialized = false;
+ private JobContext jobContext;
+
public DatabendSink(CatalogTable catalogTable, ReadonlyConfig options) {
this.catalogTable = catalogTable;
this.databendSinkConfig = DatabendSinkConfig.of(options);
@@ -82,6 +97,8 @@ public class DatabendSink
} else {
this.table = configuredTable;
}
+ this.rawTableName = databendSinkConfig.getRawTableName();
+ this.streamName = databendSinkConfig.getStreamName();
this.autoCommit = options.get(DatabendOptions.AUTO_COMMIT);
this.batchSize = options.get(DatabendOptions.BATCH_SIZE);
this.executeTimeoutSec = options.get(DatabendSinkOptions.EXECUTE_TIMEOUT_SEC);
@@ -111,6 +128,13 @@ public class DatabendSink
log.info("Auto commit: {}", autoCommit);
log.info("Batch size: {}", batchSize);
log.info("Execute timeout: {} seconds", executeTimeoutSec);
+
+ // CDC mode info
+ if (databendSinkConfig.isCdcMode()) {
+ log.info("CDC mode enabled with conflict key: {}",
databendSinkConfig.getConflictKey());
+ log.info("Enable delete: {}", databendSinkConfig.isEnableDelete());
+ log.info("Interval: {} seconds", databendSinkConfig.getInterval());
+ }
}
@Override
@@ -119,7 +143,8 @@ public class DatabendSink
}
@Override
- public DatabendSinkWriter createWriter(@NonNull SinkWriter.Context context) throws IOException {
+ public SinkWriter<SeaTunnelRow, DatabendSinkCommitterInfo, Void> createWriter(
+ @NonNull SinkWriter.Context context) throws IOException {
try {
Connection connection = DatabendUtil.createConnection(databendSinkConfig);
connection.setAutoCommit(autoCommit);
@@ -132,6 +157,8 @@ public class DatabendSink
customSql,
database,
table,
+ rawTableName,
+ streamName,
batchSize,
executeTimeoutSec);
} catch (SQLException e) {
@@ -142,11 +169,6 @@ public class DatabendSink
}
}
- @Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return Optional.of(catalogTable);
- }
-
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
try {
@@ -220,4 +242,109 @@ public class DatabendSink
return "STRING"; // Default to STRING for complex types
}
}
+
+ @Override
+ public Optional<
+ SinkAggregatedCommitter<
+ DatabendSinkCommitterInfo, DatabendSinkAggregatedCommitInfo>>
+ createAggregatedCommitter() throws IOException {
+ DatabendSinkAggregatedCommitter committer =
+ new DatabendSinkAggregatedCommitter(
+ databendSinkConfig, database, table, rawTableName, streamName);
+ committer.setCatalogTable(catalogTable);
+ return Optional.of(committer);
+ }
+
+ @Override
+ public Optional<Serializer<DatabendSinkCommitterInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<DatabendSinkAggregatedCommitInfo>>
+ getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobContext = jobContext;
+
+ // Only initialize CDC infrastructure on coordinator node in BATCH mode
+ // jobContext.getJobMode() == JobMode.BATCH
+ if (databendSinkConfig.isCdcMode() && !isCdcInfrastructureInitialized) {
+ initializeCdcInfrastructure();
+ isCdcInfrastructureInitialized = true;
+ }
+ }
+
+ /** Initialize CDC infrastructure (raw table and stream) only once on the coordinator node */
+ private void initializeCdcInfrastructure() {
+ log.info("Initializing CDC infrastructure for database: {}, table: {}", database, table);
+ try (Connection connection = DatabendUtil.createConnection(databendSinkConfig)) {
+ // Generate unique names for raw table and stream
+ String rawTableName = this.rawTableName;
+ String streamName = this.streamName;
+
+ // Create raw table
+ createRawTable(connection, rawTableName);
+
+ // Create stream on raw table
+ createStream(connection, database, rawTableName, streamName);
+
+ log.info(
+ "CDC infrastructure initialized - raw table: {}, stream:
{}",
+ rawTableName,
+ streamName);
+ } catch (SQLException e) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Failed to initialize CDC infrastructure: " +
e.getMessage(),
+ e);
+ }
+ }
+
+ private String getCurrentTimestamp() {
+ return java.time.LocalDateTime.now()
+ .format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
+ }
+
+ private void createRawTable(Connection connection, String rawTableName) throws SQLException {
+ String createTableSql =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s ("
+ + " id VARCHAR(255),"
+ + " table_name VARCHAR(255),"
+ + " raw_data JSON,"
+ + " add_time TIMESTAMP,"
+ + " action STRING"
+ + ")",
+ database, rawTableName);
+
+ log.info("Creating raw table with SQL: {}", createTableSql);
+ try (java.sql.Statement stmt = connection.createStatement()) {
+ stmt.execute(createTableSql);
+ log.info("Raw table {} created successfully", rawTableName);
+ }
+ }
+
+ private void createStream(
+ Connection connection, String database, String rawTableName, String streamName)
+ throws SQLException {
+ String createStreamSql =
+ String.format(
+ "CREATE STREAM IF NOT EXISTS %s.%s ON TABLE %s.%s",
+ database, streamName, database, rawTableName);
+
+ log.info("Creating stream with SQL: {}", createStreamSql);
+ try (java.sql.Statement stmt = connection.createStatement()) {
+ stmt.execute(createStreamSql);
+ log.info("Stream {} created successfully", streamName);
+ }
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
}
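
For reference (not part of the patch), the DDL that `createRawTable` and `createStream` above format, rendered for the docs example (`database = default`, `table = sink_table`); the timestamp suffixes and class name are illustrative:

```java
public class CdcDdlSketch {
    public static void main(String[] args) {
        String database = "default";
        String rawTableName = "sink_table_raw_1700000000000";  // illustrative suffix
        String streamName = "sink_table_stream_1700000000000"; // illustrative suffix

        // Same templates as createRawTable/createStream in DatabendSink above.
        String createTableSql = String.format(
                "CREATE TABLE IF NOT EXISTS %s.%s ("
                        + " id VARCHAR(255),"
                        + " table_name VARCHAR(255),"
                        + " raw_data JSON,"
                        + " add_time TIMESTAMP,"
                        + " action STRING"
                        + ")",
                database, rawTableName);
        String createStreamSql = String.format(
                "CREATE STREAM IF NOT EXISTS %s.%s ON TABLE %s.%s",
                database, streamName, database, rawTableName);

        System.out.println(createTableSql);  // CREATE TABLE IF NOT EXISTS default.sink_table_raw_...
        System.out.println(createStreamSql); // CREATE STREAM IF NOT EXISTS default.sink_table_stream_... ON TABLE default.sink_table_raw_...
    }
}
```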
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java
new file mode 100644
index 0000000000..f6d0518e1f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class DatabendSinkAggregatedCommitInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<DatabendSinkCommitterInfo> commitInfos;
+ private final String rawTableName;
+ private final String streamName;
+
+ public DatabendSinkAggregatedCommitInfo(
+ List<DatabendSinkCommitterInfo> commitInfos, String rawTableName, String streamName) {
+ this.commitInfos = commitInfos;
+ this.rawTableName = rawTableName;
+ this.streamName = streamName;
+ }
+
+ public List<DatabendSinkCommitterInfo> getCommitInfos() {
+ return commitInfos;
+ }
+
+ public String getRawTableName() {
+ return rawTableName;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("DatabendSinkAggregatedCommitInfo{");
+ sb.append("commitInfos=").append(commitInfos);
+ sb.append(", rawTableName='").append(rawTableName).append("'");
+ sb.append(", streamName='").append(streamName).append("'");
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
new file mode 100644
index 0000000000..0f55132fa9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Aggregated committer for Databend sink that handles CDC (Change Data Capture) operations. In CDC
+ * mode, this committer performs merge operations to apply changes to the target table. Merge
+ * operations are only performed when the accumulated record count reaches the configured batch
+ * size, which helps optimize performance by reducing the frequency of merge operations.
+ */
+@Slf4j
+public class DatabendSinkAggregatedCommitter
+ implements SinkAggregatedCommitter<
+ DatabendSinkCommitterInfo, DatabendSinkAggregatedCommitInfo> {
+
+ // Add a unique identifier for each instance
+ private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0);
+ private final long instanceId = INSTANCE_COUNTER.getAndIncrement();
+
+ private final DatabendSinkConfig databendSinkConfig;
+ private final String database;
+ private final String table;
+ private final String rawTableName;
+ private final String streamName;
+
+ private Connection connection;
+ private boolean isCdcMode;
+ // Store catalog table to access schema information
+ private CatalogTable catalogTable;
+
+ // Add a setter for catalogTable
+ public void setCatalogTable(CatalogTable catalogTable) {
+ this.catalogTable = catalogTable;
+ }
+
+ public DatabendSinkAggregatedCommitter(
+ DatabendSinkConfig databendSinkConfig,
+ String database,
+ String table,
+ String rawTableName,
+ String streamName) {
+ this.databendSinkConfig = databendSinkConfig;
+ this.database = database;
+ this.table = table;
+ this.rawTableName = rawTableName;
+ this.streamName = streamName;
+ this.isCdcMode = databendSinkConfig.isCdcMode();
+ }
+
+ @Override
+ public void init() {
+ try {
+ log.info("[Instance {}] Initializing
DatabendSinkAggregatedCommitter", instanceId);
+ log.info("[Instance {}] DatabendSinkConfig: {}", instanceId,
databendSinkConfig);
+ log.info("[Instance {}] Database: {}", instanceId, database);
+ log.info("[Instance {}] Table: {}", instanceId, table);
+ log.info("[Instance {}] Is CDC mode: {}", instanceId, isCdcMode);
+
+ this.connection = DatabendUtil.createConnection(databendSinkConfig);
+ log.info(
+ "[Instance {}] Databend connection created successfully: {}",
+ instanceId,
+ connection);
+
+ // CDC infrastructure is now initialized in DatabendSink.setJobContext
+ // Just log that we're in CDC mode
+ if (isCdcMode) {
+ log.info("[Instance {}] Running in CDC mode", instanceId);
+ }
+ } catch (SQLException e) {
+ log.error(
+ "[Instance {}] Failed to initialize
DatabendSinkAggregatedCommitter: {}",
+ instanceId,
+ e.getMessage(),
+ e);
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.CONNECT_FAILED,
+ "Failed to initialize DatabendSinkAggregatedCommitter: " +
e.getMessage(),
+ e);
+ } catch (Exception e) {
+ log.error(
+ "[Instance {}] Unexpected error during initialization: {}",
+ instanceId,
+ e.getMessage(),
+ e);
+ throw e;
+ }
+ }
+
+ private String getCurrentTimestamp() {
+ return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
+ }
+
+ @Override
+ public List<DatabendSinkAggregatedCommitInfo> commit(
+ List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+ // Perform final merge operation in CDC mode only when necessary
+ if (isCdcMode) {
+ performMerge(aggregatedCommitInfos);
+ }
+
+ // Return empty list as there's no need to retry
+ return new ArrayList<>();
+ }
+
+ private void performMerge(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) {
+ // Merge all the data from raw table to target table
+ String mergeSql = generateMergeSql();
+ log.info("[Instance {}] Executing MERGE INTO statement: {}",
instanceId, mergeSql);
+
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(mergeSql);
+ log.info("[Instance {}] Merge operation completed successfully",
instanceId);
+ } catch (SQLException e) {
+ log.error(
+ "[Instance {}] Failed to execute merge operation: {}",
+ instanceId,
+ e.getMessage(),
+ e);
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Failed to execute merge operation: " + e.getMessage(),
+ e);
+ }
+ }
+
+ private String generateMergeSql() {
+ StringBuilder sql = new StringBuilder();
+ sql.append(String.format("MERGE INTO %s.%s a ", database, table));
+ sql.append("USING (SELECT ");
+
+ // Add all columns from raw_data
+ if (catalogTable != null && catalogTable.getSeaTunnelRowType() != null) {
+ String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i > 0) {
+ sql.append(", ");
+ }
+ sql.append(String.format("raw_data:%s as %s", fieldNames[i],
fieldNames[i]));
+ }
+ } else {
+ // Fallback to generic raw_data if schema is not available
+ sql.append("raw_data");
+ }
+
+ sql.append(", action FROM ")
+ .append(database)
+ .append(".")
+ // In the new approach, we don't have streamName in this class
+ // The stream name should be passed from DatabendSink or retrieved differently
+ .append(streamName) // Placeholder, will be replaced properly
+ .append(" QUALIFY ROW_NUMBER() OVER(PARTITION BY ")
+ .append(databendSinkConfig.getConflictKey())
+ .append(" ORDER BY add_time DESC) = 1) b ");
+
+ sql.append("ON a.")
+ .append(databendSinkConfig.getConflictKey())
+ .append(" = b.")
+ .append(databendSinkConfig.getConflictKey())
+ .append(" ");
+
+ sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
+
+ if (databendSinkConfig.isEnableDelete()) {
+ sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
+ }
+
+ sql.append("WHEN NOT MATCHED AND b.action!='delete' THEN INSERT *");
+
+ return sql.toString();
+ }
+
+ @Override
+ public DatabendSinkAggregatedCommitInfo combine(List<DatabendSinkCommitterInfo> commitInfos) {
+ // Just combine all commit infos into one aggregated commit info
+ // In the new approach, rawTableName and streamName are not needed here
+ return new DatabendSinkAggregatedCommitInfo(commitInfos, null, null);
+ }
+
+ @Override
+ public void abort(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos)
+ throws IOException {
+ // In case of abort, we might want to clean up the raw table and stream
+ log.info("[Instance {}] Aborting Databend sink operations",
instanceId);
+ try {
+ if (isCdcMode && connection != null && !connection.isClosed()) {
+ // In the new approach, raw table and stream names are not stored in this class
+ // Cleanup would need to be handled differently or at the DatabendSink level
+ log.info(
+ "[Instance {}] CDC mode abort - cleanup handled at DatabendSink level",
+ instanceId);
+ }
+ } catch (Exception e) {
+ log.warn(
+ "[Instance {}] Failed to clean up during abort: {}",
+ instanceId,
+ e.getMessage(),
+ e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (connection != null && !connection.isClosed()) {
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.CONNECT_FAILED,
+ "[Instance {}] Failed to close connection in
DatabendSinkAggregatedCommitter: "
+ + e.getMessage(),
+ e);
+ }
+ }
+}
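
To make the commit path above concrete, here is a sketch (not part of the patch) of the statement `generateMergeSql()` renders for the docs example (`conflict_key = "id"`, `enable_delete = true`) with an assumed two-column schema `(id, name)`; the stream name and class name are illustrative:

```java
public class MergeSqlSketch {
    public static void main(String[] args) {
        String conflictKey = "id";
        boolean enableDelete = true;
        String[] fieldNames = {"id", "name"};                        // assumed source schema
        String stream = "default.sink_table_stream_1700000000000";  // illustrative stream name

        // Same string-building steps as generateMergeSql() above.
        StringBuilder sql = new StringBuilder();
        sql.append("MERGE INTO default.sink_table a USING (SELECT ");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) sql.append(", ");
            sql.append(String.format("raw_data:%s as %s", fieldNames[i], fieldNames[i]));
        }
        sql.append(", action FROM ").append(stream)
                .append(" QUALIFY ROW_NUMBER() OVER(PARTITION BY ").append(conflictKey)
                .append(" ORDER BY add_time DESC) = 1) b ");
        sql.append("ON a.").append(conflictKey).append(" = b.").append(conflictKey).append(" ");
        sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
        if (enableDelete) {
            sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
        }
        sql.append("WHEN NOT MATCHED AND b.action!='delete' THEN INSERT *");

        // Only the newest change per id in the stream is kept, then applied as
        // UPDATE, DELETE (when enabled) or INSERT against the target table.
        System.out.println(sql);
    }
}
```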
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java
new file mode 100644
index 0000000000..e5c88643ff
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkCommitterInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import java.io.Serializable;
+
+public class DatabendSinkCommitterInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ // CDC related fields
+ private String rawTableName;
+ private String streamName;
+
+ public DatabendSinkCommitterInfo() {
+ // Default constructor
+ }
+
+ public DatabendSinkCommitterInfo(String rawTableName, String streamName) {
+ this.rawTableName = rawTableName;
+ this.streamName = streamName;
+ }
+
+ public String getRawTableName() {
+ return rawTableName;
+ }
+
+ public void setRawTableName(String rawTableName) {
+ this.rawTableName = rawTableName;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ @Override
+ public String toString() {
+ return "DatabendSinkCommitterInfo{"
+ + "rawTableName='"
+ + rawTableName
+ + '\''
+ + ", streamName='"
+ + streamName
+ + '\''
+ + '}';
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
index f18a4006b3..dc2da3133e 100644
--- a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -25,6 +28,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -36,13 +40,17 @@ import org.apache.seatunnel.connectors.seatunnel.databend.schema.SchemaChangeMan
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -52,7 +60,8 @@ import java.util.stream.Collectors;
@Slf4j
public class DatabendSinkWriter
- implements SinkWriter<SeaTunnelRow, Void, Void>, SupportSchemaEvolutionSinkWriter {
+ implements SinkWriter<SeaTunnelRow, DatabendSinkCommitterInfo, Void>,
+ SupportSchemaEvolutionSinkWriter {
private final Connection connection;
private final Context context;
@@ -69,6 +78,17 @@ public class DatabendSinkWriter
private int batchCount = 0;
private DatabendSinkConfig databendSinkConfig;
+ // CDC related fields
+ // Note: In CDC mode, rawTableName and streamName are set by DatabendSinkAggregatedCommitter
+ // The writer receives these values through the prepareCommit process
+ private boolean isCdcMode = false;
+ private String rawTableName;
+ private String streamName;
+ private String targetTableName;
+ private PreparedStatement cdcPreparedStatement;
+ private String conflictKey;
+ private boolean enableDelete;
+
public DatabendSinkWriter(
Context context,
Connection connection,
@@ -77,6 +97,8 @@ public class DatabendSinkWriter
String customSql,
String database,
String table,
+ String rawTableName,
+ String streamName,
int batchSize,
int executeTimeoutSec) {
this.context = context;
@@ -88,11 +110,25 @@ public class DatabendSinkWriter
this.tableSchema = catalogTable.getTableSchema();
this.sinkTablePath = TablePath.of(database, table);
+ // CDC mode check
+ this.isCdcMode = databendSinkConfig.isCdcMode();
+ if (databendSinkConfig.isCdcMode()) {
+ this.rawTableName = rawTableName;
+ this.streamName = streamName;
+ log.info("DatabendSinkWriter initialized in CDC mode with raw
table: {}", rawTableName);
+ } else {
+ log.info("DatabendSinkWriter initialized in traditional mode");
+ }
+ this.conflictKey = databendSinkConfig.getConflictKey();
+ this.enableDelete = databendSinkConfig.isEnableDelete();
+ this.targetTableName = table;
+
log.info("DatabendSinkWriter constructor - catalogTable: {}",
catalogTable);
log.info("DatabendSinkWriter constructor - tableSchema: {}",
tableSchema);
log.info(
"DatabendSinkWriter constructor - rowType: {}",
catalogTable.getSeaTunnelRowType());
log.info("DatabendSinkWriter constructor - target table path: {}",
sinkTablePath);
+ log.info("DatabendSinkWriter constructor - CDC mode: {}", isCdcMode);
// if custom SQL is provided, use it directly
if (customSql != null && !customSql.isEmpty()) {
@@ -110,46 +146,131 @@ public class DatabendSinkWriter
e);
}
} else {
- // use the catalog table schema to create the target table
- SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
- if (rowType == null || rowType.getFieldNames().length == 0) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
- "Source table schema is empty or null");
- }
-
try {
- if (!tableExists(database, table)) {
+ if (isCdcMode) {
+ // In CDC mode, we don't create tables here, it's done in AggregatedCommitter
+ // We'll get the raw table and stream names from the committer via prepareCommit
log.info(
- "Target table {}.{} does not exist, creating with source schema",
- database,
- table);
- createTable(database, table, rowType);
+ "CDC mode enabled, table creation will be handled by AggregatedCommitter");
} else {
- log.info("Target table {}.{} exists, verifying schema", database, table);
- verifyTableSchema(database, table, rowType);
+ // Traditional mode
+ initTraditionalMode(database, table);
}
} catch (SQLException e) {
throw new DatabendConnectorException(
DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to verify/create target table: " +
e.getMessage(),
+ "Failed to initialize sink writer: " + e.getMessage(),
e);
}
+ }
+ }
- this.insertSql = generateInsertSql(database, table, rowType);
- log.info("Generated insert SQL: {}", insertSql);
- try {
- this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
- this.preparedStatement = connection.prepareStatement(insertSql);
- this.preparedStatement.setQueryTimeout(executeTimeoutSec);
- log.info("PreparedStatement created successfully");
- } catch (SQLException e) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to prepare statement: " + e.getMessage(),
- e);
- }
+ private String getCurrentTimestamp() {
+ return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+ }
+
+ private void initializeCdcPreparedStatement() throws SQLException {
+ log.info("Initializing CDC PreparedStatement");
+
+ // In CDC mode, the rawTableName should be set by the AggregatedCommitter
+ // If it's not set yet, we can't proceed with CDC operations
+ if (rawTableName == null || rawTableName.isEmpty()) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Raw table name not set by AggregatedCommitter. Cannot initialize CDC PreparedStatement.");
}
+
+ // Generate insert SQL for raw table
+ String insertRawSql = generateInsertRawSql(sinkTablePath.getDatabaseName());
+
+ // Create the PreparedStatement
+ this.cdcPreparedStatement = connection.prepareStatement(insertRawSql);
+ this.cdcPreparedStatement.setQueryTimeout(executeTimeoutSec);
+
+ log.info("CDC PreparedStatement created successfully with SQL: {}", insertRawSql);
+ }
+
+ private void initTraditionalMode(String database, String table) throws SQLException {
+ // use the catalog table schema to create the target table
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ if (rowType == null || rowType.getFieldNames().length == 0) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
+ "Source table schema is empty or null");
+ }
+
+ this.insertSql = generateInsertSql(database, table, rowType);
+ log.info("Generated insert SQL: {}", insertSql);
+ try {
+ this.schemaChangeManager = new SchemaChangeManager(databendSinkConfig);
+ this.preparedStatement = connection.prepareStatement(insertSql);
+ this.preparedStatement.setQueryTimeout(executeTimeoutSec);
+ log.info("PreparedStatement created successfully");
+ } catch (SQLException e) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Failed to prepare statement: " + e.getMessage(),
+ e);
+ }
+ }
+
+ private String generateInsertRawSql(String database) {
+ return String.format(
+ "INSERT INTO %s.%s (id, table_name, raw_data, add_time,
action) VALUES (?, ?, ?, ?, ?)",
+ database, rawTableName);
+ }
+
+ private void performMerge() {
+ if (batchCount <= 0) {
+ log.debug("No data to merge, skipping");
+ return;
+ }
+
+ String mergeSql = generateMergeSql();
+ log.info("Executing MERGE INTO statement: {}", mergeSql);
+
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(mergeSql);
+ log.info("Merge operation completed successfully");
+ batchCount = 0; // Reset batch count after successful merge
+ } catch (SQLException e) {
+ log.error("Failed to execute merge operation: {}", e.getMessage(),
e);
+ }
+ }
+
+ String generateMergeSql() {
+ StringBuilder sql = new StringBuilder();
+ sql.append(
+ String.format(
+ "MERGE INTO %s.%s a ",
sinkTablePath.getDatabaseName(), targetTableName));
+ sql.append(String.format("USING (SELECT "));
+
+ // Add all columns from raw_data
+ String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i > 0) sql.append(", ");
+ sql.append(String.format("raw_data:%s as %s", fieldNames[i],
fieldNames[i]));
+ }
+
+ sql.append(", action FROM ")
+ .append(sinkTablePath.getDatabaseName())
+ .append(".")
+ .append(streamName)
+ .append(" QUALIFY ROW_NUMBER() OVER(PARTITION BY ")
+ .append(conflictKey)
+ .append(" ORDER BY add_time DESC) = 1) b ");
+
+ sql.append("ON a.").append(conflictKey).append(" =
b.").append(conflictKey).append(" ");
+
+ sql.append("WHEN MATCHED AND b.action = 'update' THEN UPDATE * ");
+
+ if (enableDelete) {
+ sql.append("WHEN MATCHED AND b.action = 'delete' THEN DELETE ");
+ }
+
+ sql.append("WHEN NOT MATCHED AND b.action!='delete' THEN INSERT *");
+
+ return sql.toString();
}
@Override
@@ -217,26 +338,12 @@ public class DatabendSinkWriter
return;
}
- if (preparedStatement == null) {
- log.info("PreparedStatement is null, initializing...");
- initializePreparedStatement(row);
- log.info("PreparedStatement initialized successfully");
- }
-
- boolean allFieldsNull = true;
- for (Object field : row.getFields()) {
- if (field != null) {
- allFieldsNull = false;
- break;
- }
- }
-
- if (allFieldsNull) {
- log.warn("All fields in row are null, skipping");
- return;
+ if (isCdcMode) {
+ processCdcRow(row);
+ } else {
+ processTraditionalRow(row);
}
- processRow(row);
batchCount++;
log.info("Batch count after adding row: {}", batchCount);
@@ -247,7 +354,7 @@ public class DatabendSinkWriter
}
} catch (Exception e) {
log.error("Failed to write row: {}", row, e);
- // tru to execute the remaining batch if any error occurs
+ // try to execute the remaining batch if any error occurs
try {
if (batchCount > 0) {
log.info("Attempting to execute remaining batch after
error");
@@ -263,6 +370,160 @@ public class DatabendSinkWriter
}
}
+ private void processCdcRow(SeaTunnelRow row) throws SQLException {
+ log.info("Processing CDC row with kind: {}", row.getRowKind());
+
+ String action = mapRowKindToAction(row.getRowKind());
+ if ("update_before".equals(action)) {
+ log.debug("UPDATE_BEFORE operation detected, skipping row");
+ return;
+ }
+
+ if ("delete".equals(action) && !enableDelete) {
+ log.debug("DELETE operation not allowed, skipping row");
+ return;
+ }
+
+ // Ensure cdcPreparedStatement is initialized
+ if (cdcPreparedStatement == null) {
+ log.info("CDC PreparedStatement is null, initializing...");
+ initializeCdcPreparedStatement();
+
+ // If it's still null, we need to throw an exception as we can't proceed
+ if (cdcPreparedStatement == null) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Failed to initialize CDC PreparedStatement. Raw table name might not be set by AggregatedCommitter.");
+ }
+
+ log.info("CDC PreparedStatement initialized successfully");
+ }
+
+ // Get conflict key value
+ String conflictKeyValue = getConflictKeyValue(row);
+
+ // Convert row to JSON
+ String jsonData = convertRowToJson(row);
+
+ cdcPreparedStatement.setString(1, conflictKeyValue);
+ cdcPreparedStatement.setString(2, targetTableName);
+ cdcPreparedStatement.setString(3, jsonData);
+ cdcPreparedStatement.setTimestamp(4, java.sql.Timestamp.valueOf(LocalDateTime.now()));
+ cdcPreparedStatement.setString(5, action);
+
+ cdcPreparedStatement.addBatch();
+ }
+
+ private void processTraditionalRow(SeaTunnelRow row) throws SQLException {
+ // Ensure preparedStatement is initialized
+ if (preparedStatement == null) {
+ log.info("PreparedStatement is null, initializing...");
+ initializePreparedStatement(row);
+
+ // If it's still null, we need to throw an exception as we can't proceed
+ if (preparedStatement == null) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Failed to initialize PreparedStatement.");
+ }
+
+ log.info("PreparedStatement initialized successfully");
+ }
+
+ boolean allFieldsNull = true;
+ for (Object field : row.getFields()) {
+ if (field != null) {
+ allFieldsNull = false;
+ break;
+ }
+ }
+
+ if (allFieldsNull) {
+ log.warn("All fields in row are null, skipping");
+ return;
+ }
+
+ processRow(row);
+ }
+
+ private String mapRowKindToAction(RowKind rowKind) {
+ switch (rowKind) {
+ case INSERT:
+ return "insert";
+ case UPDATE_AFTER:
+ return "update";
+ case DELETE:
+ return "delete";
+ }
+ return "update_before";
+ }
+
+ /**
+ * Get the value of the conflict key field from the row. This value will be used as the ID in
+ * the raw table.
+ */
+ private String getConflictKeyValue(SeaTunnelRow row) {
+ String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
+ int index = Arrays.asList(fieldNames).indexOf(conflictKey);
+
+ if (index >= 0 && index < row.getFields().length) {
+ Object value = row.getField(index);
+ if (value != null) {
+ return value.toString();
+ }
+ }
+
+ // This should not happen in a proper CDC setup where conflict key
values are always present
+ // If we reach here, it indicates a data issue
+ throw new IllegalArgumentException(
+ "Conflict key field '" + conflictKey + "' value is null or not
found in row");
+ }
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private String convertRowToJson(SeaTunnelRow row) {
+ try {
+ ObjectNode jsonNode = objectMapper.createObjectNode();
+            String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
+ Object[] fields = row.getFields();
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ String fieldName = fieldNames[i];
+ Object value = fields[i];
+
+ if (value == null) {
+ jsonNode.putNull(fieldName);
+ } else if (value instanceof String) {
+ jsonNode.put(fieldName, (String) value);
+ } else if (value instanceof Integer) {
+ jsonNode.put(fieldName, (Integer) value);
+ } else if (value instanceof Long) {
+ jsonNode.put(fieldName, (Long) value);
+ } else if (value instanceof Float) {
+ jsonNode.put(fieldName, (Float) value);
+ } else if (value instanceof Double) {
+ jsonNode.put(fieldName, (Double) value);
+ } else if (value instanceof Boolean) {
+ jsonNode.put(fieldName, (Boolean) value);
+ } else if (value instanceof BigDecimal) {
+ jsonNode.put(fieldName, (BigDecimal) value);
+ } else if (value instanceof java.sql.Timestamp) {
+ jsonNode.put(fieldName, value.toString());
+ } else if (value instanceof java.sql.Date) {
+ jsonNode.put(fieldName, value.toString());
+ } else if (value instanceof byte[]) {
+                    jsonNode.put(fieldName, Base64.getEncoder().encodeToString((byte[]) value));
+ } else {
+ jsonNode.put(fieldName, value.toString());
+ }
+ }
+
+ return objectMapper.writeValueAsString(jsonNode);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to convert row to JSON", e);
+ }
+ }
+
    private void initializePreparedStatement(SeaTunnelRow row) throws SQLException {
log.info("Initializing PreparedStatement based on row data");
@@ -423,16 +684,65 @@ public class DatabendSinkWriter
log.info("Added row to batch, current batch count: {}", batchCount +
1);
}
+    private void verifyRawTableData(String rawTableName, String database) throws SQLException {
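+        // Debug helper: logs the staged changes; raw_data:id uses Databend's variant path
+        // syntax to read the conflict key back out of the JSON payload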
+ try (Statement stmt = connection.createStatement();
+ ResultSet rs =
+ stmt.executeQuery(
+ "SELECT COUNT(*), COUNT(DISTINCT raw_data:id)
FROM "
+ + database
+ + "."
+ + rawTableName)) {
+ if (rs.next()) {
+ log.info(
+ "Raw table sjh {} has {} total rows, {} unique ids",
+ rawTableName,
+ rs.getInt(1),
+ rs.getInt(2));
+ }
+ }
+
+ try (Statement stmt = connection.createStatement();
+ ResultSet dataRs =
+ stmt.executeQuery(
+ "SELECT raw_data, action, add_time FROM "
+ + database
+ + "."
+ + rawTableName
+ + " ORDER BY add_time"); ) {
+ while (dataRs.next()) {
+ log.info(
+ "Raw data : {}, action: {}, time: {}",
+ dataRs.getString(1),
+ dataRs.getString(2),
+ dataRs.getTimestamp(3));
+ }
+ }
+ }
+
private void executeBatch() {
if (batchCount > 0) {
try {
log.info("Executing batch of {} records", batchCount);
- int[] results = preparedStatement.executeBatch();
- int totalAffected = 0;
- for (int result : results) {
- totalAffected += result;
+ if (isCdcMode) {
+ int[] results = cdcPreparedStatement.executeBatch();
+ int totalAffected = 0;
+ for (int result : results) {
+ totalAffected += result;
+ }
+ log.info(
+ "CDC batch executed successfully, total affected
rows: {}",
+ totalAffected);
+ verifyRawTableData(rawTableName,
sinkTablePath.getDatabaseName());
+ } else {
+ int[] results = preparedStatement.executeBatch();
+ int totalAffected = 0;
+ for (int result : results) {
+ totalAffected += result;
+ }
+ log.info(
+ "Traditional batch executed successfully, total
affected rows: {}",
+ totalAffected);
}
- log.info("Batch executed successfully, total affected rows:
{}", totalAffected);
batchCount = 0;
} catch (SQLException e) {
log.error("Failed to execute batch", e);
@@ -447,11 +757,13 @@ public class DatabendSinkWriter
}
@Override
- public Optional<Void> prepareCommit() throws IOException {
+    public Optional<DatabendSinkCommitterInfo> prepareCommit() throws IOException {
log.info("Preparing to commit, executing remaining batch");
executeBatch();
log.info("Commit prepared successfully");
- return Optional.empty();
+        // In the new approach, rawTableName and streamName are initialized in DatabendSink
+ // We pass null values as they're not needed in the committer info
+ return Optional.of(new DatabendSinkCommitterInfo(null, null));
}
@Override
@@ -490,12 +802,30 @@ public class DatabendSinkWriter
public void close() throws IOException {
log.info("Closing DatabendSinkWriter");
try {
- if (preparedStatement != null) {
+ // Execute final batch before closing
+ if (batchCount > 0) {
log.info("Executing final batch before closing");
executeBatch();
+ }
+
+ // Perform final merge in CDC mode
+ if (isCdcMode) {
+ log.info("Performing final merge before closing");
+ performMerge();
+ }
+
+ // Close prepared statements
+ if (preparedStatement != null) {
log.info("Closing PreparedStatement");
preparedStatement.close();
}
+
+ if (cdcPreparedStatement != null) {
+ log.info("Closing CDC PreparedStatement");
+ cdcPreparedStatement.close();
+ }
+
+ // Close connection
if (connection != null) {
if (!connection.getAutoCommit()) {
log.info("Committing transaction");
@@ -504,6 +834,7 @@ public class DatabendSinkWriter
log.info("Closing connection");
connection.close();
}
+
log.info("DatabendSinkWriter closed successfully");
} catch (SQLException e) {
log.error("Failed to close DatabendSinkWriter", e);
@@ -634,4 +965,29 @@ public class DatabendSinkWriter
return "VARCHAR"; // default use VARCHAR
}
}
+
+ // Package-private methods for testing
+ String getConflictKey() {
+ return conflictKey;
+ }
+
+ TablePath getSinkTablePath() {
+ return sinkTablePath;
+ }
+
+ String getRawTableName() {
+ return rawTableName;
+ }
+
+ String getStreamName() {
+ return streamName;
+ }
+
+ boolean isEnableDelete() {
+ return enableDelete;
+ }
+
+ CatalogTable getCatalogTable() {
+ return catalogTable;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
b/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
new file mode 100644
index 0000000000..9fa07b5c62
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-databend/src/test/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriterTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.databend.sink;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DatabendSinkWriterTest {
+
+ @Test
+ public void testGenerateMergeSql() throws Exception {
+ // Create a mock DatabendSinkWriter
+ DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+ // Set up the real method to test
+ when(sinkWriter.generateMergeSql()).thenCallRealMethod();
+
+ // Use reflection to set private fields
+ setPrivateField(sinkWriter, "conflictKey", "id");
+ setPrivateField(sinkWriter, "sinkTablePath", TablePath.of("test_db",
"target_table"));
+ setPrivateField(sinkWriter, "streamName", "cdc_stream");
+ setPrivateField(sinkWriter, "enableDelete", true);
+ setPrivateField(sinkWriter, "targetTableName", "target_table");
+
+ // Mock catalogTable
+ org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "score"},
+ new
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE
+ });
+ when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+ setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+ // Call the method
+ String mergeSql = sinkWriter.generateMergeSql();
+
+ // Expected SQL
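+        // The QUALIFY ROW_NUMBER() clause keeps only the latest change per conflict key
+        // (ordered by add_time) before the MERGE applies it to the target table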
+ String expectedSql =
+ "MERGE INTO test_db.target_table a "
+ + "USING (SELECT raw_data:id as id, raw_data:name as
name, raw_data:score as score, action "
+ + "FROM test_db.cdc_stream "
+ + "QUALIFY ROW_NUMBER() OVER(PARTITION BY id ORDER BY
add_time DESC) = 1) b "
+ + "ON a.id = b.id "
+ + "WHEN MATCHED AND b.action = 'update' THEN UPDATE * "
+ + "WHEN MATCHED AND b.action = 'delete' THEN DELETE "
+ + "WHEN NOT MATCHED AND b.action!='delete' THEN INSERT
*";
+
+ assertEquals(expectedSql, mergeSql);
+ }
+
+ @Test
+ public void testGenerateMergeSqlWithoutDelete() throws Exception {
+ // Create a mock DatabendSinkWriter
+ DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+ // Set up the real method to test
+ when(sinkWriter.generateMergeSql()).thenCallRealMethod();
+
+ // Use reflection to set private fields
+ setPrivateField(sinkWriter, "conflictKey", "id");
+ setPrivateField(sinkWriter, "sinkTablePath", TablePath.of("test_db",
"target_table"));
+ setPrivateField(sinkWriter, "streamName", "cdc_stream");
+ setPrivateField(sinkWriter, "enableDelete", false);
+ setPrivateField(sinkWriter, "targetTableName", "target_table");
+
+ // Mock catalogTable
+ org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "score"},
+ new
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE
+ });
+ when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+ setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+ // Call the method
+ String mergeSql = sinkWriter.generateMergeSql();
+
+ // Expected SQL without DELETE clause
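+        // With enableDelete = false the "WHEN MATCHED AND b.action = 'delete' THEN DELETE"
+        // branch is omitted from the generated merge statement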
+ String expectedSql =
+ "MERGE INTO test_db.target_table a "
+ + "USING (SELECT raw_data:id as id, raw_data:name as
name, raw_data:score as score, action "
+ + "FROM test_db.cdc_stream "
+ + "QUALIFY ROW_NUMBER() OVER(PARTITION BY id ORDER BY
add_time DESC) = 1) b "
+ + "ON a.id = b.id "
+ + "WHEN MATCHED AND b.action = 'update' THEN UPDATE * "
+ + "WHEN NOT MATCHED AND b.action!='delete' THEN INSERT
*";
+
+ assertEquals(expectedSql, mergeSql);
+ }
+
+ @Test
+ public void testGetConflictKeyValue() throws Exception {
+ // Create a mock DatabendSinkWriter
+ DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+ // Get the method to test
+ Method method =
+ DatabendSinkWriter.class.getDeclaredMethod(
+ "getConflictKeyValue", SeaTunnelRow.class);
+ method.setAccessible(true);
+
+ // Mock catalogTable
+ org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "score"},
+ new
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE
+ });
+ when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+ setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+ // Create test row
+ Object[] fields = {1, "test", 95.5};
+ SeaTunnelRow row = new SeaTunnelRow(fields);
+
+ // Set conflict key
+ setPrivateField(sinkWriter, "conflictKey", "id");
+
+ // Call the method
+ String conflictKeyValue = (String) method.invoke(sinkWriter, row);
+
+ // Expected value - should be 1
+ assertEquals("1", conflictKeyValue);
+ }
+
+ @Test
+ public void testGetConflictKeyValueWithNullValue() throws Exception {
+ // Create a mock DatabendSinkWriter
+ DatabendSinkWriter sinkWriter = mock(DatabendSinkWriter.class);
+
+ // Get the method to test
+ Method method =
+ DatabendSinkWriter.class.getDeclaredMethod(
+ "getConflictKeyValue", SeaTunnelRow.class);
+ method.setAccessible(true);
+
+ // Mock catalogTable
+ org.apache.seatunnel.api.table.catalog.CatalogTable catalogTable =
+
mock(org.apache.seatunnel.api.table.catalog.CatalogTable.class);
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "score"},
+ new
org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.DOUBLE_TYPE
+ });
+ when(catalogTable.getSeaTunnelRowType()).thenReturn(rowType);
+ setPrivateField(sinkWriter, "catalogTable", catalogTable);
+
+ // Create test row with null conflict key value
+ Object[] fields = {null, "test", 95.5};
+ SeaTunnelRow row = new SeaTunnelRow(fields);
+
+ // Set conflict key
+ setPrivateField(sinkWriter, "conflictKey", "id");
+
+ // Call the method - should throw IllegalArgumentException wrapped in
+ // InvocationTargetException
+ InvocationTargetException exception =
+ assertThrows(
+ InvocationTargetException.class,
+ () -> {
+ method.invoke(sinkWriter, row);
+ });
+
+ // Verify the cause is IllegalArgumentException
+ assertEquals(IllegalArgumentException.class,
exception.getCause().getClass());
+ }
+
+ // Helper method to set private fields using reflection
+ private void setPrivateField(Object target, String fieldName, Object
value) throws Exception {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
new file mode 100644
index 0000000000..c837dbeb61
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/java/org/apache/seatunnel/e2e/connector/databend/DatabendCDCSinkIT.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.databend;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.databend.DatabendContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class DatabendCDCSinkIT extends TestSuiteBase implements TestResource {
+ private static final Logger LOG =
LoggerFactory.getLogger(DatabendCDCSinkIT.class);
+ private static final String DATABEND_DOCKER_IMAGE =
"datafuselabs/databend:nightly";
+ private static final String DATABEND_CONTAINER_HOST = "databend";
+ private static final int PORT = 8000;
+ private static final int LOCAL_PORT = 8000;
+ private static final String DATABASE = "default";
+ private static final String SINK_TABLE = "sink_table";
+ private DatabendContainer container;
+ private GenericContainer<?> minioContainer;
+ private Connection connection;
+
+ @TestTemplate
+ public void testDatabendSinkCDC(TestContainer container) throws Exception {
+ // Run the CDC test job
+ Container.ExecResult execResult =
+ container.executeJob("/databend/fake_to_databend_cdc.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ // Wait for the merge operation to complete
+ // Increased wait time to ensure merge operations finish
+ Thread.sleep(10000);
+
+ // Verify the sink results
+ try (Statement statement = connection.createStatement()) {
+
+ // First check how many records we have
+ try (ResultSet countRs =
+ statement.executeQuery("SELECT COUNT(*) as count FROM
sink_table")) {
+ if (countRs.next()) {
+ int count = countRs.getInt("count");
+ LOG.info("Found {} records in sink_table", count);
+ }
+ }
+
+ // Then get all records for debugging
+ try (ResultSet allRs = statement.executeQuery("SELECT * FROM
sink_table ORDER BY id")) {
+ LOG.info("All records in sink_table:");
+ while (allRs.next()) {
+ LOG.info(
+ "Record: id={}, name={}, position={}, age={},
score={}",
+ allRs.getInt("id"),
+ allRs.getString("name"),
+ allRs.getString("position"),
+ allRs.getInt("age"),
+ allRs.getDouble("score"));
+ }
+ }
+
+ // Finally check with expected results
+ try (ResultSet resultSet =
+ statement.executeQuery("SELECT * FROM sink_table ORDER BY
id")) {
+
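+                // id 2 (Bob) is inserted, updated twice and finally deleted by the fake
+                // CDC stream, so only ids 1, 3 and 4 should survive the merge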
+ List<List<Object>> expectedRecords =
+ Arrays.asList(
+ Arrays.asList(1, "Alice", "Engineer", 30,
95.5),
+ Arrays.asList(3, "Charlie", "Engineer", 35,
92.5),
+ Arrays.asList(4, "David", "Designer", 28,
88.0));
+
+ List<List<Object>> actualRecords = new ArrayList<>();
+
+ while (resultSet.next()) {
+ List<Object> row = new ArrayList<>();
+ row.add(resultSet.getInt("id"));
+ row.add(resultSet.getString("name"));
+ row.add(resultSet.getString("position"));
+ row.add(resultSet.getInt("age"));
+ row.add(resultSet.getDouble("score"));
+ actualRecords.add(row);
+ }
+
+ LOG.info("Expected records: {}", expectedRecords);
+ LOG.info("Actual records: {}", actualRecords);
+
+ Assertions.assertEquals(
+ expectedRecords.size(),
+ actualRecords.size(),
+ "Record count mismatch. Expected: "
+ + expectedRecords.size()
+ + ", Actual: "
+ + actualRecords.size());
+ for (int i = 0; i < expectedRecords.size(); i++) {
+ Assertions.assertEquals(
+ expectedRecords.get(i),
+ actualRecords.get(i),
+ "Record at index " + i + " does not match");
+ }
+ }
+ }
+ clearSinkTable();
+ }
+
+ private void clearSinkTable() throws SQLException {
+ try (Statement statement = connection.createStatement()) {
+ statement.execute("TRUNCATE TABLE sink_table");
+ }
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
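+        // Databend stores its table data in object storage, so a local MinIO
+        // container is started first to back the Databend test instance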
+ this.minioContainer =
+ new GenericContainer<>("minio/minio:latest")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("minio")
+ .withEnv("MINIO_ROOT_USER", "minioadmin")
+ .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")
+ .withCommand("server", "/data")
+ .withExposedPorts(9000);
+
+ this.minioContainer.setWaitStrategy(
+
Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofSeconds(60)));
+
+
this.minioContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s",
9000, 9000)));
+
+ this.minioContainer.start();
+
+ LOG.info("MinIO container starting,wait 5 secs ...");
+ Thread.sleep(5000);
+
+ boolean bucketCreated = createMinIOBucketWithAWSSDK("databend");
+ if (!bucketCreated) {
+ LOG.warn("can't make sure MinIO bucket create success,continue to
start Databend");
+ }
+ this.container =
+ new DatabendContainer(DATABEND_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(DATABEND_CONTAINER_HOST)
+ .withUsername("root")
+ .withPassword("")
+ .withEnv("STORAGE_TYPE", "s3")
+ .withEnv("STORAGE_S3_ENDPOINT_URL",
"http://minio:9000")
+ .withEnv("STORAGE_S3_ACCESS_KEY_ID", "minioadmin")
+ .withEnv("STORAGE_S3_SECRET_ACCESS_KEY", "minioadmin")
+ .withEnv("STORAGE_S3_BUCKET", "databend")
+ .withEnv("STORAGE_S3_REGION", "us-east-1")
+ .withEnv("STORAGE_S3_ENABLE_VIRTUAL_HOST_STYLE",
"false")
+ .withEnv("STORAGE_S3_FORCE_PATH_STYLE", "true")
+ .withUrlParam("ssl", "false");
+
+ this.container.setPortBindings(
+ Lists.newArrayList(
+ String.format(
+ "%s:%s", LOCAL_PORT, PORT) // host 8000 map to
container port 8000
+ ));
+
+ Startables.deepStart(Stream.of(this.container)).join();
+ LOG.info("Databend container started");
+ Awaitility.given()
+ .ignoreExceptions()
+ .atMost(360, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+
+ this.initializeDatabendTable();
+ }
+
+ private void initializeDatabendTable() {
+ try (Statement statement = connection.createStatement(); ) {
+ // Create sink table
+ String createTableSql =
+ "CREATE TABLE IF NOT EXISTS sink_table ("
+ + " id INT, "
+ + " name STRING, "
+ + " position STRING, "
+ + " age INT, "
+ + " score DOUBLE"
+ + ")";
+ statement.execute(createTableSql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Databend table failed!",
e);
+ }
+ }
+
+    /**
+     * Create a MinIO bucket using the AWS S3 SDK.
+     *
+     * @param bucketName name of the bucket to create
+     * @return true if the bucket already exists or was created successfully, false otherwise
+     */
+ private boolean createMinIOBucketWithAWSSDK(String bucketName) {
+ try {
+ LOG.info("using AWS SDK to create MinIO bucket: {}", bucketName);
+
+ AwsClientBuilder.EndpointConfiguration endpointConfig =
+ new AwsClientBuilder.EndpointConfiguration(
+ "http://localhost:9000", "us-east-1");
+
+ AWSCredentials credentials = new BasicAWSCredentials("minioadmin",
"minioadmin");
+ AWSCredentialsProvider credentialsProvider =
+ new AWSStaticCredentialsProvider(credentials);
+
+ AmazonS3 s3Client =
+ AmazonS3ClientBuilder.standard()
+ .withEndpointConfiguration(endpointConfig)
+ .withCredentials(credentialsProvider)
+ .withPathStyleAccessEnabled(true)
+ .disableChunkedEncoding()
+ .build();
+
+ boolean bucketExists = s3Client.doesBucketExistV2(bucketName);
+ if (bucketExists) {
+ LOG.info("bucket {} exist,no need to create", bucketName);
+ return true;
+ }
+
+ s3Client.createBucket(bucketName);
+ LOG.info("create MinIO bucket success: {}", bucketName);
+ return true;
+ } catch (Exception e) {
+ LOG.error("using AWS SDK to create MinIO failed", e);
+ return false;
+ }
+ }
+
+ // private synchronized Connection getConnection() throws SQLException {
+ // if (this.connection == null || this.connection.isClosed()) {
+ // LOG.info("Creating new database connection");
+ // final Properties info = new Properties();
+ // info.put("user", "root");
+ // info.put("password", "");
+ //
+ // String jdbcUrl =
+ // String.format(
+ // "jdbc:databend://%s:%d/%s?ssl=false",
+ // container.getHost(),
container.getMappedPort(8000), DATABASE);
+ //
+ // this.connection = DriverManager.getConnection(jdbcUrl, info);
+ // }
+ // return this.connection;
+ // }
+
+ private void initConnection()
+ throws SQLException, ClassNotFoundException,
InstantiationException,
+ IllegalAccessException {
+ final Properties info = new Properties();
+ info.put("user", "root"); // Default Databend user
+ info.put("password", ""); // Default Databend password is empty
+ System.out.println("maped port is: " + container.getMappedPort(8000));
+ System.out.println("mapped host: is: " + container.getHost());
+
+ String jdbcUrl =
+ String.format(
+ "jdbc:databend://%s:%d/%s?ssl=false",
+ container.getHost(), container.getMappedPort(8000),
DATABASE);
+
+ this.connection = DriverManager.getConnection(jdbcUrl, info);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ LOG.info("Database connection closed");
+
+ this.connection = null;
+ } catch (SQLException e) {
+ LOG.error("Error closing database connection", e);
+ }
+ }
+
+ // Add a longer sleep to ensure all heartbeat threads are properly
terminated
+ Thread.sleep(10000);
+
+ if (this.container != null) {
+ this.container.stop();
+ LOG.info("Container stopped");
+ }
+
+ if (this.minioContainer != null) {
+ this.minioContainer.stop();
+ LOG.info("MinIO container stopped");
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
new file mode 100644
index 0000000000..9a9eed6f9f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-databend-e2e/src/test/resources/databend/fake_to_databend_cdc.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 2
+ job.mode = "BATCH"
+ checkpoint.interval = 1000
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ position = "string"
+ age = "int"
+ score = "double"
+ }
+ }
+
+ # CDC data with different row kinds
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "Alice", "Engineer", 30, 95.5]
+ },
+ {
+ kind = INSERT
+ fields = [2, "Bob", "Developer", 25, 85.0]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "Bob", "Developer", 25, 85.0]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "Bob", "Senior Developer", 25, 87.0]
+ },
+ {
+ kind = INSERT
+ fields = [3, "Charlie", "Engineer", 35, 92.5]
+ },
+ {
+ kind = INSERT
+ fields = [4, "David", "Designer", 28, 88.0]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [2, "Bob", "Senior Developer", 25, 87.0]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [2, "Bob", "Tech Lead", 25, 90.0]
+ },
+ {
+ kind = DELETE
+ fields = [2, "Bob", "Tech Lead", 25, 90.0]
+ }
+ ]
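+    # Net effect of the stream above: id 2 is inserted, updated twice and then deleted,
+    # so only ids 1, 3 and 4 should remain in sink_table after the merge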
+ }
+}
+
+sink {
+ Databend {
+ url = "jdbc:databend://databend:8000/default?ssl=false"
+ username = "root"
+ password = ""
+ database = "default"
+ table = "sink_table"
+
+ # Enable CDC mode
+ batch_size = 1
+ conflict_key = "id"
+ enable_delete = true
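+    # conflict_key is the unique column used to match rows during the merge;
+    # enable_delete controls whether DELETE change events are applied to the target table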
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 354a408745..2c1ef8076a 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -418,6 +418,7 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
|| s.contains(
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
|| s.startsWith("Log4j2-TF-")
+ || s.startsWith("heartbeat") // Add heartbeat threads as
system threads
|| aqsThread.matcher(s).matches()
// The renewed background thread of the hdfs client
|| s.startsWith("LeaseRenewer")