This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dc271dcfb4 [Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405)
dc271dcfb4 is described below
commit dc271dcfb4a9438c0fca15321a451dab37bf6018
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jul 9 16:00:05 2024 +0800
[Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405)
---
docs/en/Connector-v2-release-state.md | 1 -
docs/en/connector-v2/sink/Hudi.md | 98 ++++++
docs/en/connector-v2/source/Hudi.md | 90 ------
docs/zh/Connector-v2-release-state.md | 1 -
docs/zh/connector-v2/sink/Hudi.md | 92 ++++++
plugin-mapping.properties | 2 +-
seatunnel-connectors-v2/connector-hudi/pom.xml | 99 +++---
.../seatunnel/hudi/config/HudiOptions.java | 97 ++++++
.../seatunnel/hudi/config/HudiSinkConfig.java | 82 +++++
.../seatunnel/hudi/config/HudiSourceConfig.java | 60 ----
.../connectors/seatunnel/hudi/sink/HudiSink.java | 94 ++++++
.../seatunnel/hudi/sink/HudiSinkFactory.java | 73 +++++
.../committer/HudiSinkAggregatedCommitter.java | 132 ++++++++
.../hudi/sink/writer/AvroSchemaConverter.java | 168 ++++++++++
.../seatunnel/hudi/sink/writer/HudiSinkWriter.java | 340 +++++++++++++++++++++
.../hudi/sink/writer/RowDataToAvroConverters.java | 294 ++++++++++++++++++
.../seatunnel/hudi/source/HudiSource.java | 164 ----------
.../seatunnel/hudi/source/HudiSourceFactory.java | 56 ----
.../seatunnel/hudi/source/HudiSourceReader.java | 141 ---------
.../hudi/source/HudiSourceSplitEnumerator.java | 144 ---------
.../hudi/state/HudiAggregatedCommitInfo.java} | 18 +-
.../HudiCommitInfo.java} | 22 +-
.../HudiSinkState.java} | 31 +-
.../connectors/seatunnel/hudi/HudiTest.java | 266 ++++++++++++++++
.../connector-hudi-e2e/pom.xml | 36 +++
.../seatunnel/e2e/connector/hudi/HudiIT.java | 129 ++++++++
.../src/test/resources/fake_to_hudi.conf | 52 ++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
28 files changed, 2021 insertions(+), 762 deletions(-)
diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md
index 308cb010b4..8705de7c76 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -38,7 +38,6 @@ SeaTunnel uses a grading system for connectors to help you understand what to expect
 | [Hive](connector-v2/source/Hive.md)         | Source | GA   | 2.2.0-beta |
 | [Http](connector-v2/sink/Http.md)           | Sink   | Beta | 2.2.0-beta |
 | [Http](connector-v2/source/Http.md)         | Source | Beta | 2.2.0-beta |
-| [Hudi](connector-v2/source/Hudi.md)         | Source | Beta | 2.2.0-beta |
 | [Iceberg](connector-v2/source/Iceberg.md)   | Source | Beta | 2.2.0-beta |
 | [InfluxDB](connector-v2/sink/InfluxDB.md)   | Sink   | Beta | 2.3.0      |
 | [InfluxDB](connector-v2/source/InfluxDB.md) | Source | Beta | 2.3.0-beta |
diff --git a/docs/en/connector-v2/sink/Hudi.md b/docs/en/connector-v2/sink/Hudi.md
new file mode 100644
index 0000000000..51c588e18f
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -0,0 +1,98 @@
+# Hudi
+
+> Hudi sink connector
+
+## Description
+
+Used to write data to Hudi.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|--------|----------|---------------|
+| table_name | string | yes | - |
+| table_dfs_path | string | yes | - |
+| conf_files_path | string | no | - |
+| record_key_fields | string | no | - |
+| partition_fields | string | no | - |
+| table_type | enum | no | copy_on_write |
+| op_type | enum | no | insert |
+| batch_interval_ms | Int | no | 1000 |
+| insert_shuffle_parallelism | Int | no | 2 |
+| upsert_shuffle_parallelism | Int | no | 2 |
+| min_commits_to_keep | Int | no | 20 |
+| max_commits_to_keep | Int | no | 30 |
+| common-options | config | no | - |
+
+### table_name [string]
+
+`table_name` The name of the Hudi table.
+
+### table_dfs_path [string]
+
+`table_dfs_path` The DFS root path of the Hudi table, such as 'hdfs://nameservice/data/hudi/hudi_table/'.
+
+### table_type [enum]
+
+`table_type` The type of the Hudi table. The value is 'copy_on_write' or 'merge_on_read'.
+
+### conf_files_path [string]
+
+`conf_files_path` The environment conf file path list (local paths), used to initialize the HDFS client that accesses the Hudi table files. Example: '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'.
+
+### op_type [enum]
+
+`op_type` The operation type of the Hudi table. The value is 'insert', 'upsert' or 'bulk_insert'.
+
+### batch_interval_ms [Int]
+
+`batch_interval_ms` The interval, in milliseconds, between batch writes to the Hudi table.
+
+### insert_shuffle_parallelism [Int]
+
+`insert_shuffle_parallelism` The parallelism for inserting data into the Hudi table.
+
+### upsert_shuffle_parallelism [Int]
+
+`upsert_shuffle_parallelism` The parallelism for upserting data into the Hudi table.
+
+### min_commits_to_keep [Int]
+
+`min_commits_to_keep` The minimum number of commits to keep in the Hudi table.
+
+### max_commits_to_keep [Int]
+
+`max_commits_to_keep` The maximum number of commits to keep in the Hudi table.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+
+  Hudi {
+    table_name = "hudi_table"
+    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_table/"
+    table_type = "copy_on_write"
+    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
+  }
+
+}
+```
+
+## Changelog
+
+### 2.2.0-beta 2022-09-26
+
+- Add Hudi Sink Connector
+
diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md
deleted file mode 100644
index 353142a8e4..0000000000
--- a/docs/en/connector-v2/source/Hudi.md
+++ /dev/null
@@ -1,90 +0,0 @@
-# Hudi
-
-> Hudi source connector
-
-## Support Those Engines
-
-> Spark<br/>
-> Flink<br/>
-> SeaTunnel Zeta<br/>
-
-## Key Features
-
-- [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
-- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
-- [x] [parallelism](../../concept/connector-v2-features.md)
-- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-
-## Description
-
-Used to read data from Hudi. Currently, only supports hudi cow table and Snapshot Query with Batch Mode.
-
-In order to use this connector, You must ensure your spark/flink cluster already integrated hive. The tested hive version is 2.3.9.
-
-## Supported DataSource Info
-
-:::tip
-
-* Currently, only supports Hudi cow table and Snapshot Query with Batch Mode
-
-:::
-
-## Data Type Mapping
-
-| Hudi Data Type | Seatunnel Data Type |
-|----------------|---------------------|
-| ALL TYPE | STRING |
-
-## Source Options
-
-| Name                    | Type   | Required                     | Default | Description                                                                                                                                                                                           |
-|-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| table.path              | String | Yes                          | -       | The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'.                                                                                                                  |
-| table.type              | String | Yes                          | -       | The type of hudi table. Now we only support 'cow', 'mor' is not support yet.                                                                                                                          |
-| conf.files              | String | Yes                          | -       | The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. |
-| use.kerberos            | bool   | No                           | false   | Whether to enable Kerberos, default is false.                                                                                                                                                         |
-| kerberos.principal      | String | yes when use.kerberos = true | -       | When use kerberos, we should set kerberos principal such as 'test_user@xxx'.                                                                                                                          |
-| kerberos.principal.file | string | yes when use.kerberos = true | -       | When use kerberos, we should set kerberos principal file such as '/home/test/test_user.keytab'.                                                                                                       |
-| common-options          | config | No                           | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.                                                                                              |
-
-## Task Example
-
-### Simple:
-
-> This example reads from a Hudi COW table and configures Kerberos for the environment, printing to the console.
-
-```hocon
-# Defining the runtime environment
-env {
- parallelism = 2
- job.mode = "BATCH"
-}
-source{
- Hudi {
- table.path = "hdfs://nameserivce/data/hudi/hudi_table/"
- table.type = "cow"
- conf.files = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
- use.kerberos = true
- kerberos.principal = "test_user@xxx"
- kerberos.principal.file = "/home/test/test_user.keytab"
- }
-}
-
-transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform-v2/sql/
-}
-
-sink {
- Console {}
-}
-```
-
-## Changelog
-
-### 2.2.0-beta 2022-09-26
-
-- Add Hudi Source Connector
-
diff --git a/docs/zh/Connector-v2-release-state.md b/docs/zh/Connector-v2-release-state.md
index 46df6f2284..779394b703 100644
--- a/docs/zh/Connector-v2-release-state.md
+++ b/docs/zh/Connector-v2-release-state.md
@@ -38,7 +38,6 @@ SeaTunnel 使用连接器分级系统来帮助您了解连接器的期望:
 | [Hive](../en/connector-v2/source/Hive.md)         | Source | GA   | 2.2.0-beta |
 | [Http](connector-v2/sink/Http.md)                 | Sink   | Beta | 2.2.0-beta |
 | [Http](../en/connector-v2/source/Http.md)         | Source | Beta | 2.2.0-beta |
-| [Hudi](../en/connector-v2/source/Hudi.md)         | Source | Beta | 2.2.0-beta |
 | [Iceberg](../en/connector-v2/source/Iceberg.md)   | Source | Beta | 2.2.0-beta |
 | [InfluxDB](../en/connector-v2/sink/InfluxDB.md)   | Sink   | Beta | 2.3.0      |
 | [InfluxDB](../en/connector-v2/source/InfluxDB.md) | Source | Beta | 2.3.0-beta |
diff --git a/docs/zh/connector-v2/sink/Hudi.md b/docs/zh/connector-v2/sink/Hudi.md
new file mode 100644
index 0000000000..ab1fc43603
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -0,0 +1,92 @@
+# Hudi
+
+> Hudi 接收器连接器
+
+## 描述
+
+用于将数据写入 Hudi。
+
+## 主要特点
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## 选项
+
+| 名称 | 类型 | 是否必需 | 默认值 |
+|----------------------------|--------|------|---------------|
+| table_name | string | 是 | - |
+| table_dfs_path | string | 是 | - |
+| conf_files_path | string | 否 | - |
+| record_key_fields | string | 否 | - |
+| partition_fields | string | 否 | - |
+| table_type | enum | 否 | copy_on_write |
+| op_type | enum | 否 | insert |
+| batch_interval_ms | Int | 否 | 1000 |
+| insert_shuffle_parallelism | Int | 否 | 2 |
+| upsert_shuffle_parallelism | Int | 否 | 2 |
+| min_commits_to_keep | Int | 否 | 20 |
+| max_commits_to_keep | Int | 否 | 30 |
+| common-options | config | 否 | - |
+
+### table_name [string]
+
+`table_name` Hudi 表的名称。
+
+### table_dfs_path [string]
+
+`table_dfs_path` Hudi 表的 DFS 根路径,例如 "hdfs://nameservice/data/hudi/hudi_table/"。
+
+### table_type [enum]
+
+`table_type` Hudi 表的类型。可选值为 'copy_on_write' 或 'merge_on_read'。
+
+### conf_files_path [string]
+
+`conf_files_path` 环境配置文件路径列表(本地路径),用于初始化访问 Hudi 表文件的 HDFS 客户端。示例:"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"。
+
+### op_type [enum]
+
+`op_type` Hudi 表的操作类型。值可以是 'insert'、'upsert' 或 'bulk_insert'。
+
+### batch_interval_ms [Int]
+
+`batch_interval_ms` 批量写入 Hudi 表的时间间隔(毫秒)。
+
+### insert_shuffle_parallelism [Int]
+
+`insert_shuffle_parallelism` 插入数据到 Hudi 表的并行度。
+
+### upsert_shuffle_parallelism [Int]
+
+`upsert_shuffle_parallelism` 更新插入数据到 Hudi 表的并行度。
+
+### min_commits_to_keep [Int]
+
+`min_commits_to_keep` Hudi 表保留的最少提交数。
+
+### max_commits_to_keep [Int]
+
+`max_commits_to_keep` Hudi 表保留的最多提交数。
+
+### 通用选项
+
+Sink 插件的通用参数,请参考 [Sink Common Options](common-options.md) 了解详细信息。
+
+## 示例
+
+```hocon
+sink {
+
+  Hudi {
+    table_name = "hudi_table"
+    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_table/"
+    table_type = "copy_on_write"
+    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
+  }
+
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 25dc239f99..6304236ec3 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -52,7 +52,6 @@ seatunnel.sink.OssJindoFile = connector-file-jindo-oss
seatunnel.source.CosFile = connector-file-cos
seatunnel.sink.CosFile = connector-file-cos
seatunnel.source.Pulsar = connector-pulsar
-seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.source.Elasticsearch = connector-elasticsearch
seatunnel.sink.Elasticsearch = connector-elasticsearch
@@ -119,6 +118,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
seatunnel.sink.AmazonSqs = connector-amazonsqs
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
+seatunnel.sink.Hudi = connector-hudi
seatunnel.sink.Druid = connector-druid
seatunnel.source.Easysearch = connector-easysearch
seatunnel.sink.Easysearch = connector-easysearch
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 4a5e15ebef..ea4f1be639 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -30,86 +30,61 @@
<name>SeaTunnel : Connectors V2 : Hudi</name>
<properties>
- <hive.exec.version>2.3.9</hive.exec.version>
- <hudi.version>0.11.1</hudi.version>
+ <hudi.version>0.15.0</hudi.version>
<commons.lang3.version>3.4</commons.lang3.version>
+ <parquet.version>1.14.1</parquet.version>
+ <snappy.version>1.1.8.3</snappy.version>
+ <kryo.shaded.version>4.0.2</kryo.shaded.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.exec.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-web</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apapche.hadoop</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.joshelser</groupId>
- <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-java-client</artifactId>
+ <version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
- <artifactId>hudi-hadoop-mr-bundle</artifactId>
+ <artifactId>hudi-client-common</artifactId>
<version>${hudi.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons.lang3.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
<exclusions>
<exclusion>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons.lang3.version}</version>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ <version>${kryo.shaded.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
new file mode 100644
index 0000000000..443d06d907
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.config;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+
+public interface HudiOptions {
+
+ Option<String> CONF_FILES_PATH =
+ Options.key("conf_files_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi conf files");
+
+ Option<String> TABLE_NAME =
+ Options.key("table_name").stringType().noDefaultValue().withDescription("table_name");
+
+ Option<String> TABLE_DFS_PATH =
+ Options.key("table_dfs_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("table_dfs_path");
+
+ Option<String> RECORD_KEY_FIELDS =
+ Options.key("record_key_fields")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("recordKeyFields");
+
+ Option<String> PARTITION_FIELDS =
+ Options.key("partition_fields")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("partitionFields");
+
+ Option<HoodieTableType> TABLE_TYPE =
+ Options.key("table_type")
+ .type(new TypeReference<HoodieTableType>() {})
+ .defaultValue(HoodieTableType.COPY_ON_WRITE)
+ .withDescription("table_type");
+ Option<WriteOperationType> OP_TYPE =
+ Options.key("op_type")
+ .type(new TypeReference<WriteOperationType>() {})
+ .defaultValue(WriteOperationType.INSERT)
+ .withDescription("op_type");
+
+ Option<Integer> BATCH_INTERVAL_MS =
+ Options.key("batch_interval_ms")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("batch interval milliSecond");
+
+ Option<Integer> INSERT_SHUFFLE_PARALLELISM =
+ Options.key("insert_shuffle_parallelism")
+ .intType()
+ .defaultValue(2)
+ .withDescription("insert_shuffle_parallelism");
+
+ Option<Integer> UPSERT_SHUFFLE_PARALLELISM =
+ Options.key("upsert_shuffle_parallelism")
+ .intType()
+ .defaultValue(2)
+ .withDescription("upsert_shuffle_parallelism");
+
+ Option<Integer> MIN_COMMITS_TO_KEEP =
+ Options.key("min_commits_to_keep")
+ .intType()
+ .defaultValue(20)
+ .withDescription("hoodie.keep.min.commits");
+
+ Option<Integer> MAX_COMMITS_TO_KEEP =
+ Options.key("max_commits_to_keep")
+ .intType()
+ .defaultValue(30)
+ .withDescription("hoodie.keep.max.commits");
+}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
new file mode 100644
index 0000000000..51bd72ec61
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@Builder(builderClassName = "Builder")
+public class HudiSinkConfig implements Serializable {
+
+ private static final long serialVersionUID = 2L;
+
+ private String tableName;
+
+ private String tableDfsPath;
+
+ private int insertShuffleParallelism;
+
+ private int upsertShuffleParallelism;
+
+ private int minCommitsToKeep;
+
+ private int maxCommitsToKeep;
+
+ private HoodieTableType tableType;
+
+ private WriteOperationType opType;
+
+ private String confFilesPath;
+
+ private int batchIntervalMs;
+
+ private String recordKeyFields;
+
+ private String partitionFields;
+
+ public static HudiSinkConfig of(ReadonlyConfig config) {
+ HudiSinkConfig.Builder builder = HudiSinkConfig.builder();
+ builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
+ builder.tableName(config.get(HudiOptions.TABLE_NAME));
+ builder.tableDfsPath(config.get(HudiOptions.TABLE_DFS_PATH));
+ builder.tableType(config.get(HudiOptions.TABLE_TYPE));
+ builder.opType(config.get(HudiOptions.OP_TYPE));
+
+ builder.batchIntervalMs(config.get(HudiOptions.BATCH_INTERVAL_MS));
+
+ builder.partitionFields(config.get(HudiOptions.PARTITION_FIELDS));
+
+ builder.recordKeyFields(config.get(HudiOptions.RECORD_KEY_FIELDS));
+
+ builder.insertShuffleParallelism(config.get(HudiOptions.INSERT_SHUFFLE_PARALLELISM));
+
+ builder.upsertShuffleParallelism(config.get(HudiOptions.UPSERT_SHUFFLE_PARALLELISM));
+
+ builder.minCommitsToKeep(config.get(HudiOptions.MIN_COMMITS_TO_KEEP));
+ builder.maxCommitsToKeep(config.get(HudiOptions.MAX_COMMITS_TO_KEEP));
+ return builder.build();
+ }
+}
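As a reading aid, a minimal sketch of exercising `HudiSinkConfig.of` outside an engine. `ReadonlyConfig.fromMap` is assumed from `seatunnel-api`, and the table name and path values are hypothetical:

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class HudiSinkConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("table_name", "demo_table"); // hypothetical table name
        raw.put("table_dfs_path", "hdfs://nameservice/data/hudi/demo_table/"); // hypothetical path
        raw.put("op_type", "UPSERT"); // enum constant name of WriteOperationType
        HudiSinkConfig config = HudiSinkConfig.of(ReadonlyConfig.fromMap(raw));
        // Unset options fall back to the defaults declared in HudiOptions,
        // e.g. batch_interval_ms = 1000 and min/max_commits_to_keep = 20/30.
        System.out.println(config.getTableName() + " -> " + config.getOpType());
    }
}
```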
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
deleted file mode 100644
index 1ef530619a..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class HudiSourceConfig {
- public static final Option<String> TABLE_PATH =
- Options.key("table.path")
- .stringType()
- .noDefaultValue()
- .withDescription("hudi table path");
-
- public static final Option<String> TABLE_TYPE =
- Options.key("table.type")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "hudi table type. default hudi table type is cow.
mor is not support yet");
-
- public static final Option<String> CONF_FILES =
- Options.key("conf.files")
- .stringType()
- .noDefaultValue()
- .withDescription("hudi conf files ");
-
- public static final Option<Boolean> USE_KERBEROS =
- Options.key("use.kerberos")
- .booleanType()
- .defaultValue(false)
- .withDescription("hudi use.kerberos");
-
- public static final Option<String> KERBEROS_PRINCIPAL =
- Options.key("kerberos.principal")
- .stringType()
- .noDefaultValue()
- .withDescription("hudi kerberos.principal");
-
- public static final Option<String> KERBEROS_PRINCIPAL_FILE =
- Options.key("kerberos.principal.file")
- .stringType()
- .noDefaultValue()
- .withDescription("hudi kerberos.principal.file ");
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
new file mode 100644
index 0000000000..9e6ddfee86
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer.HudiSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class HudiSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, HudiSinkState, HudiCommitInfo, HudiAggregatedCommitInfo>,
+ SupportMultiTableSink {
+
+ private HudiSinkConfig hudiSinkConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+ private CatalogTable catalogTable;
+
+ public HudiSink(ReadonlyConfig config, CatalogTable table) {
+ this.hudiSinkConfig = HudiSinkConfig.of(config);
+ this.catalogTable = table;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ }
+
+ @Override
+ public String getPluginName() {
+ return "Hudi";
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter(
+ SinkWriter.Context context, List<HudiSinkState> states) throws IOException {
+ return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, states);
+ }
+
+ @Override
+ public Optional<Serializer<HudiSinkState>> getWriterStateSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<Serializer<HudiCommitInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo>>
+ createAggregatedCommitter() throws IOException {
+ return Optional.of(new HudiSinkAggregatedCommitter(hudiSinkConfig, seaTunnelRowType));
+ }
+
+ @Override
+ public Optional<Serializer<HudiAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> createWriter(
+ SinkWriter.Context context) throws IOException {
+ return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, new ArrayList<>());
+ }
+}
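To make the wiring above concrete, a simplified, hypothetical walk-through of one checkpoint cycle; the engine's real scheduling, state handling, and error paths are more involved:

```java
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.HudiSink;
import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class HudiSinkLifecycleSketch {
    // One simplified cycle: write -> prepareCommit -> combine -> commit.
    static List<HudiAggregatedCommitInfo> oneCycle(
            HudiSink sink, SinkWriter.Context context, SeaTunnelRow row) throws IOException {
        SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> writer = sink.createWriter(context);
        writer.write(row); // buffered into the writer's record list
        HudiCommitInfo info = writer.prepareCommit().get(); // flushes; returns instant time + statuses
        SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo> committer =
                sink.createAggregatedCommitter().get();
        // Anything returned from commit() is treated as not yet committed.
        return committer.commit(
                Collections.singletonList(committer.combine(Collections.singletonList(info))));
    }
}
```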
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
new file mode 100644
index 0000000000..d38785de02
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.BATCH_INTERVAL_MS;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.INSERT_SHUFFLE_PARALLELISM;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MAX_COMMITS_TO_KEEP;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MIN_COMMITS_TO_KEEP;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.OP_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.PARTITION_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.RECORD_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.UPSERT_SHUFFLE_PARALLELISM;
+
+@AutoService(Factory.class)
+public class HudiSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Hudi";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(TABLE_DFS_PATH, TABLE_NAME)
+ .optional(
+ CONF_FILES_PATH,
+ RECORD_KEY_FIELDS,
+ PARTITION_FIELDS,
+ TABLE_TYPE,
+ OP_TYPE,
+ BATCH_INTERVAL_MS,
+ INSERT_SHUFFLE_PARALLELISM,
+ UPSERT_SHUFFLE_PARALLELISM,
+ MIN_COMMITS_TO_KEEP,
+ MAX_COMMITS_TO_KEEP)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new HudiSink(context.getOptions(), catalogTable);
+ }
+}
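A tiny hypothetical probe of the rule above; the `getRequiredOptions`/`getOptionalOptions` accessor names are assumptions about `seatunnel-api`, not verified here:

```java
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.HudiSinkFactory;

public class HudiSinkFactoryRuleSketch {
    public static void main(String[] args) {
        OptionRule rule = new HudiSinkFactory().optionRule();
        // table_dfs_path and table_name must be present in user config;
        // the remaining options are optional or carry defaults from HudiOptions.
        System.out.println(rule.getRequiredOptions());
        System.out.println(rule.getOptionalOptions());
    }
}
```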
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java
new file mode 100644
index 0000000000..9df2490545
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+
+@Slf4j
+public class HudiSinkAggregatedCommitter
+ implements SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo> {
+
+ private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
+
+ private final HoodieWriteConfig cfg;
+
+ private final HadoopStorageConfiguration hudiStorageConfiguration;
+
+ public HudiSinkAggregatedCommitter(
+ HudiSinkConfig hudiSinkConfig, SeaTunnelRowType seaTunnelRowType) {
+
+ Configuration hadoopConf = new Configuration();
+ if (hudiSinkConfig.getConfFilesPath() != null) {
+ hadoopConf = HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath());
+ }
+ hudiStorageConfiguration = new HadoopStorageConfiguration(hadoopConf);
+ cfg =
+ HoodieWriteConfig.newBuilder()
+ .withEmbeddedTimelineServerEnabled(false)
+ .withEngineType(EngineType.JAVA)
+ .withPath(hudiSinkConfig.getTableDfsPath())
+ .withSchema(convertToSchema(seaTunnelRowType).toString())
+ .withParallelism(
+ hudiSinkConfig.getInsertShuffleParallelism(),
+ hudiSinkConfig.getUpsertShuffleParallelism())
+ .forTable(hudiSinkConfig.getTableName())
+ .withIndexConfig(
+ HoodieIndexConfig.newBuilder()
+ .withIndexType(HoodieIndex.IndexType.INMEMORY)
+ .build())
+ .withArchivalConfig(
+ HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(
+ hudiSinkConfig.getMinCommitsToKeep(),
+ hudiSinkConfig.getMaxCommitsToKeep())
+ .build())
+ .withCleanConfig(
+ HoodieCleanConfig.newBuilder()
+ .withAutoClean(true)
+ .withAsyncClean(false)
+ .build())
+ .build();
+ }
+
+ @Override
+ public List<HudiAggregatedCommitInfo> commit(
+ List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
+ writeClient =
+ new HoodieJavaWriteClient<>(
+ new HoodieJavaEngineContext(hudiStorageConfiguration), cfg);
+ aggregatedCommitInfo =
+ aggregatedCommitInfo.stream()
+ .filter(
+ commit ->
+ commit.getHudiCommitInfoList().stream()
+ .anyMatch(
+ aggreeCommit ->
+ !writeClient.commit(
+ aggreeCommit.getInstantTime(),
+ aggreeCommit.getWriteStatusList())))
+ .collect(Collectors.toList());
+
+ return aggregatedCommitInfo;
+ }
+
+ @Override
+ public HudiAggregatedCommitInfo combine(List<HudiCommitInfo> commitInfos) {
+ return new HudiAggregatedCommitInfo(commitInfos);
+ }
+
+ @Override
+ public void abort(List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws Exception {
+ writeClient.rollbackFailedWrites();
+ }
+
+ @Override
+ public void close() {
+ if (writeClient != null) {
+ writeClient.close();
+ }
+ }
+}
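Since `commit` returns the aggregated infos whose Hudi instants did not commit, a caller is expected to retry them. A hypothetical bounded retry loop; the bound and the final `abort` are illustrative, not engine behavior:

```java
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer.HudiSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;

import java.util.List;

public class CommitRetrySketch {
    static void commitWithRetry(
            HudiSinkAggregatedCommitter committer, List<HudiAggregatedCommitInfo> infos)
            throws Exception {
        List<HudiAggregatedCommitInfo> pending = infos;
        for (int attempt = 0; attempt < 3 && !pending.isEmpty(); attempt++) { // illustrative bound
            pending = committer.commit(pending); // returns only the still-uncommitted infos
        }
        if (!pending.isEmpty()) {
            committer.abort(pending); // rolls back failed writes via the write client
        }
    }
}
```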
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java
new file mode 100644
index 0000000000..147456fbbd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Converts SeaTunnel's type information into an Avro schema. */
+public class AvroSchemaConverter {
+
+ private AvroSchemaConverter() {
+ // private
+ }
+
+ /**
+ * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema.
+ *
+ * <p>Use "org.apache.seatunnel.avro.generated.record" as the type name.
+ *
+ * @param schema the schema type, usually it should be the top level record type, e.g. not a
+ * nested type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(SeaTunnelDataType<?> schema) {
+ return convertToSchema(schema, "org.apache.seatunnel.avro.generated.record");
+ }
+
+ /**
+ * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema.
+ *
+ * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+ * schema. Nested record type that only differs with type name is still compatible.
+ *
+ * @param dataType logical type
+ * @param rowName the record name
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(SeaTunnelDataType<?> dataType, String rowName) {
+ switch (dataType.getSqlType()) {
+ case BOOLEAN:
+ Schema bool = SchemaBuilder.builder().booleanType();
+ return nullableSchema(bool);
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ Schema integer = SchemaBuilder.builder().intType();
+ return nullableSchema(integer);
+ case BIGINT:
+ Schema bigint = SchemaBuilder.builder().longType();
+ return nullableSchema(bigint);
+ case FLOAT:
+ Schema f = SchemaBuilder.builder().floatType();
+ return nullableSchema(f);
+ case DOUBLE:
+ Schema d = SchemaBuilder.builder().doubleType();
+ return nullableSchema(d);
+ case STRING:
+ Schema str = SchemaBuilder.builder().stringType();
+ return nullableSchema(str);
+ case BYTES:
+ Schema binary = SchemaBuilder.builder().bytesType();
+ return nullableSchema(binary);
+ case TIMESTAMP:
+ // use long to represent Timestamp
+ LogicalType avroLogicalType;
+ avroLogicalType = LogicalTypes.timestampMillis();
+ Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullableSchema(timestamp);
+ case DATE:
+ // use int to represent Date
+ Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ return nullableSchema(date);
+ case TIME:
+ // use int to represent Time; only millisecond precision is supported when deserializing
+ Schema time =
+ LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ return nullableSchema(time);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ // store BigDecimal as byte[]
+ Schema decimal =
+ LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
+ .addToSchema(SchemaBuilder.builder().bytesType());
+ return nullableSchema(decimal);
+ case ROW:
+ SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
+ List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
+ // we have to make sure the record name is different in a Schema
+ SchemaBuilder.FieldAssembler<Schema> builder =
+ SchemaBuilder.builder().record(rowName).fields();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ String fieldName = fieldNames.get(i);
+ SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
+ SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+ builder.name(fieldName)
+ .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+
+ builder = fieldBuilder.withDefault(null);
+ }
+ return builder.endRecord();
+ case MAP:
+ Schema map =
+ SchemaBuilder.builder()
+ .map()
+ .values(
+ convertToSchema(
+ extractValueTypeToAvroMap(dataType), rowName));
+ return nullableSchema(map);
+ case ARRAY:
+ ArrayType<?, ?> arrayType = (ArrayType<?, ?>) dataType;
+ Schema array =
+ SchemaBuilder.builder()
+ .array()
+ .items(convertToSchema(arrayType.getElementType(), rowName));
+ return nullableSchema(array);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported to derive Schema for type: " + dataType);
+ }
+ }
+
+ public static SeaTunnelDataType<?> extractValueTypeToAvroMap(SeaTunnelDataType<?> type) {
+ SeaTunnelDataType<?> keyType;
+ SeaTunnelDataType<?> valueType;
+ MapType<?, ?> mapType = (MapType<?, ?>) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ if (keyType.getSqlType() != SqlType.STRING) {
+ throw new UnsupportedOperationException(
+ "Avro format doesn't support non-string as key type of
map. "
+ + "The key type is: "
+ + keyType.getSqlType());
+ }
+ return valueType;
+ }
+
+ /** Returns schema with nullable true. */
+ private static Schema nullableSchema(Schema schema) {
+ return Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+ }
+}
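For orientation, a small self-contained example of the conversion above; `BasicType` and the `SeaTunnelRowType` constructor are standard `seatunnel-api` types:

```java
import org.apache.avro.Schema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter;

public class AvroSchemaConverterSketch {
    public static void main(String[] args) {
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name"},
                        new SeaTunnelDataType<?>[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE});
        Schema avro = AvroSchemaConverter.convertToSchema(rowType);
        // Every field becomes a union of null and the mapped type, with a null default,
        // e.g. {"name":"id","type":["null","long"],"default":null}.
        System.out.println(avro.toString(true));
    }
}
```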
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
new file mode 100644
index 0000000000..50effd6b44
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;
+import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter;
+
+@Slf4j
+public class HudiSinkWriter
+ implements SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState>,
+ SupportMultiTableSinkWriter<Void> {
+
+ public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ protected static final String DEFAULT_PARTITION_PATH = "default";
+ protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+ protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+ private final HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
+ private final WriteOperationType opType;
+ private final Schema schema;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final HudiSinkConfig hudiSinkConfig;
+ private final List<HoodieRecord<HoodieAvroPayload>> hoodieRecords;
+ private transient List<WriteStatus> writeStatusList;
+ private transient String instantTime;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+ private transient volatile Exception flushException;
+
+ public HudiSinkWriter(
+ SinkWriter.Context context,
+ SeaTunnelRowType seaTunnelRowType,
+ HudiSinkConfig hudiSinkConfig,
+ List<HudiSinkState> hudiSinkState)
+ throws IOException {
+
+ this.hoodieRecords = new ArrayList<>(30);
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.schema = new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString());
+ this.opType = hudiSinkConfig.getOpType();
+ this.hudiSinkConfig = hudiSinkConfig;
+ Configuration hadoopConf = new Configuration();
+ if (hudiSinkConfig.getConfFilesPath() != null) {
+ hadoopConf = HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath());
+ }
+ HadoopStorageConfiguration hudiStorageConfiguration =
+ new HadoopStorageConfiguration(hadoopConf);
+
+ // initialize the table, if not done already
+ Path path = new Path(hudiSinkConfig.getTableDfsPath());
+ FileSystem fs =
+ HadoopFSUtils.getFs(hudiSinkConfig.getTableDfsPath(), hudiStorageConfiguration);
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(hudiSinkConfig.getTableType())
+ .setTableName(hudiSinkConfig.getTableName())
+ .setPayloadClassName(HoodieAvroPayload.class.getName())
+ .initTable(hudiStorageConfiguration, hudiSinkConfig.getTableDfsPath());
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder()
+ .withEmbeddedTimelineServerEnabled(false)
+ .withEngineType(EngineType.JAVA)
+ .withPath(hudiSinkConfig.getTableDfsPath())
+ .withSchema(convertToSchema(seaTunnelRowType).toString())
+ .withParallelism(
+ hudiSinkConfig.getInsertShuffleParallelism(),
+ hudiSinkConfig.getUpsertShuffleParallelism())
+ .forTable(hudiSinkConfig.getTableName())
+ .withIndexConfig(
+ HoodieIndexConfig.newBuilder()
+ .withIndexType(HoodieIndex.IndexType.INMEMORY)
+ .build())
+ .withArchivalConfig(
+ HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(
+ hudiSinkConfig.getMinCommitsToKeep(),
+ hudiSinkConfig.getMaxCommitsToKeep())
+ .build())
+ .withAutoCommit(false)
+ .withCleanConfig(
+ HoodieCleanConfig.newBuilder()
+ .withAutoClean(true)
+ .withAsyncClean(false)
+ .build())
+ .build();
+
+ writeClient =
+ new HoodieJavaWriteClient<>(
+ new HoodieJavaEngineContext(hudiStorageConfiguration), cfg);
+
+ if (!hudiSinkState.isEmpty()) {
+ writeClient.commit(
+ hudiSinkState.get(0).getHudiCommitInfo().getInstantTime(),
+ hudiSinkState.get(0).getHudiCommitInfo().getWriteStatusList());
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ checkFlushException();
+
+ batchCount++;
+ prepareRecords(element);
+
+ if (batchCount >= hudiSinkConfig.getMaxCommitsToKeep()) {
+ flush();
+ }
+ }
+
+ @Override
+ public Optional<HudiCommitInfo> prepareCommit() {
+ flush();
+ return Optional.of(new HudiCommitInfo(instantTime, writeStatusList));
+ }
+
+ @Override
+ public List<HudiSinkState> snapshotState(long checkpointId) throws IOException {
+ return Collections.singletonList(
+ new HudiSinkState(checkpointId, new HudiCommitInfo(instantTime, writeStatusList)));
+ }
+
+ @Override
+ public void abortPrepare() {}
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+
+ if (batchCount > 0) {
+ try {
+ flush();
+ } catch (Exception e) {
+ log.warn("Writing records to Hudi failed.", e);
+ throw new HudiConnectorException(
+ CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR,
+ "Writing records to hudi failed.",
+ e);
+ }
+ }
+ if (writeClient != null) {
+ writeClient.close();
+ }
+ closed = true;
+ checkFlushException();
+ }
+ }
+
+ private void prepareRecords(SeaTunnelRow element) {
+
+ hoodieRecords.add(convertRow(element));
+ }
+
+ private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
+ GenericRecord rec = new GenericData.Record(schema);
+ for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+ rec.put(
+ seaTunnelRowType.getFieldNames()[i],
+ createConverter(seaTunnelRowType.getFieldType(i))
+ .convert(
+ convertToSchema(seaTunnelRowType.getFieldType(i)),
+ element.getField(i)));
+ }
+ return new HoodieAvroRecord<>(
+ getHoodieKey(element, seaTunnelRowType), new HoodieAvroPayload(Option.of(rec)));
+ }
+
+ private HoodieKey getHoodieKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+ String partitionPath =
+ hudiSinkConfig.getPartitionFields() == null
+ ? ""
+ : getRecordPartitionPath(element, seaTunnelRowType);
+ String rowKey =
+ hudiSinkConfig.getRecordKeyFields() == null
+ && hudiSinkConfig.getOpType().equals(WriteOperationType.INSERT)
+ ? UUID.randomUUID().toString()
+ : getRecordKey(element, seaTunnelRowType);
+ return new HoodieKey(rowKey, partitionPath);
+ }
+
+ private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+ boolean keyIsNullEmpty = true;
+ StringBuilder recordKey = new StringBuilder();
+ for (String recordKeyField : hudiSinkConfig.getRecordKeyFields().split(",")) {
+ String recordKeyValue =
+ getNestedFieldValAsString(element, seaTunnelRowType, recordKeyField);
+ recordKeyField = recordKeyField.toLowerCase();
+ if (recordKeyValue == null) {
+ recordKey
+ .append(recordKeyField)
+ .append(":")
+ .append(NULL_RECORDKEY_PLACEHOLDER)
+ .append(",");
+ } else if (recordKeyValue.isEmpty()) {
+ recordKey
+ .append(recordKeyField)
+ .append(":")
+ .append(EMPTY_RECORDKEY_PLACEHOLDER)
+ .append(",");
+ } else {
+ recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(",");
+ keyIsNullEmpty = false;
+ }
+ }
+ recordKey.deleteCharAt(recordKey.length() - 1);
+ if (keyIsNullEmpty) {
+ throw new HoodieKeyException(
+ "recordKey values: \""
+ + recordKey
+ + "\" for fields: "
+ + hudiSinkConfig.getRecordKeyFields()
+ + " cannot be entirely null or empty.");
+ }
+ return recordKey.toString();
+ }
+
+ private String getRecordPartitionPath(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+ if (hudiSinkConfig.getPartitionFields().isEmpty()) {
+ return "";
+ }
+
+ StringBuilder partitionPath = new StringBuilder();
+ String[] avroPartitionPathFields =
hudiSinkConfig.getPartitionFields().split(",");
+ for (String partitionPathField : avroPartitionPathFields) {
+ String fieldVal =
+ getNestedFieldValAsString(element, seaTunnelRowType,
partitionPathField);
+ if (fieldVal == null || fieldVal.isEmpty()) {
+
partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH);
+ } else {
+
partitionPath.append(partitionPathField).append("=").append(fieldVal);
+ }
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ partitionPath.deleteCharAt(partitionPath.length() - 1);
+ return partitionPath.toString();
+ }
+
+ private String getNestedFieldValAsString(
+ SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String
fieldName) {
+ Object value = null;
+
+ if (Arrays.stream(seaTunnelRowType.getFieldNames())
+ .collect(Collectors.toList())
+ .contains(fieldName)) {
+ value = element.getField(seaTunnelRowType.indexOf(fieldName));
+ }
+ return StringUtils.objToString(value);
+ }
+
+ public synchronized void flush() {
+ checkFlushException();
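+        // Start a new Hudi instant here; the commit itself is finalized later
+        // by the aggregated committer (see HudiSinkAggregatedCommitter in this patch).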
+ instantTime = writeClient.startCommit();
+ switch (opType) {
+ case INSERT:
+ writeStatusList = writeClient.insert(hoodieRecords,
instantTime);
+ break;
+ case UPSERT:
+ writeStatusList = writeClient.upsert(hoodieRecords,
instantTime);
+ break;
+ case BULK_INSERT:
+ writeStatusList = writeClient.bulkInsert(hoodieRecords,
instantTime);
+ break;
+ default:
+ throw new HudiConnectorException(
+ CommonErrorCode.OPERATION_NOT_SUPPORTED,
+ "Unsupported operation type: " + opType);
+ }
+        batchCount = 0;
+        hoodieRecords.clear();
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new HudiConnectorException(
+ CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR,
+ "Writing records to Hudi failed.",
+ flushException);
+ }
+ }
+}
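
(Reviewer aside, not part of the patch: the record-key and partition-path
layout built by getHoodieKey above is easy to lose in the StringBuilder
plumbing. Below is a minimal, self-contained Java sketch of the same layout,
reading field values from a plain Map instead of a SeaTunnelRow; the class
and method names are invented for illustration.)

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative sketch of the key layout produced by getRecordKey and
    // getRecordPartitionPath above.
    public class HoodieKeyLayoutExample {

        static String recordKey(Map<String, String> row, String recordKeyFields) {
            StringBuilder key = new StringBuilder();
            for (String field : recordKeyFields.split(",")) {
                String value = row.get(field);
                // Same placeholders as the writer: null -> __null__, "" -> __empty__
                String rendered =
                        value == null ? "__null__" : value.isEmpty() ? "__empty__" : value;
                key.append(field).append(":").append(rendered).append(",");
            }
            key.deleteCharAt(key.length() - 1);
            return key.toString();
        }

        static String partitionPath(Map<String, String> row, String partitionFields) {
            StringBuilder path = new StringBuilder();
            for (String field : partitionFields.split(",")) {
                String value = row.get(field);
                // Missing values land in the "default" partition, as above
                path.append(field)
                        .append("=")
                        .append(value == null || value.isEmpty() ? "default" : value)
                        .append("/");
            }
            path.deleteCharAt(path.length() - 1);
            return path.toString();
        }

        public static void main(String[] args) {
            Map<String, String> row = new LinkedHashMap<>();
            row.put("id", "1");
            row.put("name", "");
            row.put("dt", "2024-01-01");
            // Prints: id:1,name:__empty__ @ dt=2024-01-01
            System.out.println(recordKey(row, "id,name") + " @ " + partitionPath(row, "dt"));
        }
    }
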
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java
new file mode 100644
index 0000000000..7cf50deea8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Utility class for converting a {@link SeaTunnelRow} into an Avro {@link GenericRecord}. */
+public class RowDataToAvroConverters {
+
+ //
--------------------------------------------------------------------------------
+ // Runtime Converters
+ //
--------------------------------------------------------------------------------
+
+ /**
+     * Runtime converter that converts objects of SeaTunnel's internal data structures to
+ * corresponding Avro data structures.
+ */
+ @FunctionalInterface
+ public interface RowDataToAvroConverter extends Serializable {
+ Object convert(Schema schema, Object object);
+ }
+
+ /**
+ * Creates a runtime converter according to the given logical type that
converts objects of
+     * SeaTunnel's internal data structures to corresponding Avro data structures.
+ */
+ public static RowDataToAvroConverter createConverter(SeaTunnelDataType<?>
dataType) {
+ final RowDataToAvroConverter converter;
+ switch (dataType.getSqlType()) {
+ case TINYINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ((Byte) object).intValue();
+ }
+ };
+ break;
+ case SMALLINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ((Short) object).intValue();
+ }
+ };
+ break;
+ case BOOLEAN: // boolean
+ case INT: // int
+ case BIGINT: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return object;
+ }
+ };
+ break;
+ case TIME: // int
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ((LocalTime)
object).get(ChronoField.MILLI_OF_DAY);
+ }
+ };
+ break;
+ case DATE: // int
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ((int) ((LocalDate)
object).toEpochDay());
+ }
+ };
+ break;
+ case STRING:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return new Utf8(object.toString());
+ }
+ };
+ break;
+ case BYTES:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ByteBuffer.wrap((byte[]) object);
+ }
+ };
+ break;
+ case TIMESTAMP:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ((LocalDateTime) object)
+ .toInstant(java.time.ZoneOffset.UTC)
+ .toEpochMilli();
+ }
+ };
+ break;
+ case DECIMAL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object
object) {
+ return ByteBuffer.wrap(
+ ((BigDecimal)
object).unscaledValue().toByteArray());
+ }
+ };
+ break;
+ case ARRAY:
+ converter = createArrayConverter((ArrayType<?, ?>) dataType);
+ break;
+ case ROW:
+ converter = createRowConverter((SeaTunnelRowType) dataType);
+ break;
+ case MAP:
+ converter = createMapConverter(dataType);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
dataType);
+ }
+
+ // wrap into nullable converter
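+        // e.g. for a union ["long", "null"] the non-null branch is selected
+        // before delegating, and a null input short-circuits to null without
+        // consulting the schema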
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ // get actual schema if it is a nullable schema
+ Schema actualSchema;
+ if (schema.getType() == Schema.Type.UNION) {
+ List<Schema> types = schema.getTypes();
+ int size = types.size();
+ if (size == 2 && types.get(1).getType() ==
Schema.Type.NULL) {
+ actualSchema = types.get(0);
+ } else if (size == 2 && types.get(0).getType() ==
Schema.Type.NULL) {
+ actualSchema = types.get(1);
+ } else {
+ throw new IllegalArgumentException(
+ "The Avro schema is not a nullable type: " +
schema);
+ }
+ } else {
+ actualSchema = schema;
+ }
+ return converter.convert(actualSchema, object);
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createRowConverter(SeaTunnelRowType
rowType) {
+ final RowDataToAvroConverter[] fieldConverters =
+ Arrays.stream(rowType.getFieldTypes())
+ .map(RowDataToAvroConverters::createConverter)
+ .toArray(RowDataToAvroConverter[]::new);
+ final SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final SeaTunnelRow row = (SeaTunnelRow) object;
+ final List<Schema.Field> fields = schema.getFields();
+ final GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < fieldTypes.length; ++i) {
+ final Schema.Field schemaField = fields.get(i);
+ try {
+ Object avroObject =
+
fieldConverters[i].convert(schemaField.schema(), row.getField(i));
+ record.put(i, avroObject);
+ } catch (Throwable t) {
+ throw new RuntimeException(
+ String.format(
+ "Fail to serialize at field: %s.",
schemaField.name()),
+ t);
+ }
+ }
+ return record;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createArrayConverter(ArrayType<?, ?>
arrayType) {
+ final RowDataToAvroConverter elementConverter =
createConverter(arrayType.getElementType());
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema elementSchema = schema.getElementType();
+ Object[] arrayData = (Object[]) object;
+ List<Object> list = new ArrayList<>();
+ for (Object arrayDatum : arrayData) {
+ list.add(elementConverter.convert(elementSchema,
arrayDatum));
+ }
+ return list;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter
createMapConverter(SeaTunnelDataType<?> type) {
+ SeaTunnelDataType<?> valueType = extractValueTypeToAvroMap(type);
+
+ final RowDataToAvroConverter valueConverter =
createConverter(valueType);
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema valueSchema = schema.getValueType();
+ final Map<String, Object> mapData = (Map) object;
+
+ final Map<Object, Object> map = new HashMap<>(mapData.size());
+
+ mapData.forEach(
+ (s, o) -> {
+ map.put(s, valueConverter.convert(valueSchema, o));
+ });
+
+ return map;
+ }
+ };
+ }
+}
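
(Reviewer aside, not part of the patch: a minimal sketch of how this factory
pairs with AvroSchemaConverter.convertToSchema — the same pairing that
HudiTest.convertRow uses further down. ConverterUsageSketch is an invented
name for illustration.)

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

    import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
    import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter;

    class ConverterUsageSketch {

        // Convert one SeaTunnelRow into an Avro GenericRecord whose schema is
        // derived from the same SeaTunnelRowType.
        static GenericRecord toAvro(SeaTunnelRowType rowType, SeaTunnelRow row) {
            Schema avroSchema = convertToSchema(rowType);
            // The returned converter is null-safe: null fields map to null, and
            // [type, "null"] unions are unwrapped before the inner conversion runs.
            return (GenericRecord) createConverter(rowType).convert(avroSchema, row);
        }
    }
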
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
deleted file mode 100644
index a2bcda0a91..0000000000
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiError;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.CONF_FILES;
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL;
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL_FILE;
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE;
-import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
-
-@AutoService(SeaTunnelSource.class)
-public class HudiSource
- implements SeaTunnelSource<SeaTunnelRow, HudiSourceSplit,
HudiSourceState>,
- SupportParallelism {
-
- private SeaTunnelRowType typeInfo;
-
- private String filePath;
-
- private String tablePath;
-
- private String confFiles;
-
- private boolean useKerberos = false;
-
- @Override
- public String getPluginName() {
- return "Hudi";
- }
-
- @Override
- public void prepare(Config pluginConfig) {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig, TABLE_PATH.key(),
CONF_FILES.key());
- if (!result.isSuccess()) {
- throw new HudiConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- // default hudi table type is cow
- // TODO: support hudi mor table
- // TODO: support Incremental Query and Read Optimized Query
- if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE.key())))
{
- throw new HudiConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(),
- PluginType.SOURCE,
- "Do not support hudi mor table yet!"));
- }
- try {
- this.confFiles = pluginConfig.getString(CONF_FILES.key());
- this.tablePath = pluginConfig.getString(TABLE_PATH.key());
- if (CheckConfigUtil.isValidParam(pluginConfig,
USE_KERBEROS.key())) {
- this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS.key());
- if (this.useKerberos) {
- CheckResult kerberosCheckResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- KERBEROS_PRINCIPAL.key(),
- KERBEROS_PRINCIPAL_FILE.key());
- if (!kerberosCheckResult.isSuccess()) {
- throw new HudiConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s,
Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- HudiUtil.initKerberosAuthentication(
- HudiUtil.getConfiguration(this.confFiles),
- pluginConfig.getString(KERBEROS_PRINCIPAL.key()),
-
pluginConfig.getString(KERBEROS_PRINCIPAL_FILE.key()));
- }
- }
- this.filePath = HudiUtil.getParquetFileByPath(this.confFiles,
tablePath);
- if (this.filePath == null) {
- throw HudiError.cannotFindParquetFile(tablePath);
- }
- // should read from config or read from hudi metadata( wait
catalog done)
- this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles,
this.filePath);
- } catch (HudiConnectorException | IOException e) {
- throw new HudiConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- }
-
- @Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.typeInfo;
- }
-
- @Override
- public SourceReader<SeaTunnelRow, HudiSourceSplit> createReader(
- SourceReader.Context readerContext) throws Exception {
- return new HudiSourceReader(this.confFiles, readerContext, typeInfo);
- }
-
- @Override
- public Boundedness getBoundedness() {
- // Only support Snapshot Query now.
- // After support Incremental Query and Read Optimized Query, we
should support UNBOUNDED.
- // TODO: support UNBOUNDED
- return Boundedness.BOUNDED;
- }
-
- @Override
- public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState>
createEnumerator(
- SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext)
throws Exception {
- return new HudiSourceSplitEnumerator(enumeratorContext, tablePath,
this.confFiles);
- }
-
- @Override
- public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState>
restoreEnumerator(
- SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext,
- HudiSourceState checkpointState)
- throws Exception {
- return new HudiSourceSplitEnumerator(
- enumeratorContext, tablePath, this.confFiles, checkpointState);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
deleted file mode 100644
index 778efc62a3..0000000000
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class HudiSourceFactory implements TableSourceFactory {
-
- @Override
- public String factoryIdentifier() {
- return "Hudi";
- }
-
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder()
- .required(
- HudiSourceConfig.TABLE_PATH,
- HudiSourceConfig.TABLE_TYPE,
- HudiSourceConfig.CONF_FILES)
- .optional(HudiSourceConfig.USE_KERBEROS)
- .conditional(
- HudiSourceConfig.USE_KERBEROS,
- true,
- HudiSourceConfig.KERBEROS_PRINCIPAL,
- HudiSourceConfig.KERBEROS_PRINCIPAL_FILE)
- .build();
- }
-
- @Override
- public Class<? extends SeaTunnelSource> getSourceClass() {
- return HudiSource.class;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
deleted file mode 100644
index fb595ca495..0000000000
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Set;
-
-public class HudiSourceReader implements SourceReader<SeaTunnelRow,
HudiSourceSplit> {
-
- private static final long THREAD_WAIT_TIME = 500L;
-
- private final String confPaths;
-
- private final Set<HudiSourceSplit> sourceSplits;
-
- private final SourceReader.Context context;
-
- private final SeaTunnelRowType seaTunnelRowType;
-
- public HudiSourceReader(
- String confPaths, SourceReader.Context context, SeaTunnelRowType
seaTunnelRowType) {
- this.confPaths = confPaths;
- this.context = context;
- this.sourceSplits = new HashSet<>();
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- @Override
- public void open() {}
-
- @Override
- public void close() {}
-
- @Override
- public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (sourceSplits.isEmpty()) {
- Thread.sleep(THREAD_WAIT_TIME);
- return;
- }
- Configuration configuration =
HudiUtil.getConfiguration(this.confPaths);
- JobConf jobConf = HudiUtil.toJobConf(configuration);
- sourceSplits.forEach(
- source -> {
- try {
- HoodieParquetInputFormat inputFormat = new
HoodieParquetInputFormat();
- RecordReader<NullWritable, ArrayWritable> reader =
- inputFormat.getRecordReader(
- source.getInputSplit(), jobConf,
Reporter.NULL);
- ParquetHiveSerDe serde = new ParquetHiveSerDe();
- Properties properties = new Properties();
- List<String> types = new ArrayList<>();
- for (SeaTunnelDataType<?> type :
seaTunnelRowType.getFieldTypes()) {
- types.add(type.getSqlType().name());
- }
- String columns =
StringUtils.join(seaTunnelRowType.getFieldNames(), ",");
- String columnTypes = StringUtils.join(types,
",").toLowerCase(Locale.ROOT);
- properties.setProperty("columns", columns);
- properties.setProperty("columns.types", columnTypes);
- serde.initialize(jobConf, properties);
- StructObjectInspector inspector =
- (StructObjectInspector)
serde.getObjectInspector();
- List<? extends StructField> fields =
inspector.getAllStructFieldRefs();
- NullWritable key = reader.createKey();
- ArrayWritable value = reader.createValue();
- while (reader.next(key, value)) {
- Object[] datas = new Object[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- Object data =
inspector.getStructFieldData(value, fields.get(i));
- if (null != data) {
- datas[i] = String.valueOf(data);
- } else {
- datas[i] = null;
- }
- }
- output.collect(new SeaTunnelRow(datas));
- }
- reader.close();
- } catch (Exception e) {
- throw new HudiConnectorException(
-
CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e);
- }
- });
- context.signalNoMoreElement();
- }
-
- @Override
- public List<HudiSourceSplit> snapshotState(long checkpointId) {
- return new ArrayList<>(sourceSplits);
- }
-
- @Override
- public void addSplits(List<HudiSourceSplit> splits) {
- sourceSplits.addAll(splits);
- }
-
- @Override
- public void handleNoMoreSplits() {}
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {}
-}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
deleted file mode 100644
index 88874be28a..0000000000
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HudiSourceSplitEnumerator
- implements SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> {
-
- private final Context<HudiSourceSplit> context;
- private Set<HudiSourceSplit> pendingSplit;
- private Set<HudiSourceSplit> assignedSplit;
- private final String tablePath;
- private final String confPaths;
-
- public HudiSourceSplitEnumerator(
- SourceSplitEnumerator.Context<HudiSourceSplit> context,
- String tablePath,
- String confPaths) {
- this.context = context;
- this.tablePath = tablePath;
- this.confPaths = confPaths;
- }
-
- public HudiSourceSplitEnumerator(
- SourceSplitEnumerator.Context<HudiSourceSplit> context,
- String tablePath,
- String confPaths,
- HudiSourceState sourceState) {
- this(context, tablePath, confPaths);
- this.assignedSplit = sourceState.getAssignedSplit();
- }
-
- @Override
- public void open() {
- this.assignedSplit = new HashSet<>();
- this.pendingSplit = new HashSet<>();
- }
-
- @Override
- public void run() throws Exception {
- pendingSplit = getHudiSplit();
- assignSplit(context.registeredReaders());
- }
-
- private Set<HudiSourceSplit> getHudiSplit() throws IOException {
- Set<HudiSourceSplit> hudiSourceSplits = new HashSet<>();
- Path path = new Path(tablePath);
- Configuration configuration = HudiUtil.getConfiguration(confPaths);
- JobConf jobConf = HudiUtil.toJobConf(configuration);
- FileInputFormat.setInputPaths(jobConf, path);
- HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
- inputFormat.setConf(jobConf);
- for (InputSplit split : inputFormat.getSplits(jobConf, 0)) {
- hudiSourceSplits.add(new HudiSourceSplit(split.toString(), split));
- }
- return hudiSourceSplits;
- }
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public void addSplitsBack(List<HudiSourceSplit> splits, int subtaskId) {
- if (!splits.isEmpty()) {
- pendingSplit.addAll(splits);
- assignSplit(Collections.singletonList(subtaskId));
- }
- }
-
- private void assignSplit(Collection<Integer> taskIdList) {
- Map<Integer, List<HudiSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
- for (int taskId : taskIdList) {
- readySplit.computeIfAbsent(taskId, id -> new ArrayList<>());
- }
-
- pendingSplit.forEach(
- s -> readySplit.get(getSplitOwner(s.splitId(),
taskIdList.size())).add(s));
- readySplit.forEach(context::assignSplit);
- assignedSplit.addAll(pendingSplit);
- pendingSplit.clear();
- }
-
- private static int getSplitOwner(String tp, int numReaders) {
- return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
- }
-
- @Override
- public int currentUnassignedSplitSize() {
- return pendingSplit.size();
- }
-
- @Override
- public void registerReader(int subtaskId) {
- if (!pendingSplit.isEmpty()) {
- assignSplit(Collections.singletonList(subtaskId));
- }
- }
-
- @Override
- public HudiSourceState snapshotState(long checkpointId) {
- return new HudiSourceState(assignedSplit);
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {}
-
- @Override
- public void handleSplitRequest(int subtaskId) {}
-}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
similarity index 69%
rename from
seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
rename to
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
index d9499aa861..76351baa71 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.hudi;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
-import org.apache.seatunnel.connectors.seatunnel.hudi.source.HudiSourceFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.Serializable;
+import java.util.List;
-class HudiFactoryTest {
+@Data
+@AllArgsConstructor
+public class HudiAggregatedCommitInfo implements Serializable {
- @Test
- void optionRule() {
- Assertions.assertNotNull((new HudiSourceFactory()).optionRule());
- }
+ private final List<HudiCommitInfo> hudiCommitInfoList;
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
similarity index 67%
rename from
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
rename to
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
index 2dbb0172a8..a3c679112b 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
-import java.io.Serializable;
-import java.util.Set;
+import org.apache.hudi.client.WriteStatus;
-public class HudiSourceState implements Serializable {
+import lombok.AllArgsConstructor;
+import lombok.Data;
- private final Set<HudiSourceSplit> assignedSplit;
+import java.io.Serializable;
+import java.util.List;
- public HudiSourceState(Set<HudiSourceSplit> assignedSplit) {
- this.assignedSplit = assignedSplit;
- }
+@Data
+@AllArgsConstructor
+public class HudiCommitInfo implements Serializable {
- public Set<HudiSourceSplit> getAssignedSplit() {
- return assignedSplit;
- }
+ private final String instantTime;
+ private final List<WriteStatus> writeStatusList;
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
similarity index 55%
rename from
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
rename to
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
index d3c8bbb4f6..d79fbaf0ed 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
@@ -15,31 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
-import org.apache.seatunnel.api.source.SourceSplit;
+import lombok.AllArgsConstructor;
+import lombok.Data;
-import org.apache.hadoop.mapred.InputSplit;
+import java.io.Serializable;
-public class HudiSourceSplit implements SourceSplit {
+@Data
+@AllArgsConstructor
+public class HudiSinkState implements Serializable {
- private static final long serialVersionUID = -1L;
+ private long checkpointId;
- private final String splitId;
-
- private final InputSplit inputSplit;
-
- public HudiSourceSplit(String splitId, InputSplit inputSplit) {
- this.splitId = splitId;
- this.inputSplit = inputSplit;
- }
-
- @Override
- public String splitId() {
- return this.splitId;
- }
-
- public InputSplit getInputSplit() {
- return this.inputSplit;
- }
+ private HudiCommitInfo hudiCommitInfo;
}
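
(Reviewer aside, not part of the patch: HudiCommitInfo and HudiSinkState carry
just enough to replay an in-flight instant after a restart, which is what the
HudiSinkWriter constructor at the top of this section does. Below is a sketch
of that restore path, assuming a HoodieJavaWriteClient with auto-commit
disabled; StateRecoverySketch is an invented name.)

    import java.util.List;

    import org.apache.hudi.client.HoodieJavaWriteClient;
    import org.apache.hudi.common.model.HoodieAvroPayload;

    import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
    import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;

    class StateRecoverySketch {

        // Replays the instant that was in flight when the checkpoint was taken,
        // mirroring the recovery branch in HudiSinkWriter's constructor.
        static void recover(
                HoodieJavaWriteClient<HoodieAvroPayload> writeClient,
                List<HudiSinkState> restoredStates) {
            if (!restoredStates.isEmpty()) {
                HudiCommitInfo pending = restoredStates.get(0).getHudiCommitInfo();
                writeClient.commit(pending.getInstantTime(), pending.getWriteStatusList());
            }
        }
    }
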
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
new file mode 100644
index 0000000000..1da165bdcb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi;
+
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+import static
org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter;
+
+public class HudiTest {
+
+ protected static @TempDir java.nio.file.Path tempDir;
+ private static final String tableName = "hudi";
+
+ protected static final String DEFAULT_PARTITION_PATH = "default";
+ public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+ protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+ private static final String recordKeyFields = "int";
+
+ private static final String partitionFields = "date";
+
+ private static final SeaTunnelRowType seaTunnelRowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "bool",
+ "int",
+ "longValue",
+ "float",
+ "name",
+ "date",
+ "time",
+ "timestamp3",
+ "map"
+ },
+ new SeaTunnelDataType[] {
+ BOOLEAN_TYPE,
+ INT_TYPE,
+ LONG_TYPE,
+ FLOAT_TYPE,
+ STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ new MapType(STRING_TYPE, LONG_TYPE),
+ });
+
+ private String getSchema() {
+ return convertToSchema(seaTunnelRowType).toString();
+ }
+
+ @Test
+ void testSchema() {
+ Assertions.assertEquals(
+
"{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"org.apache.seatunnel.avro.generated\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\":
[...]
+ getSchema());
+ }
+
+ @Test
+ @DisabledOnOs(OS.WINDOWS)
+ void testWriteData() throws IOException {
+ String tablePath = tempDir.toString();
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(tableName)
+ .setPayloadClassName(HoodieAvroPayload.class.getName())
+ .initTable(new HadoopStorageConfiguration(new
Configuration()), tablePath);
+
+ HoodieWriteConfig cfg =
+ HoodieWriteConfig.newBuilder()
+ .withPath(tablePath)
+ .withSchema(getSchema())
+ .withParallelism(2, 2)
+ .withDeleteParallelism(2)
+ .forTable(tableName)
+ .withIndexConfig(
+ HoodieIndexConfig.newBuilder()
+
.withIndexType(HoodieIndex.IndexType.INMEMORY)
+ .build())
+ .withArchivalConfig(
+ HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(11, 25)
+ .build())
+ .withAutoCommit(false)
+ .build();
+
+ try (HoodieJavaWriteClient<HoodieAvroPayload> javaWriteClient =
+ new HoodieJavaWriteClient<>(
+ new HoodieJavaEngineContext(
+ new HadoopStorageConfiguration(new
Configuration())),
+ cfg)) {
+            SeaTunnelRow expected = new SeaTunnelRow(9);
+ Timestamp timestamp3 = Timestamp.valueOf("1990-10-14
12:12:43.123");
+ expected.setField(0, true);
+ expected.setField(1, 45536);
+ expected.setField(2, 1238123899121L);
+ expected.setField(3, 33.333F);
+ expected.setField(4, "asdlkjasjkdla998y1122");
+ expected.setField(5, LocalDate.parse("1990-10-14"));
+ expected.setField(6, LocalTime.parse("12:12:43"));
+ expected.setField(7, timestamp3.toLocalDateTime());
+ Map<String, Long> map = new HashMap<>();
+ map.put("element", 123L);
+            expected.setField(8, map);
+ String instantTime = javaWriteClient.startCommit();
+ List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new
ArrayList<>();
+ hoodieRecords.add(convertRow(expected));
+ List<WriteStatus> insert = javaWriteClient.insert(hoodieRecords,
instantTime);
+
+ javaWriteClient.commit(instantTime, insert);
+ }
+ }
+
+ private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
+ GenericRecord rec =
+ new GenericData.Record(
+ new
Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString()));
+ for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+ rec.put(
+ seaTunnelRowType.getFieldNames()[i],
+ createConverter(seaTunnelRowType.getFieldType(i))
+ .convert(
+
convertToSchema(seaTunnelRowType.getFieldType(i)),
+ element.getField(i)));
+ }
+
+ return new HoodieAvroRecord<>(
+ getHoodieKey(element, seaTunnelRowType), new
HoodieAvroPayload(Option.of(rec)));
+ }
+
+ private HoodieKey getHoodieKey(SeaTunnelRow element, SeaTunnelRowType
seaTunnelRowType) {
+ String partitionPath = getRecordPartitionPath(element,
seaTunnelRowType);
+ String rowKey = getRecordKey(element, seaTunnelRowType);
+ return new HoodieKey(rowKey, partitionPath);
+ }
+
+ private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType
seaTunnelRowType) {
+ boolean keyIsNullEmpty = true;
+ StringBuilder recordKey = new StringBuilder();
+ for (String recordKeyField : recordKeyFields.split(",")) {
+ String recordKeyValue =
+ getNestedFieldValAsString(element, seaTunnelRowType,
recordKeyField);
+ recordKeyField = recordKeyField.toLowerCase();
+ if (recordKeyValue == null) {
+ recordKey
+ .append(recordKeyField)
+ .append(":")
+ .append(NULL_RECORDKEY_PLACEHOLDER)
+ .append(",");
+ } else if (recordKeyValue.isEmpty()) {
+ recordKey
+ .append(recordKeyField)
+ .append(":")
+ .append(EMPTY_RECORDKEY_PLACEHOLDER)
+ .append(",");
+ } else {
+
recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(",");
+ keyIsNullEmpty = false;
+ }
+ }
+ recordKey.deleteCharAt(recordKey.length() - 1);
+ if (keyIsNullEmpty) {
+ throw new HoodieKeyException(
+ "recordKey values: \""
+ + recordKey
+ + "\" for fields: "
+ + recordKeyFields
+ + " cannot be entirely null or empty.");
+ }
+ return recordKey.toString();
+ }
+
+ private String getRecordPartitionPath(SeaTunnelRow element,
SeaTunnelRowType seaTunnelRowType) {
+
+ StringBuilder partitionPath = new StringBuilder();
+ String[] avroPartitionPathFields = partitionFields.split(",");
+ for (String partitionPathField : avroPartitionPathFields) {
+ String fieldVal =
+ getNestedFieldValAsString(element, seaTunnelRowType,
partitionPathField);
+ if (fieldVal == null || fieldVal.isEmpty()) {
+
partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH);
+ } else {
+
partitionPath.append(partitionPathField).append("=").append(fieldVal);
+ }
+ partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ }
+ partitionPath.deleteCharAt(partitionPath.length() - 1);
+ return partitionPath.toString();
+ }
+
+ private String getNestedFieldValAsString(
+ SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String
fieldName) {
+ Object value = null;
+
+ if (Arrays.stream(seaTunnelRowType.getFieldNames())
+ .collect(Collectors.toList())
+ .contains(fieldName)) {
+ value = element.getField(seaTunnelRowType.indexOf(fieldName));
+ }
+ return StringUtils.objToString(value);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
new file mode 100644
index 0000000000..bbe1e2187e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-hudi-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Hudi</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-hudi</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
new file mode 100644
index 0000000000..b0cafa65bc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hudi;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {TestContainerId.SPARK_2_4},
+ type = {},
+ disabledReason = "")
+@Slf4j
+public class HudiIT extends TestSuiteBase {
+
+ private static final String TABLE_PATH = "/tmp/hudi/";
+ private static final String NAMESPACE = "hudi";
+ private static final String NAMESPACE_TAR = "hudi.tar.gz";
+
+ protected final ContainerExtendedFactory containerExtendedFactory =
+ new ContainerExtendedFactory() {
+ @Override
+ public void extend(GenericContainer<?> container)
+ throws IOException, InterruptedException {
+ container.execInContainer(
+ "sh",
+ "-c",
+ "cd /tmp" + " && tar -czvf " + NAMESPACE_TAR + " "
+ NAMESPACE);
+ container.copyFileFromContainer(
+ "/tmp/" + NAMESPACE_TAR, "/tmp/" + NAMESPACE_TAR);
+
+ extractFiles();
+ }
+
+ private void extractFiles() {
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command(
+ "sh", "-c", "cd /tmp" + " && tar -zxvf " +
NAMESPACE_TAR);
+ try {
+ Process process = processBuilder.start();
+                    // Wait for the extraction command to finish
+ int exitCode = process.waitFor();
+ if (exitCode == 0) {
+ log.info("Extract files successful.");
+ } else {
+ log.error("Extract files failed with exit code " +
exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+                    log.error("Extract files failed.", e);
+ }
+ }
+ };
+
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ container.execInContainer("sh", "-c", "mkdir -p " +
TABLE_PATH);
+ container.execInContainer("sh", "-c", "chmod -R 777 " +
TABLE_PATH);
+ };
+
+ @TestTemplate
+ public void testWriteHudi(TestContainer container)
+ throws IOException, InterruptedException, URISyntaxException {
+ Container.ExecResult textWriteResult =
container.executeJob("/fake_to_hudi.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ Configuration configuration = new Configuration();
+ Path inputPath = new Path(TABLE_PATH);
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ // copy hudi to local
+
container.executeExtraCommands(containerExtendedFactory);
+                            try (ParquetReader<Group> reader =
+                                    ParquetReader.builder(new GroupReadSupport(), inputPath)
+                                            .withConf(configuration)
+                                            .build()) {
+                                long rowCount = 0;
+
+                                // Read data and count rows
+                                while (reader.read() != null) {
+                                    rowCount++;
+                                }
+                                Assertions.assertEquals(5, rowCount);
+                            }
+ });
+ FileUtils.deleteFile(TABLE_PATH);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
new file mode 100644
index 0000000000..a02bb0fc72
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Hudi {
+ table_dfs_path = "/tmp/hudi"
+ table_name = "st_test"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 9f452425af..47864f21c6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -73,6 +73,7 @@
<module>connector-cdc-postgres-e2e</module>
<module>connector-cdc-oracle-e2e</module>
<module>connector-hive-e2e</module>
+ <module>connector-hudi-e2e</module>
</modules>
<dependencies>