This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eebee8785b [Feature][Connector-V2] add fluss Connector (#10121)
eebee8785b is described below
commit eebee8785ba366e6d23037287a024011195878ff
Author: dyp12 <[email protected]>
AuthorDate: Mon Dec 29 10:35:51 2025 +0800
[Feature][Connector-V2] add fluss Connector (#10121)
---
docs/en/connector-v2/changelog/connector-fluss.md | 6 +
docs/en/connector-v2/sink/Fluss.md | 352 ++++++++++++++++++
docs/zh/connector-v2/changelog/connector-fluss.md | 6 +
docs/zh/connector-v2/sink/Fluss.md | 351 ++++++++++++++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-fluss/pom.xml | 51 +++
.../seatunnel/fluss/config/FlussBaseOptions.java | 49 +++
.../seatunnel/fluss/config/FlussSinkOptions.java | 20 +
.../connectors/seatunnel/fluss/sink/FlussSink.java | 58 +++
.../seatunnel/fluss/sink/FlussSinkFactory.java | 53 +++
.../seatunnel/fluss/sink/FlussSinkWriter.java | 228 ++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 7 +
.../connector-fluss-e2e/pom.xml | 82 +++++
.../seatunnel/e2e/connector/fluss/FlussSinkIT.java | 402 +++++++++++++++++++++
.../src/test/resources/fake_to_fluss.conf | 96 +++++
.../resources/fake_to_multipletable_fluss.conf | 200 ++++++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../seatunnel/format/json/JsonToRowConverters.java | 13 +
.../seatunnel/format/json/RowToJsonConverters.java | 10 +
.../format/json/JsonRowDataSerDeSchemaTest.java | 47 ++-
21 files changed, 2020 insertions(+), 14 deletions(-)
diff --git a/docs/en/connector-v2/changelog/connector-fluss.md
b/docs/en/connector-v2/changelog/connector-fluss.md
new file mode 100644
index 0000000000..97ff142816
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-fluss.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|--------|--------|---------|
+
+</details>
diff --git a/docs/en/connector-v2/sink/Fluss.md
b/docs/en/connector-v2/sink/Fluss.md
new file mode 100644
index 0000000000..06f70d0a27
--- /dev/null
+++ b/docs/en/connector-v2/sink/Fluss.md
@@ -0,0 +1,352 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss sink connector
+
+## Support These Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
+
+## Description
+
+Used to send data to Fluss. Supports both streaming and batch modes.
+
+## Using Dependency
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>0.7.0</version>
+ </dependency>
+
+## Sink Options
+
+| Name | Type | Required | Default | Description
|
+|-------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes | - | The bootstrap servers for
the Fluss sink connection.
|
+| database | string | no | - | The name of the Fluss
database. If not set, the database name will be the name of the upstream database
|
+| table | string | no | - | The name of the Fluss table.
If not set, the table name will be the name of the upstream table
|
+| client.config | Map | no | - | set other client config.
Please refer to
https://fluss.apache.org/docs/engine-flink/options/#other-options |
+
+
+### database [string]
+
+The name of the Fluss database. If not set, the database name will be the name
of the upstream database.
+
+for example:
+
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+
+### table [string]
+
+The name of the Fluss table. If not set, the table name will be the name of the
upstream table.
+
+for example:
+1. test_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+
+## Data Type Mapping
+
+| SeaTunnel Data type | Fluss Data type |
+|---------------------|-----------------|
+| BOOLEAN | BOOLEAN |
+| TINYINT | TINYINT |
+| SMALLINT | SMALLINT |
+| INT | INT |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL | DECIMAL |
+| BYTES | BYTES |
+| DATE | DATE |
+| TIME | TIME |
+| TIMESTAMP | TIMESTAMP |
+| TIMESTAMP_TZ | TIMESTAMP_TZ |
+| STRING | STRING |
+
+## Task Example
+
+### Simple
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
+```
+
+### Multiple table
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test2.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test2.table2"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test3.table3"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
+```
+
+
+## Changelog
+
+<ChangeLog />
+
diff --git a/docs/zh/connector-v2/changelog/connector-fluss.md
b/docs/zh/connector-v2/changelog/connector-fluss.md
new file mode 100644
index 0000000000..97ff142816
--- /dev/null
+++ b/docs/zh/connector-v2/changelog/connector-fluss.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|--------|--------|---------|
+
+</details>
diff --git a/docs/zh/connector-v2/sink/Fluss.md
b/docs/zh/connector-v2/sink/Fluss.md
new file mode 100644
index 0000000000..bc32f8db8f
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Fluss.md
@@ -0,0 +1,351 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss 数据接收器
+
+## 引擎支持
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精准一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
+
+## 描述
+
+该接收器用于将数据写入到Fluss中。支持批和流两种模式。
+
+## 依赖
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>0.7.0</version>
+ </dependency>
+
+
+## 接收器选项
+
+| 名称 | 类型 | 是否必须 | 默认值 | 描述
|
+|-------------------|--------|------|-----|----------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes | - | fluss 集群地址
|
+| database | string | no | - | 指定目标 Fluss 表所在的数据库的名称,
如果没有设置该值,则库名与上游库名相同 |
+| table | string | no | - | 指定目标 Fluss 表的名称,
如果没有设置该值,则表名与上游表名相同 |
+| client.config | Map | no | - | 设置其他客户端配置. 参考
https://fluss.apache.org/docs/engine-flink/options/#other-options |
+
+
+### database [string]
+
+database选项参数可以填入任意库名,这个名字最终会被用作目标表的库名,并且支持变量(`${database_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${database_name}` 将替换传递给目标端的库名。
+
+例如:
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+
+### table [string]
+
+table选项参数可以填入任意表名,这个名字最终会被用作目标表的表名,并且支持变量(`${table_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${table_name}` 将替换传递给目标端的表名。
+
+例如:
+1. test_${schema_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+## 数据类型映射
+
+| Fluss数据类型 | SeaTunnel数据类型 |
+|--------------|---------------|
+| BOOLEAN | BOOLEAN |
+| TINYINT | TINYINT |
+| SMALLINT | SMALLINT |
+| INT | INT |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| DECIMAL | DECIMAL |
+| BYTES | BYTES |
+| DATE | DATE |
+| TIME | TIME |
+| TIMESTAMP | TIMESTAMP |
+| TIMESTAMP_TZ | TIMESTAMP_TZ |
+| STRING | STRING |
+
+
+## 任务示例
+
+### 简单示例
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
+```
+### 多表写入
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test2.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test2.table2"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test3.table3"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 4172161f5e..74b84a4758 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -150,6 +150,7 @@ seatunnel.sink.GraphQL = connector-graphql
seatunnel.sink.Aerospike = connector-aerospike
seatunnel.sink.SensorsData = connector-sensorsdata
seatunnel.sink.HugeGraph = connector-hugegraph
+seatunnel.sink.Fluss = connector-fluss
# For custom transforms, make sure to use the
seatunnel.transform.[PluginIdentifier]=[JarPerfix] naming convention. For
example:
# seatunnel.transform.Sql = seatunnel-transforms-v2
diff --git a/seatunnel-connectors-v2/connector-fluss/pom.xml
b/seatunnel-connectors-v2/connector-fluss/pom.xml
new file mode 100644
index 0000000000..8db032b6f9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fluss/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-fluss</artifactId>
+ <name>SeaTunnel : Connectors V2 : Fluss</name>
+
+ <properties>
+ <fluss.client.version>0.7.0</fluss.client.version>
+ <connector.name>connector.fluss</connector.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>${fluss.client.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
new file mode 100644
index 0000000000..e582c760ec
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.fluss.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class FlussBaseOptions implements Serializable {
+ public static final String CONNECTOR_IDENTITY = "Fluss";
+ public static final Option<String> BOOTSTRAP_SERVERS =
+ Options.key("bootstrap.servers")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Fluss cluster address");
+ public static final Option<String> DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of Fluss database");
+
+ public static final Option<String> TABLE =
+ Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of Fluss table");
+
+ public static final Option<Map<String, String>> CLIENT_CONFIG =
+ Options.key("client.config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The parameter of Fluss client add to
Connection ");
+}
diff --git
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
new file mode 100644
index 0000000000..293770eba9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.config;
+
+public class FlussSinkOptions extends FlussBaseOptions {}
diff --git
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
new file mode 100644
index 0000000000..771d414c70
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+@Slf4j
+public class FlussSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
+
+ private final ReadonlyConfig pluginConfig;
+ private final CatalogTable catalogTable;
+
+ public FlussSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+ this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public FlussSinkWriter createWriter(SinkWriter.Context context) {
+ return new FlussSinkWriter(context, catalogTable, pluginConfig);
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
+
+ @Override
+ public String getPluginName() {
+ return FlussSinkOptions.CONNECTOR_IDENTITY;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
new file mode 100644
index 0000000000..13ee142468
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+import static
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
+
+@AutoService(Factory.class)
+public class FlussSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return FlussSinkOptions.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(FlussSinkOptions.BOOTSTRAP_SERVERS)
+ .optional(FlussSinkOptions.DATABASE)
+ .optional(FlussSinkOptions.TABLE)
+ .optional(FlussSinkOptions.CLIENT_CONFIG)
+ .optional(MULTI_TABLE_SINK_REPLICA)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new FlussSink(context.getOptions(),
context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
new file mode 100644
index 0000000000..91881d30e5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+public class FlussSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
+
+ private Connection connection;
+ private TableWriter writer;
+ private Table table;
+ private String dbName;
+ private String tableName;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ public FlussSinkWriter(
+ SinkWriter.Context context, CatalogTable catalogTable,
ReadonlyConfig pluginConfig) {
+ seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ Configuration flussConfig = new Configuration();
+ flussConfig.setString(
+ FlussSinkOptions.BOOTSTRAP_SERVERS.key(),
+ pluginConfig.get(FlussSinkOptions.BOOTSTRAP_SERVERS));
+ Optional<Map<String, String>> clientConfig =
+ pluginConfig.getOptional(FlussSinkOptions.CLIENT_CONFIG);
+ if (clientConfig.isPresent()) {
+ clientConfig
+ .get()
+ .forEach(
+ (k, v) -> {
+ flussConfig.setString(k, v);
+ });
+ }
+ log.info("Connect to Fluss with config: {}", flussConfig);
+ connection = ConnectionFactory.createConnection(flussConfig);
+ log.info("Connect to Fluss success");
+ dbName =
+ pluginConfig
+ .getOptional(FlussSinkOptions.DATABASE)
+ .orElseGet(() ->
catalogTable.getTableId().getDatabaseName());
+ tableName =
+ pluginConfig
+ .getOptional(FlussSinkOptions.TABLE)
+ .orElseGet(() ->
catalogTable.getTableId().getTableName());
+ TablePath tablePath = TablePath.of(dbName, tableName);
+ table = connection.getTable(tablePath);
+ if (table.getTableInfo().hasPrimaryKey()) {
+ log.info("Table {} has primary key, use upsert writer", tableName);
+ writer = table.newUpsert().createWriter();
+ } else {
+ log.info("Table {} has no primary key, use append writer",
tableName);
+ writer = table.newAppend().createWriter();
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) {
+ RowKind rowKind = element.getRowKind();
+ GenericRow genericRow = new GenericRow(element.getFields().length);
+ for (int i = 0; i < element.getFields().length; i++) {
+ genericRow.setField(
+ i,
+ convert(
+ seaTunnelRowType.getFieldType(i),
+ seaTunnelRowType.getFieldName(i),
+ element.getField(i)));
+ }
+
+ if (writer instanceof UpsertWriter) {
+ UpsertWriter upsertWriter = (UpsertWriter) writer;
+ switch (rowKind) {
+ case INSERT:
+ case UPDATE_AFTER:
+ upsertWriter.upsert(genericRow);
+ break;
+ case DELETE:
+ upsertWriter.delete(genericRow);
+ break;
+ case UPDATE_BEFORE:
+ return;
+ default:
+ throw CommonError.unsupportedRowKind(
+ FlussSinkOptions.CONNECTOR_IDENTITY, tableName,
rowKind.shortString());
+ }
+ } else if (writer instanceof AppendWriter) {
+ AppendWriter appendWriter = (AppendWriter) writer;
+ switch (rowKind) {
+ case INSERT:
+ case UPDATE_AFTER:
+ appendWriter.append(genericRow);
+ break;
+ case DELETE:
+ case UPDATE_BEFORE:
+ return;
+ default:
+ throw CommonError.unsupportedRowKind(
+ FlussSinkOptions.CONNECTOR_IDENTITY, tableName,
rowKind.shortString());
+ }
+ } else {
+ throw CommonError.unsupportedOperation(
+ FlussSinkOptions.CONNECTOR_IDENTITY,
writer.getClass().getName());
+ }
+ }
+
+ @Override
+ public Optional<Void> prepareCommit(long checkpointId) throws IOException {
+ writer.flush();
+ return super.prepareCommit(checkpointId);
+ }
+
+ @Override
+ public void close() {
+ log.info("Close Fluss table.");
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (Exception e) {
+ throw CommonError.closeFailed("Close Fluss table failed.", e);
+ }
+
+ log.info("Close Fluss connection.");
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (Exception e) {
+ throw CommonError.closeFailed("Close Fluss connection failed.", e);
+ }
+ }
+
+ protected Object convert(SeaTunnelDataType dataType, String fieldName,
Object val) {
+ if (val == null) {
+ return null;
+ }
+ switch (dataType.getSqlType()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BYTES:
+ return val;
+ case STRING:
+ return BinaryString.fromString((String) val);
+ case DECIMAL:
+ return Decimal.fromBigDecimal(
+ (BigDecimal) val,
+ ((DecimalType) dataType).getPrecision(),
+ ((DecimalType) dataType).getScale());
+ case DATE:
+ return (int) ((LocalDate) val).toEpochDay();
+ case TIME:
+ return (int) (((LocalTime) val).toNanoOfDay() / 1_000_000);
+ case TIMESTAMP:
+ return TimestampNtz.fromLocalDateTime((LocalDateTime) val);
+ case TIMESTAMP_TZ:
+ if (val instanceof Instant) {
+ return TimestampLtz.fromInstant((Instant) val);
+ } else if (val instanceof OffsetDateTime) {
+ return TimestampLtz.fromInstant(((OffsetDateTime)
val).toInstant());
+ }
+ throw CommonError.unsupportedDataType(
+ FlussSinkOptions.CONNECTOR_IDENTITY,
+ dataType.getSqlType().name(),
+ fieldName);
+ default:
+ throw CommonError.unsupportedDataType(
+ FlussSinkOptions.CONNECTOR_IDENTITY,
+ dataType.getSqlType().name(),
+ fieldName);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index c587da3740..37e0e668a6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -89,6 +89,7 @@
<module>connector-graphql</module>
<module>connector-aerospike</module>
<module>connector-sensorsdata</module>
+ <module>connector-fluss</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index be4ba48396..6093ac3fb3 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -657,6 +657,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fluss</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- jdbc driver -->
<dependency>
<groupId>com.aliyun.phoenix</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml
new file mode 100644
index 0000000000..4105b14d24
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-fluss-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Fluss</name>
+
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fluss</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- test dependencies on TestContainers -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
new file mode 100644
index 0000000000..04230c362e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fluss;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.client.table.scanner.log.LogScanner;
+import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.DatabaseDescriptor;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.CloseableIterator;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class FlussSinkIT extends TestSuiteBase implements TestResource {
+ private static final String DOCKER_IMAGE = "fluss/fluss:0.7.0";
+ private static final String DOCKER_ZK_IMAGE = "zookeeper:3.9.2";
+
+ private static final String FLUSS_Coordinator_HOST =
"fluss_coordinator_e2e";
+ private static final String FLUSS_Tablet_HOST = "fluss_tablet_e2e";
+ private static final String ZK_HOST = "zk_e2e";
+ private static final int ZK_PORT = 2181;
+ private static final int FLUSS_Coordinator_PORT = 9123;
+ private static final int FLUSS_Tablet_PORT = 9124;
+ private static final int FLUSS_Coordinator_LOCAL_PORT = 8123;
+ private static final int FLUSS_Tablet_LOCAL_PORT = 8124;
+
+ private GenericContainer<?> zookeeperServer;
+ private GenericContainer<?> coordinatorServer;
+ private GenericContainer<?> tabletServer;
+
+ private Connection flussConnection;
+
+ private static final String DB_NAME = "fluss_db_test";
+ private static final String DB_NAME_2 = "fluss_db_test2";
+ private static final String DB_NAME_3 = "fluss_db_test3";
+ private static final String TABLE_NAME = "fluss_tb_table1";
+ private static final String TABLE_NAME_2 = "fluss_tb_table2";
+ private static final String TABLE_NAME_3 = "fluss_tb_table3";
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+ createZookeeperContainer();
+ createFlussContainer();
+ }
+
+ private void createFlussContainer() {
+ log.info("Starting FlussServer container...");
+ String coordinatorEnv =
+ String.format(
+ "zookeeper.address: %s:%d\n"
+ + "bind.listeners: INTERNAL://%s:%d,
LOCALCLIENT://%s:%d \n"
+ + "advertised.listeners: INTERNAL://%s:%d,
LOCALCLIENT://localhost:%d\n"
+ + "internal.listener.name: INTERNAL",
+ ZK_HOST,
+ ZK_PORT,
+ FLUSS_Coordinator_HOST,
+ FLUSS_Coordinator_PORT,
+ FLUSS_Coordinator_HOST,
+ FLUSS_Coordinator_LOCAL_PORT,
+ FLUSS_Coordinator_HOST,
+ FLUSS_Coordinator_PORT,
+ FLUSS_Coordinator_LOCAL_PORT);
+ coordinatorServer =
+ new GenericContainer<>(DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(FLUSS_Coordinator_HOST)
+ .withEnv("FLUSS_PROPERTIES", coordinatorEnv)
+ .withCommand("coordinatorServer")
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger("coordinatorServer")));
+ coordinatorServer.setPortBindings(
+ Lists.newArrayList(
+ String.format(
+ "%s:%s",
+ FLUSS_Coordinator_LOCAL_PORT,
FLUSS_Coordinator_LOCAL_PORT)));
+ Startables.deepStart(Stream.of(coordinatorServer)).join();
+ given().ignoreExceptions()
+ .await()
+ .atMost(120, TimeUnit.SECONDS)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .until(
+ () ->
+ checkPort(
+ coordinatorServer.getHost(),
+ FLUSS_Coordinator_LOCAL_PORT,
+ 1000));
+ log.info("coordinatorServer container start success");
+
+ String tabletEnv =
+ String.format(
+ "zookeeper.address: %s:%d\n"
+ + "bind.listeners: INTERNAL://%s:%d,
LOCALCLIENT://%s:%d\n"
+ + "advertised.listeners: INTERNAL://%s:%d,
LOCALCLIENT://localhost:%d\n"
+ + "internal.listener.name: INTERNAL\n"
+ + "tablet-server.id: 0\n"
+ + "kv.snapshot.interval: 0s\n"
+ + "data.dir: /tmp/fluss/data\n"
+ + "remote.data.dir: /tmp/fluss/remote-data",
+ ZK_HOST,
+ ZK_PORT,
+ FLUSS_Tablet_HOST,
+ FLUSS_Tablet_PORT,
+ FLUSS_Tablet_HOST,
+ FLUSS_Tablet_LOCAL_PORT,
+ FLUSS_Tablet_HOST,
+ FLUSS_Tablet_PORT,
+ FLUSS_Tablet_LOCAL_PORT);
+ tabletServer =
+ new GenericContainer<>(DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(FLUSS_Tablet_HOST)
+ .withEnv("FLUSS_PROPERTIES", tabletEnv)
+ .withCommand("tabletServer")
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger("tabletServer")));
+ tabletServer.setPortBindings(
+ Lists.newArrayList(
+ String.format("%s:%s", FLUSS_Tablet_LOCAL_PORT,
FLUSS_Tablet_LOCAL_PORT)));
+ Startables.deepStart(Stream.of(tabletServer)).join();
+ given().ignoreExceptions()
+ .await()
+ .atMost(120, TimeUnit.SECONDS)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .untilAsserted(this::initializeConnection);
+ log.info("tabletServer container start success");
+ log.info("FlussServer Containers are started");
+ }
+
+ private void createZookeeperContainer() {
+ log.info("Starting ZookeeperServer container...");
+ zookeeperServer =
+ new GenericContainer<>(DOCKER_ZK_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(ZK_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(DOCKER_ZK_IMAGE)));
+ zookeeperServer.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", ZK_PORT, ZK_PORT)));
+ Startables.deepStart(Stream.of(zookeeperServer)).join();
+ given().ignoreExceptions()
+ .await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .until(() -> checkPort(zookeeperServer.getHost(), ZK_PORT,
1000));
+ log.info("ZookeeperServer Containers are started");
+ }
+
+ private void initializeConnection() throws ExecutionException,
InterruptedException {
+ Configuration flussConfig = new Configuration();
+ flussConfig.setString(
+ "bootstrap.servers",
+ coordinatorServer.getHost() + ":" +
FLUSS_Coordinator_LOCAL_PORT);
+ flussConnection = ConnectionFactory.createConnection(flussConfig);
+ createDb(flussConnection, DB_NAME);
+ }
+
+ public void createDb(Connection connection, String dbName)
+ throws ExecutionException, InterruptedException {
+ Admin admin = connection.getAdmin();
+ DatabaseDescriptor descriptor = DatabaseDescriptor.builder().build();
+ admin.dropDatabase(dbName, true, true).get();
+ admin.createDatabase(dbName, descriptor, true).get();
+ }
+
+ public Schema getFlussSchema() {
+ return Schema.newBuilder()
+ .column("fbytes", DataTypes.BYTES())
+ .column("fboolean", DataTypes.BOOLEAN())
+ .column("fint", DataTypes.INT())
+ .column("ftinyint", DataTypes.TINYINT())
+ .column("fsmallint", DataTypes.SMALLINT())
+ .column("fbigint", DataTypes.BIGINT())
+ .column("ffloat", DataTypes.FLOAT())
+ .column("fdouble", DataTypes.DOUBLE())
+ .column("fdecimal", DataTypes.DECIMAL(30, 8))
+ .column("fstring", DataTypes.STRING())
+ .column("fdate", DataTypes.DATE())
+ .column("ftime", DataTypes.TIME())
+ .column("ftimestamp", DataTypes.TIMESTAMP())
+ .column("ftimestamp_ltz", DataTypes.TIMESTAMP_LTZ())
+ .primaryKey("fstring")
+ .build();
+ }
+
+ public void createTable(Connection connection, String dbName, String
tableName, Schema schema)
+ throws ExecutionException, InterruptedException {
+ Admin admin = connection.getAdmin();
+ TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).build();
+ TablePath tablePath = TablePath.of(dbName, tableName);
+ admin.dropTable(tablePath, true).get();
+ admin.createTable(tablePath, tableDescriptor, true).get(); // blocking
call
+ }
+
+ public static boolean checkPort(String host, int port, int timeoutMs)
throws IOException {
+ try (Socket socket = new Socket()) {
+ socket.connect(new java.net.InetSocketAddress(host, port),
timeoutMs);
+ return true;
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (tabletServer != null) {
+ tabletServer.close();
+ }
+ if (coordinatorServer != null) {
+ coordinatorServer.close();
+ }
+ if (zookeeperServer != null) {
+ zookeeperServer.close();
+ }
+ }
+
+ @TestTemplate
+ public void testFlussSink(TestContainer container) throws Exception {
+ log.info(" create fluss table");
+ createDb(flussConnection, DB_NAME);
+ createTable(flussConnection, DB_NAME, TABLE_NAME, getFlussSchema());
+ Container.ExecResult execFake2fluss =
container.executeJob("/fake_to_fluss.conf");
+ Assertions.assertEquals(0, execFake2fluss.getExitCode(),
execFake2fluss.getStderr());
+ checkFlussData(DB_NAME, TABLE_NAME);
+ }
+
+ @TestTemplate
+ public void testFlussMultiTableSink(TestContainer container) throws
Exception {
+ log.info(" create fluss tables");
+ createDb(flussConnection, DB_NAME_2);
+ createDb(flussConnection, DB_NAME_3);
+ createTable(flussConnection, DB_NAME_2, TABLE_NAME, getFlussSchema());
+ createTable(flussConnection, DB_NAME_2, TABLE_NAME_2,
getFlussSchema());
+ createTable(flussConnection, DB_NAME_3, TABLE_NAME_3,
getFlussSchema());
+
+ Container.ExecResult execFake2fluss =
+ container.executeJob("/fake_to_multipletable_fluss.conf");
+ Assertions.assertEquals(0, execFake2fluss.getExitCode(),
execFake2fluss.getStderr());
+ checkFlussData(DB_NAME_2, TABLE_NAME);
+ checkFlussData(DB_NAME_2, TABLE_NAME_2);
+ checkFlussData(DB_NAME_3, TABLE_NAME_3);
+ }
+
+ public void checkFlussData(String dbName, String tableName) throws
IOException {
+ // check log data
+ List<GenericRow> streamData =
+ getFlussTableStreamData(flussConnection, dbName, tableName,
10);
+ checkFlussTableStreamData(streamData);
+ // check data
+ List<GenericRow> data = getFlussTableData(flussConnection, dbName,
tableName, 10);
+ checkFlussTableData(data);
+ }
+
+ public void checkFlussTableData(List<GenericRow> streamData) {
+ Assertions.assertEquals(3, streamData.size());
+ List<String> expectedResult =
+ Arrays.asList(
+ "([109, 105, 73, 90,
106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+ "([109, 105, 73, 90,
106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+ "([109, 105, 73, 90,
106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+ ArrayList<String> result = new ArrayList<>();
+ for (GenericRow streamDatum : streamData) {
+ result.add(streamDatum.toString());
+ }
+ Assertions.assertEquals(expectedResult, result);
+ }
+
+ public void checkFlussTableStreamData(List<GenericRow> streamData) {
+ Assertions.assertEquals(7, streamData.size());
+ List<String> expectedResult =
+ Arrays.asList(
+ "([109, 105, 73, 90,
106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+ "([109, 105, 73, 90,
106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+ "([109, 105, 73, 90,
106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+ "([109, 105, 73, 90,
106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+ "([109, 105, 73, 90,
106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+ "([109, 105, 73, 90,
106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+ "([109, 105, 73, 90,
106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+ ArrayList<String> result = new ArrayList<>();
+ for (GenericRow streamDatum : streamData) {
+ result.add(streamDatum.toString());
+ }
+ Assertions.assertEquals(expectedResult, result);
+ }
+
+ public List<GenericRow> getFlussTableStreamData(
+ Connection connection, String dbName, String tableName, int
scanNum) {
+ TablePath tablePath = TablePath.of(dbName, tableName);
+ Table table = connection.getTable(tablePath);
+ LogScanner logScanner = table.newScan().createLogScanner();
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < numBuckets; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+ int scanned = 0;
+ List<GenericRow> rows = new ArrayList<>();
+
+ while (true) {
+ if (scanned > scanNum) break;
+ log.info("Polling for stream records...");
+ ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+ for (TableBucket bucket : scanRecords.buckets()) {
+ for (ScanRecord record : scanRecords.records(bucket)) {
+ GenericRow row = (GenericRow) record.getRow();
+ rows.add(row);
+ }
+ }
+ scanned++;
+ }
+ return rows;
+ }
+
+ public List<GenericRow> getFlussTableData(
+ Connection connection, String dbName, String tableName, int
scanNum)
+ throws IOException {
+ TablePath tablePath = TablePath.of(dbName, tableName);
+ Table table = connection.getTable(tablePath);
+ LogScanner logScanner = table.newScan().createLogScanner();
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < numBuckets; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+ int scanned = 0;
+ List<GenericRow> rows = new ArrayList<>();
+
+ while (true) {
+ if (scanned > scanNum) break;
+ log.info("Polling for records...");
+ ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+ for (TableBucket bucket : scanRecords.buckets()) {
+ CloseableIterator<InternalRow> data =
+ table.newScan()
+ .limit(10)
+ .createBatchScanner(bucket)
+ .pollBatch(Duration.ofSeconds(5));
+ while (data.hasNext()) {
+ rows.add((GenericRow) data.next());
+ }
+ }
+ scanned++;
+ }
+ return rows;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
new file mode 100644
index 0000000000..27ea0435bb
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
new file mode 100644
index 0000000000..b1b1307652
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+ FakeSource {
+ parallelism = 1
+ tables_configs = [
+ {
+ row.num = 7
+ schema {
+ table = "test2.table1"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test2.table2"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ },
+ {
+ row.num = 7
+ schema {
+ table = "test3.table3"
+ fields {
+ fbytes = bytes
+ fboolean = boolean
+ fint = int
+ ftinyint = tinyint
+ fsmallint = smallint
+ fbigint = bigint
+ ffloat = float
+ fdouble = double
+ fdecimal = "decimal(30, 8)"
+ fstring = string
+ fdate = date
+ ftime = time
+ ftimestamp = timestamp
+ ftimestamp_ltz = timestamp_tz
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 1940337748, 73, 17489,
7408919466156976747, 9.434991E37, 3.140411637757371E307,
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10",
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169,
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb",
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = DELETE
+ fields = ["bWlJWmo=", true, 2146418323, 79, 19821,
6393905306944584839, 2.0462337E38, 1.4868114385836557E308,
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40",
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+ }
+ {
+ kind = INSERT
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516,
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd",
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856,
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd",
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+ }
+ ]
+ }
+ ]
+}
+}
+
+transform {
+}
+
+sink {
+ Fluss {
+ bootstrap.servers="fluss_coordinator_e2e:9123"
+ database = "fluss_db_${database_name}"
+ table = "fluss_tb_${table_name}"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 8d885e75a5..a60fe67ef2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -90,6 +90,7 @@
<module>connector-aerospike-e2e</module>
<module>connector-sensorsdata-e2e</module>
<module>connector-hugegraph-e2e</module>
+ <module>connector-fluss-e2e</module>
</modules>
<dependencies>
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 949392d172..a452db6436 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -41,6 +41,7 @@ import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
@@ -155,6 +156,13 @@ public class JsonToRowConverters implements Serializable {
return convertToLocalDateTime(jsonNode, fieldName);
}
};
+ case TIMESTAMP_TZ:
+ return new JsonToObjectConverter() {
+ @Override
+ public Object convert(JsonNode jsonNode, String fieldName)
{
+ return convertToOffsetDateTime(jsonNode, fieldName);
+ }
+ };
case FLOAT:
return new JsonToObjectConverter() {
@Override
@@ -284,6 +292,11 @@ public class JsonToRowConverters implements Serializable {
return LocalDateTime.of(localDate, localTime);
}
+ private OffsetDateTime convertToOffsetDateTime(JsonNode jsonNode, String
fieldName) {
+ String datetimeStr = jsonNode.asText();
+ return OffsetDateTime.parse(datetimeStr);
+ }
+
private String convertToString(JsonNode jsonNode) {
if (jsonNode.isContainerNode()) {
return jsonNode.toString();
diff --git
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 2cf8ae092e..13a30442d1 100644
---
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -37,6 +37,7 @@ import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
@@ -44,6 +45,7 @@ import java.util.function.IntFunction;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
public class RowToJsonConverters implements Serializable {
@@ -183,6 +185,14 @@ public class RowToJsonConverters implements Serializable {
.textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
}
};
+ case TIMESTAMP_TZ:
+ return new RowToJsonConverter() {
+ @Override
+ public JsonNode convert(ObjectMapper mapper, JsonNode
reuse, Object value) {
+ return mapper.getNodeFactory()
+
.textNode(ISO_OFFSET_DATE_TIME.format((OffsetDateTime) value));
+ }
+ };
case ARRAY:
return createArrayConverter((ArrayType) type);
case MAP:
diff --git
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index 8471756b8b..13d8b5c820 100644
---
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -47,6 +47,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalQueries;
import java.util.HashMap;
@@ -76,6 +77,7 @@ public class JsonRowDataSerDeSchemaTest {
String name = "asdlkjasjkdla998y1122";
LocalDate date = LocalDate.parse("1990-10-14");
LocalTime time = LocalTime.parse("12:12:43");
+ OffsetDateTime offsetDateTime =
OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
Timestamp timestamp9 = Timestamp.valueOf("1990-10-14
12:12:43.123456789");
Map<String, Long> map = new HashMap<>();
@@ -100,6 +102,7 @@ public class JsonRowDataSerDeSchemaTest {
root.put("name", name);
root.put("date", "1990-10-14");
root.put("time", "12:12:43");
+ root.put("timestamp_tz", "2025-09-12T23:46:25+08:00");
root.put("timestamp3", "1990-10-14T12:12:43.123");
root.put("timestamp9", "1990-10-14T12:12:43.123456789");
root.putObject("map").put("element", 123);
@@ -121,6 +124,7 @@ public class JsonRowDataSerDeSchemaTest {
"name",
"date",
"time",
+ "timestamp_tz",
"timestamp3",
"timestamp9",
"map",
@@ -136,6 +140,7 @@ public class JsonRowDataSerDeSchemaTest {
STRING_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.OFFSET_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType(STRING_TYPE, LONG_TYPE),
@@ -150,6 +155,7 @@ public class JsonRowDataSerDeSchemaTest {
"name",
"date",
"time",
+ "timestamp_tz",
"timestamp3",
"timestamp9",
"map",
@@ -164,6 +170,7 @@ public class JsonRowDataSerDeSchemaTest {
STRING_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.OFFSET_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType(STRING_TYPE, LONG_TYPE),
@@ -175,7 +182,7 @@ public class JsonRowDataSerDeSchemaTest {
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(catalogTables, false, false);
- SeaTunnelRow expected = new SeaTunnelRow(13);
+ SeaTunnelRow expected = new SeaTunnelRow(14);
expected.setField(0, true);
expected.setField(1, intValue);
expected.setField(2, longValue);
@@ -183,13 +190,14 @@ public class JsonRowDataSerDeSchemaTest {
expected.setField(4, name);
expected.setField(5, date);
expected.setField(6, time);
- expected.setField(7, timestamp3.toLocalDateTime());
- expected.setField(8, timestamp9.toLocalDateTime());
- expected.setField(9, map);
- expected.setField(10, multiSet);
- expected.setField(11, nestedMap);
-
- SeaTunnelRow rowFieldRow = new SeaTunnelRow(12);
+ expected.setField(7, offsetDateTime);
+ expected.setField(8, timestamp3.toLocalDateTime());
+ expected.setField(9, timestamp9.toLocalDateTime());
+ expected.setField(10, map);
+ expected.setField(11, multiSet);
+ expected.setField(12, nestedMap);
+
+ SeaTunnelRow rowFieldRow = new SeaTunnelRow(13);
rowFieldRow.setField(0, true);
rowFieldRow.setField(1, intValue);
rowFieldRow.setField(2, longValue);
@@ -197,13 +205,14 @@ public class JsonRowDataSerDeSchemaTest {
rowFieldRow.setField(4, name);
rowFieldRow.setField(5, timestamp3.toLocalDateTime());
rowFieldRow.setField(6, time);
- rowFieldRow.setField(7, timestamp3.toLocalDateTime());
- rowFieldRow.setField(8, timestamp9.toLocalDateTime());
- rowFieldRow.setField(9, map);
- rowFieldRow.setField(10, multiSet);
- rowFieldRow.setField(11, nestedMap);
+ rowFieldRow.setField(7, offsetDateTime);
+ rowFieldRow.setField(8, timestamp3.toLocalDateTime());
+ rowFieldRow.setField(9, timestamp9.toLocalDateTime());
+ rowFieldRow.setField(10, map);
+ rowFieldRow.setField(11, multiSet);
+ rowFieldRow.setField(12, nestedMap);
- expected.setField(12, rowFieldRow);
+ expected.setField(13, rowFieldRow);
SeaTunnelRow seaTunnelRow =
deserializationSchema.deserialize(serializedJson);
assertEquals(expected, seaTunnelRow);
@@ -678,6 +687,16 @@ public class JsonRowDataSerDeSchemaTest {
assertEquals(
"{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}",
new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
+
+ schema =
+ new SeaTunnelRowType(
+ new String[] {"timestamp_tz"},
+ new SeaTunnelDataType[]
{LocalTimeType.OFFSET_DATE_TIME_TYPE});
+ OffsetDateTime offsetDateTime =
OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
+ row = new SeaTunnelRow(new Object[] {offsetDateTime});
+ assertEquals(
+ "{\"timestamp_tz\":\"2025-09-12T23:46:25+08:00\"}",
+ new String(new JsonSerializationSchema(schema,
"\\N").serialize(row)));
}
@Test