This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 934434cc75 [Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845)
934434cc75 is described below

commit 934434cc75afc747339182df6468f89a5d0fb1b2
Author: happyboy1024 <[email protected]>
AuthorDate: Fri Oct 18 11:54:23 2024 +0800

    [Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845)
    
    Co-authored-by: happyboy1024 <[email protected]>
---
 docs/en/connector-v2/sink/Hudi.md                  |   8 +-
 docs/zh/connector-v2/sink/Hudi.md                  |   8 +-
 seatunnel-connectors-v2/connector-hudi/pom.xml     |  23 ++
 .../seatunnel/hudi/catalog/HudiCatalog.java        |  11 +-
 .../seatunnel/hudi/config/HudiOptions.java         |   6 -
 .../seatunnel/hudi/config/HudiSinkConfig.java      |   4 -
 .../seatunnel/hudi/config/HudiTableConfig.java     |   5 +
 .../seatunnel/hudi/config/HudiTableOptions.java    |   9 +-
 .../connectors/seatunnel/hudi/sink/HudiSink.java   |  21 +-
 .../seatunnel/hudi/sink/HudiSinkFactory.java       |   8 +-
 .../sink/commiter/HudiSinkAggregatedCommitter.java | 102 -----
 .../hudi/sink/convert/AvroSchemaConverter.java     |  23 +-
 .../hudi/sink/convert/HudiRecordConverter.java     |   8 +-
 .../hudi/sink/convert/RowDataToAvroConverters.java |   8 +-
 .../hudi/sink/state/HudiAggregatedCommitInfo.java  |  11 +-
 .../seatunnel/hudi/sink/state/HudiCommitInfo.java  |  18 +-
 .../hudi/sink/writer/HudiRecordWriter.java         | 178 ++++----
 .../seatunnel/hudi/sink/writer/HudiSinkWriter.java |  32 +-
 .../connectors/seatunnel/hudi/util/HudiUtil.java   |   9 +-
 .../connectors/seatunnel/hudi/HudiTest.java        |  30 +-
 .../seatunnel/hudi/catalog/HudiCatalogTest.java    |   1 +
 .../connector-hudi-e2e/pom.xml                     |  32 ++
 .../seatunnel/e2e/connector/hudi/HudiIT.java       |   4 +-
 .../e2e/connector/hudi/HudiMultiTableIT.java       |   3 +-
 .../hudi/HudiSeatunnelS3MultiTableIT.java          |   4 +-
 .../e2e/connector/hudi/HudiSinkCDCIT.java          | 453 +++++++++++++++++++++
 .../connector/hudi/HudiSparkS3MultiTableIT.java    |   4 +-
 .../src/test/resources/ddl/mysql_cdc.sql           |  75 ++++
 .../src/test/resources/{ => hudi}/core-site.xml    |   0
 .../test/resources/{ => hudi}/fake_to_hudi.conf    |   0
 .../fake_to_hudi_with_omit_config_item.conf        |   0
 .../resources/{ => hudi}/multi_fake_to_hudi.conf   |   0
 .../mysql_cdc_to_hudi.conf}                        |  44 +-
 .../test/resources/{ => hudi}/s3_fake_to_hudi.conf |   0
 .../src/test/resources/mysql/server-gtids/my.cnf   |  65 +++
 .../src/test/resources/mysql/setup.sql             |  27 ++
 36 files changed, 893 insertions(+), 341 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hudi.md b/docs/en/connector-v2/sink/Hudi.md
index 6c424fde15..ea4c066d2f 100644
--- a/docs/en/connector-v2/sink/Hudi.md
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -8,7 +8,7 @@ Used to write data to Hudi.
 
 ## Key features
 
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
 - [x] [support multiple table write](../../concept/connector-v2-features.md)
 
@@ -21,7 +21,6 @@ Base configuration:
 | table_dfs_path             | string  | yes      | -                            |
 | conf_files_path            | string  | no       | -                            |
 | table_list                 | Array   | no       | -                            |
-| auto_commit                | boolean | no       | true                         |
 | schema_save_mode           | enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST |
 | common-options             | Config  | no       | -                            |
 
@@ -44,6 +43,7 @@ Table list configuration:
 | index_type                 | enum   | no       | BLOOM         |
 | index_class_name           | string | no       | -             |
 | record_byte_size           | Int    | no       | 1024          |
+| cdc_enabled                | boolean| no       | false         |
 
 Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.
 
@@ -115,9 +115,9 @@ Note: When this configuration corresponds to a single table, you can flatten the
 
 `max_commits_to_keep` The max commits to keep of hudi table.
 
-### auto_commit [boolean]
+### cdc_enabled [boolean]
 
-`auto_commit` Automatic transaction commit is enabled by default.
+`cdc_enabled` Whether to persist the CDC change log. When enabled, the change data is persisted when necessary, and the table can be queried in CDC query mode.
 
 ### schema_save_mode [Enum]
 
diff --git a/docs/zh/connector-v2/sink/Hudi.md b/docs/zh/connector-v2/sink/Hudi.md
index 2fbf027135..7d8007f6b0 100644
--- a/docs/zh/connector-v2/sink/Hudi.md
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -8,7 +8,7 @@
 
 ## 主要特点
 
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
 - [x] [support multiple table write](../../concept/connector-v2-features.md)
 
@@ -21,7 +21,6 @@
 | table_dfs_path             | string | 是      | -                            |
 | conf_files_path            | string | 否      | -                            |
 | table_list                 | string | 否      | -                            |
-| auto_commit                | boolean| 否      | true                         |
 | schema_save_mode           | enum   | 否      | CREATE_SCHEMA_WHEN_NOT_EXIST |
 | common-options             | config | 否      | -                            |
 
@@ -44,6 +43,7 @@
 | index_type                 | enum   | no       | BLOOM         |
 | index_class_name           | string | no       | -             |
 | record_byte_size           | Int    | no       | 1024          |
+| cdc_enabled                | boolean| no       | false         |
 
 注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
 
@@ -115,9 +115,9 @@
 
 `max_commits_to_keep` Hudi 表保留的最多提交数。
 
-### auto_commit [boolean]
+### cdc_enabled [boolean]
 
-`auto_commit` 是否自动提交.
+`cdc_enabled` 是否持久化Hudi表的CDC变更日志。启用后,在必要时持久化更改数据,表可以作为CDC模式进行查询。
 
 ### schema_save_mode [Enum]
 
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 35fc0b0459..1a11d34f47 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -102,4 +102,27 @@
 
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
index e0a25bfd85..0d238c193d 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -53,6 +54,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_KEY_FIELDS;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -195,6 +197,7 @@ public class HudiCatalog implements Catalog {
                     String.join(",", tableConfig.getRecordKeyFields().get()));
         }
         options.put(TABLE_TYPE.key(), tableType.name());
+        options.put(CDC_ENABLED.key(), String.valueOf(tableConfig.isCDCEnabled()));
         return CatalogTable.of(
                 TableIdentifier.of(
                         catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
@@ -218,10 +221,16 @@ public class HudiCatalog implements Catalog {
                         .setTableType(table.getOptions().get(TABLE_TYPE.key()))
                         .setRecordKeyFields(table.getOptions().get(RECORD_KEY_FIELDS.key()))
                         .setTableCreateSchema(
-                                convertToSchema(table.getSeaTunnelRowType()).toString())
+                                convertToSchema(
+                                                table.getSeaTunnelRowType(),
+                                                AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        table.getTableId().getTableName()))
+                                        .toString())
                         .setTableName(tablePath.getTableName())
                         .setPartitionFields(String.join(",", table.getPartitionKeys()))
                         .setPayloadClassName(HoodieAvroPayload.class.getName())
+                        .setCDCEnabled(
+                                Boolean.parseBoolean(table.getOptions().get(CDC_ENABLED.key())))
                         .initTable(new HadoopStorageConfiguration(hadoopConf), tablePathStr);
             }
         } catch (IOException e) {
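
The catalog now derives the Avro record name from the table instead of the fixed
"org.apache.seatunnel.avro.generated.record", so schemas written for different
tables no longer collide on one record name. A minimal sketch of the naming,
assuming Hudi's helper behaves as the updated HudiTest at the end of this diff
expects (namespace "hoodie.hudi", record name "hudi_record" for a table named
"hudi"):

    import org.apache.hudi.avro.AvroSchemaUtils;

    public class QualifiedNameSketch {
        public static void main(String[] args) {
            // For a table named "hudi", the qualified record name becomes
            // "hoodie.hudi.hudi_record", matching the expected schema string
            // asserted in the updated HudiTest below.
            System.out.println(AvroSchemaUtils.getAvroRecordQualifiedName("hudi"));
        }
    }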
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
index 38450e2dfd..745e78eaf9 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
@@ -44,12 +44,6 @@ public interface HudiOptions {
                     .noDefaultValue()
                     .withDescription("table_list");
 
-    Option<Boolean> AUTO_COMMIT =
-            Options.key("auto_commit")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription("auto commit");
-
     Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
                     .enumType(SchemaSaveMode.class)
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
index 06650e87c0..bcb4efe77b 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
@@ -40,15 +40,12 @@ public class HudiSinkConfig implements Serializable {
 
     private String confFilesPath;
 
-    private boolean autoCommit;
-
     private SchemaSaveMode schemaSaveMode;
 
     private DataSaveMode dataSaveMode;
 
     public static HudiSinkConfig of(ReadonlyConfig config) {
         Builder builder = HudiSinkConfig.builder();
-        Optional<Boolean> optionalAutoCommit = config.getOptional(HudiOptions.AUTO_COMMIT);
         Optional<SchemaSaveMode> optionalSchemaSaveMode =
                 config.getOptional(HudiOptions.SCHEMA_SAVE_MODE);
         Optional<DataSaveMode> optionalDataSaveMode =
@@ -58,7 +55,6 @@ public class HudiSinkConfig implements Serializable {
         builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
         builder.tableList(HudiTableConfig.of(config));
 
-        builder.autoCommit(optionalAutoCommit.orElseGet(HudiOptions.AUTO_COMMIT::defaultValue));
         builder.schemaSaveMode(
                 optionalSchemaSaveMode.orElseGet(HudiOptions.SCHEMA_SAVE_MODE::defaultValue));
         builder.dataSaveMode(
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
index ba0ae33efd..1ae612c9cb 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.DATABASE;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
@@ -108,6 +109,9 @@ public class HudiTableConfig implements Serializable {
     @JsonProperty("max_commits_to_keep")
     private int maxCommitsToKeep;
 
+    @JsonProperty("cdc_enabled")
+    private boolean cdcEnabled;
+
     public static List<HudiTableConfig> of(ReadonlyConfig connectorConfig) {
         List<HudiTableConfig> tableList;
         if (connectorConfig.getOptional(HudiOptions.TABLE_LIST).isPresent()) {
@@ -132,6 +136,7 @@ public class HudiTableConfig implements Serializable {
                                     connectorConfig.get(UPSERT_SHUFFLE_PARALLELISM))
                             .minCommitsToKeep(connectorConfig.get(MIN_COMMITS_TO_KEEP))
                             .maxCommitsToKeep(connectorConfig.get(MAX_COMMITS_TO_KEEP))
+                            .cdcEnabled(connectorConfig.get(CDC_ENABLED))
                             .build();
             tableList = Collections.singletonList(hudiTableConfig);
         }
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
index e48ef7be56..2a2c7e01b3 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java
@@ -46,6 +46,13 @@ public interface HudiTableOptions {
                     .defaultValue(HoodieTableType.COPY_ON_WRITE)
                     .withDescription("hudi table type");
 
+    Option<Boolean> CDC_ENABLED =
+            Options.key("cdc_enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When enabled, persist the change data when necessary, and the table can be queried in CDC query mode.");
+
     Option<String> RECORD_KEY_FIELDS =
             Options.key("record_key_fields")
                     .stringType()
@@ -76,7 +83,7 @@ public interface HudiTableOptions {
             Options.key("record_byte_size")
                     .intType()
                     .defaultValue(1024)
-                    .withDescription("auto commit");
+                    .withDescription("The byte size of each record");
 
     Option<WriteOperationType> OP_TYPE =
             Options.key("op_type")
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
index 13c245336a..11a402ab10 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -38,14 +37,12 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
 import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.commiter.HudiSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiSinkState;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
@@ -82,15 +79,13 @@ public class HudiSink
 
     @Override
     public HudiSinkWriter createWriter(SinkWriter.Context context) throws IOException {
-        return new HudiSinkWriter(
-                context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, new ArrayList<>());
+        return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter(
             SinkWriter.Context context, List<HudiSinkState> states) throws IOException {
-        return new HudiSinkWriter(
-                context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, states);
+        return SeaTunnelSink.super.restoreWriter(context, states);
     }
 
     @Override
@@ -103,18 +98,6 @@ public class HudiSink
         return Optional.of(new DefaultSerializer<>());
     }
 
-    @Override
-    public Optional<SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo>>
-            createAggregatedCommitter() throws IOException {
-        return Optional.of(
-                new HudiSinkAggregatedCommitter(hudiTableConfig, hudiSinkConfig, seaTunnelRowType));
-    }
-
-    @Override
-    public Optional<Serializer<HudiAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
         TablePath tablePath =
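
HudiSink no longer restores commit state: restoreWriter now falls back to the
SeaTunnelSink default. A sketch of the assumed default (not part of this diff;
the assumption is that the interface simply recreates a fresh writer, which is
safe here because no commit info has to be replayed after the committer
removal):

    // Assumed shape of SeaTunnelSink's default method, for illustration only.
    default SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter(
            SinkWriter.Context context, List<HudiSinkState> states) throws IOException {
        return createWriter(context);
    }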
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index ed21b15166..7e6d9826d9 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -37,12 +37,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.AUTO_COMMIT;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_LIST;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INSERT_SHUFFLE_PARALLELISM;
@@ -85,7 +85,7 @@ public class HudiSinkFactory implements TableSinkFactory {
                         UPSERT_SHUFFLE_PARALLELISM,
                         MIN_COMMITS_TO_KEEP,
                         MAX_COMMITS_TO_KEEP,
-                        AUTO_COMMIT,
+                        CDC_ENABLED,
                         SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
@@ -121,6 +121,10 @@ public class HudiSinkFactory implements TableSinkFactory {
         }
         // table type
         catalogTable.getOptions().put(TABLE_TYPE.key(), hudiTableConfig.getTableType().name());
+        // cdc enabled
+        catalogTable
+                .getOptions()
+                .put(CDC_ENABLED.key(), String.valueOf(hudiTableConfig.isCdcEnabled()));
         catalogTable =
                 CatalogTable.of(
                         newTableId,
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java
deleted file mode 100644
index beba719c76..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/commiter/HudiSinkAggregatedCommitter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.sink.commiter;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.client.HudiWriteClientProvider;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Stack;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class HudiSinkAggregatedCommitter
-        implements SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo> {
-
-    private final HudiTableConfig tableConfig;
-
-    private final HudiWriteClientProvider writeClientProvider;
-
-    public HudiSinkAggregatedCommitter(
-            HudiTableConfig tableConfig,
-            HudiSinkConfig sinkConfig,
-            SeaTunnelRowType seaTunnelRowType) {
-        this.tableConfig = tableConfig;
-        this.writeClientProvider =
-                new HudiWriteClientProvider(
-                        sinkConfig, tableConfig.getTableName(), seaTunnelRowType);
-    }
-
-    @Override
-    public List<HudiAggregatedCommitInfo> commit(
-            List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
-        aggregatedCommitInfo =
-                aggregatedCommitInfo.stream()
-                        .filter(
-                                commit ->
-                                        commit.getHudiCommitInfoList().stream()
-                                                .anyMatch(
-                                                        aggregateCommit ->
-                                                                !aggregateCommit
-                                                                                .getWriteStatusList()
-                                                                                .isEmpty()
-                                                                        && !writeClientProvider
-                                                                                .getOrCreateClient()
-                                                                                .commit(
-                                                                                        aggregateCommit
-                                                                                                .getWriteInstantTime(),
-                                                                                        aggregateCommit
-                                                                                                .getWriteStatusList())))
-                        .collect(Collectors.toList());
-        log.debug(
-                "hudi records have been committed, error commit info are {}", aggregatedCommitInfo);
-        return aggregatedCommitInfo;
-    }
-
-    @Override
-    public HudiAggregatedCommitInfo combine(List<HudiCommitInfo> commitInfos) {
-        return new HudiAggregatedCommitInfo(commitInfos);
-    }
-
-    @Override
-    public void abort(List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws Exception {
-        writeClientProvider.getOrCreateClient().rollbackFailedWrites();
-        // rollback force commit
-        for (HudiAggregatedCommitInfo hudiAggregatedCommitInfo : aggregatedCommitInfo) {
-            for (HudiCommitInfo commitInfo : hudiAggregatedCommitInfo.getHudiCommitInfoList()) {
-                Stack<String> forceCommitTime = commitInfo.getForceCommitTime();
-                while (!forceCommitTime.isEmpty()) {
-                    writeClientProvider.getOrCreateClient().rollback(forceCommitTime.pop());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        writeClientProvider.close();
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
index addbf8491f..acb1021275 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/AvroSchemaConverter.java
@@ -50,13 +50,13 @@ public class AvroSchemaConverter implements Serializable {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(SeaTunnelDataType<?> schema) {
-        return convertToSchema(schema, "org.apache.seatunnel.avro.generated.record");
+        return convertToSchema(schema, "record");
     }
 
     /**
      * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema.
      *
-     * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+     * <p>The "{rowName}." is used as the nested row type name prefix in order to generate the right
      * schema. Nested record type that only differs with type name is still compatible.
      *
      * @param dataType logical type
@@ -105,10 +105,15 @@ public class AvroSchemaConverter implements Serializable {
                 return nullableSchema(time);
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) dataType;
-                // store BigDecimal as byte[]
+                // store BigDecimal as Fixed
+                // for spark compatibility.
                 Schema decimal =
                         LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
-                                .addToSchema(SchemaBuilder.builder().bytesType());
+                                .addToSchema(
+                                        SchemaBuilder.fixed(String.format("%s.fixed", rowName))
+                                                .size(
+                                                        computeMinBytesForDecimalPrecision(
+                                                                decimalType.getPrecision())));
                 return nullableSchema(decimal);
             case ROW:
                 SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
@@ -121,7 +126,7 @@ public class AvroSchemaConverter implements Serializable {
                     SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
                     SchemaBuilder.GenericDefault<Schema> fieldBuilder =
                             builder.name(fieldName)
-                                    .type(convertToSchema(fieldType, rowName + "_" + fieldName));
+                                    .type(convertToSchema(fieldType, rowName + "." + fieldName));
 
                     builder = fieldBuilder.withDefault(null);
                 }
@@ -166,4 +171,12 @@ public class AvroSchemaConverter implements Serializable {
     private static Schema nullableSchema(Schema schema) {
         return Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
     }
+
+    private static int computeMinBytesForDecimalPrecision(int precision) {
+        int numBytes = 1;
+        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+            numBytes += 1;
+        }
+        return numBytes;
+    }
 }
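
The new fixed-width decimal sizing picks the smallest byte count n such that a
signed two's-complement value of 8*n bits can hold any unscaled decimal of the
given precision, i.e. 2^(8n-1) >= 10^p. A standalone restatement of the helper
above (the method name here is illustrative):

    static int minBytesForPrecision(int precision) {
        int numBytes = 1;
        // Grow until 8*numBytes-1 magnitude bits cover 10^precision values.
        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
            numBytes += 1;
        }
        return numBytes;
    }

For example, minBytesForPrecision(10) == 5, so the decimal(10, 5) column added
to HudiTest maps to an Avro fixed(5) carrying a decimal logical type.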
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
index d6ed7e81eb..e8d23ab4ef 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/HudiRecordConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -62,7 +63,12 @@ public class HudiRecordConverter implements Serializable {
                     seaTunnelRowType.getFieldNames()[i],
                     createConverter(seaTunnelRowType.getFieldType(i))
                             .convert(
-                                    convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                    convertToSchema(
+                                            seaTunnelRowType.getFieldType(i),
+                                            AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                            hudiTableConfig.getTableName())
+                                                    + "."
+                                                    + seaTunnelRowType.getFieldNames()[i]),
                                     element.getField(i)));
         }
         return new HoodieAvroRecord<>(
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
index a48179fdb7..5c06362669 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -45,6 +46,8 @@ import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSc
 /** Tool class used to convert from {@link SeaTunnelRow} to Avro {@link GenericRecord}. */
 public class RowDataToAvroConverters implements Serializable {
 
+    private static final Conversions.DecimalConversion DECIMAL_CONVERSION =
+            new Conversions.DecimalConversion();
     // --------------------------------------------------------------------------------
     // Runtime Converters
     // --------------------------------------------------------------------------------
@@ -166,8 +169,9 @@ public class RowDataToAvroConverters implements Serializable {
 
                             @Override
                             public Object convert(Schema schema, Object object) {
-                                return ByteBuffer.wrap(
-                                        ((BigDecimal) object).unscaledValue().toByteArray());
+                                BigDecimal javaDecimal = (BigDecimal) object;
+                                return DECIMAL_CONVERSION.toFixed(
+                                        javaDecimal, schema, schema.getLogicalType());
                             }
                         };
                 break;
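
With the schema now carrying a fixed type, the converter delegates to Avro's
built-in decimal conversion. A minimal sketch of the round trip, assuming a
decimal(10, 5) column and the hypothetical schema name "demo.fixed":

    import java.math.BigDecimal;

    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericFixed;

    public class DecimalToFixedSketch {
        public static void main(String[] args) {
            // decimal(10, 5) backed by a 5-byte fixed, mirroring the converter.
            Schema fixedSchema =
                    LogicalTypes.decimal(10, 5)
                            .addToSchema(SchemaBuilder.fixed("demo.fixed").size(5));
            // toFixed pads the unscaled two's-complement bytes to the declared
            // fixed size, which is what lets Spark read the column back.
            GenericFixed fixed =
                    new Conversions.DecimalConversion()
                            .toFixed(new BigDecimal("12345.67890"), fixedSchema,
                                    fixedSchema.getLogicalType());
            System.out.println(fixed.bytes().length); // 5
        }
    }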
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
index 348a040be6..065fed72ad 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiAggregatedCommitInfo.java
@@ -17,15 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hudi.sink.state;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
 import java.io.Serializable;
-import java.util.List;
-
-@Data
-@AllArgsConstructor
-public class HudiAggregatedCommitInfo implements Serializable {
 
-    private final List<HudiCommitInfo> hudiCommitInfoList;
-}
+public class HudiAggregatedCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
index 0357931bb0..808cc4d942 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/state/HudiCommitInfo.java
@@ -17,22 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hudi.sink.state;
 
-import org.apache.hudi.client.WriteStatus;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
 import java.io.Serializable;
-import java.util.List;
-import java.util.Stack;
-
-@Data
-@AllArgsConstructor
-public class HudiCommitInfo implements Serializable {
-
-    private final String writeInstantTime;
-
-    private final List<WriteStatus> writeStatusList;
 
-    private final Stack<String> forceCommitTime;
-}
+public class HudiCommitInfo implements Serializable {}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
index 7eb3ab546b..b98e222870 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiRecordWriter.java
@@ -17,23 +17,23 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
 
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
 import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.client.WriteClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.HudiRecordConverter;
-import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
 
 import org.apache.avro.Schema;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,10 +42,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Stack;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
 
@@ -64,60 +64,44 @@ public class HudiRecordWriter implements Serializable {
 
     private final SeaTunnelRowType seaTunnelRowType;
 
-    private final boolean autoCommit;
-
     private Schema schema;
 
     private transient int batchCount = 0;
 
     private final List<HoodieRecord<HoodieAvroPayload>> writeRecords;
 
-    private Stack<String> forceCommitTime;
-
-    private String writeInstantTime;
+    private final List<HoodieKey> deleteRecordKeys;
 
-    private List<WriteStatus> writeStatusList;
+    private final LinkedHashMap<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>> buffer =
+            new LinkedHashMap<>();
 
     private transient volatile boolean closed = false;
 
     private transient volatile Exception flushException;
 
     public HudiRecordWriter(
-            HudiSinkConfig hudiSinkConfig,
             HudiTableConfig hudiTableConfig,
             WriteClientProvider clientProvider,
             SeaTunnelRowType seaTunnelRowType) {
         this.hudiTableConfig = hudiTableConfig;
-        this.autoCommit = hudiSinkConfig.isAutoCommit();
         this.clientProvider = clientProvider;
         this.seaTunnelRowType = seaTunnelRowType;
         this.writeRecords = new ArrayList<>();
-        this.writeStatusList = new ArrayList<>();
-        this.forceCommitTime = new Stack<>();
+        this.deleteRecordKeys = new ArrayList<>();
         this.recordConverter = new HudiRecordConverter();
     }
 
-    public HudiRecordWriter(
-            HudiSinkConfig sinkConfig,
-            HudiTableConfig tableConfig,
-            WriteClientProvider writeClientProvider,
-            SeaTunnelRowType seaTunnelRowType,
-            HudiCommitInfo hudiCommitInfo) {
-        this(sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
-        this.writeInstantTime = hudiCommitInfo.getWriteInstantTime();
-        this.writeStatusList = hudiCommitInfo.getWriteStatusList();
-    }
-
     public void open() {
-        this.schema = new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString());
+        this.schema =
+                new Schema.Parser()
+                        .parse(
+                                convertToSchema(
+                                                seaTunnelRowType,
+                                                AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        hudiTableConfig.getTableName()))
+                                        .toString());
         try {
-            HoodieJavaWriteClient<HoodieAvroPayload> writeClient =
-                    clientProvider.getOrCreateClient();
-            if (StringUtils.nonEmpty(writeInstantTime) && Objects.nonNull(writeStatusList)) {
-                if (!writeClient.commit(writeInstantTime, writeStatusList)) {
-                    LOG.warn("Failed to commit history data.");
-                }
-            }
+            clientProvider.getOrCreateClient();
         } catch (Exception e) {
             throw new HudiConnectorException(
                     CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED,
@@ -133,7 +117,7 @@ public class HudiRecordWriter implements Serializable {
             batchCount++;
             if (hudiTableConfig.getBatchSize() > 0
                     && batchCount >= hudiTableConfig.getBatchSize()) {
-                flush(true);
+                flush();
             }
         } catch (Exception e) {
             throw new HudiConnectorException(
@@ -143,92 +127,89 @@ public class HudiRecordWriter implements Serializable {
         }
     }
 
-    public synchronized void flush(boolean isNeedForceCommit) {
+    public synchronized void flush() {
         if (batchCount == 0) {
             log.debug("No data needs to be refreshed, waiting for incoming 
data.");
             return;
         }
         checkFlushException();
-        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
-        if (autoCommit || writeInstantTime == null) {
-            writeInstantTime = writeClient.startCommit();
+        Boolean preChangeFlag = null;
+        Set<Map.Entry<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>>> entries =
+                buffer.entrySet();
+        for (Map.Entry<HoodieKey, Pair<Boolean, HoodieRecord<HoodieAvroPayload>>> entry : entries) {
+            boolean currentChangeFlag = entry.getValue().getKey();
+            if (currentChangeFlag) {
+                if (preChangeFlag != null && !preChangeFlag) {
+                    executeDelete();
+                }
+                writeRecords.add(entry.getValue().getValue());
+            } else {
+                if (preChangeFlag != null && preChangeFlag) {
+                    executeWrite();
+                }
+                deleteRecordKeys.add(entry.getKey());
+            }
+            preChangeFlag = currentChangeFlag;
         }
-        List<WriteStatus> currentWriteStatusList;
+
+        if (preChangeFlag != null) {
+            if (preChangeFlag) {
+                executeWrite();
+            } else {
+                executeDelete();
+            }
+        }
+        batchCount = 0;
+        buffer.clear();
+    }
+
+    private void executeWrite() {
+        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
+        String writeInstantTime = writeClient.startCommit();
         // write records
         switch (hudiTableConfig.getOpType()) {
             case INSERT:
-                currentWriteStatusList = writeClient.insert(writeRecords, writeInstantTime);
+                writeClient.insert(writeRecords, writeInstantTime);
                 break;
             case UPSERT:
-                currentWriteStatusList = writeClient.upsert(writeRecords, writeInstantTime);
+                writeClient.upsert(writeRecords, writeInstantTime);
                 break;
             case BULK_INSERT:
-                currentWriteStatusList = writeClient.bulkInsert(writeRecords, writeInstantTime);
+                writeClient.bulkInsert(writeRecords, writeInstantTime);
                 break;
             default:
                 throw new HudiConnectorException(
                         HudiErrorCode.UNSUPPORTED_OPERATION,
                         "Unsupported operation type: " + hudiTableConfig.getOpType());
         }
-        if (!autoCommit) {
-            this.writeStatusList.addAll(currentWriteStatusList);
-        }
-        /**
-         * when the batch size of temporary records is reached, commit is forced here, even if
-         * configured not to be auto commit. because a timeline supports only one commit.
-         */
-        forceCommit(isNeedForceCommit, autoCommit);
         writeRecords.clear();
-        batchCount = 0;
-    }
-
-    public Optional<HudiCommitInfo> prepareCommit() {
-        flush(false);
-        if (!autoCommit) {
-            return Optional.of(
-                    new HudiCommitInfo(writeInstantTime, writeStatusList, forceCommitTime));
-        }
-        return Optional.empty();
-    }
-
-    private void commit() {
-        if (StringUtils.nonEmpty(writeInstantTime) && !writeStatusList.isEmpty()) {
-            log.debug(
-                    "Commit hudi records, the instant time is {} and write status are {}",
-                    writeInstantTime,
-                    writeStatusList);
-            clientProvider.getOrCreateClient().commit(writeInstantTime, writeStatusList);
-            resetUpsertCommitInfo();
-        }
-    }
-
-    private void forceCommit(boolean isNeedForceCommit, boolean isAutoCommit) {
-        if (isNeedForceCommit && !isAutoCommit) {
-            clientProvider.getOrCreateClient().commit(writeInstantTime, writeStatusList);
-            forceCommitTime.add(writeInstantTime);
-            resetUpsertCommitInfo();
-        }
-    }
 
-    public HudiCommitInfo snapshotState() {
-        HudiCommitInfo hudiCommitInfo =
-                new HudiCommitInfo(writeInstantTime, writeStatusList, forceCommitTime);
-        // reset commit info in here, because the commit info will be committed in committer.
-        resetUpsertCommitInfo();
-        // reset the force commit stack.
-        forceCommitTime = new Stack<>();
-        return hudiCommitInfo;
-    }
-
-    protected void resetUpsertCommitInfo() {
-        writeInstantTime = null;
-        writeStatusList = new ArrayList<>();
+    private void executeDelete() {
+        HoodieJavaWriteClient<HoodieAvroPayload> writeClient = clientProvider.getOrCreateClient();
+        writeClient.delete(deleteRecordKeys, writeClient.startCommit());
+        deleteRecordKeys.clear();
     }
 
     protected void prepareRecords(SeaTunnelRow element) {
         HoodieRecord<HoodieAvroPayload> hoodieAvroPayloadHoodieRecord =
                 recordConverter.convertRow(schema, seaTunnelRowType, element, hudiTableConfig);
-        writeRecords.add(hoodieAvroPayloadHoodieRecord);
+        HoodieKey recordKey = hoodieAvroPayloadHoodieRecord.getKey();
+        boolean changeFlag = changeFlag(element.getRowKind());
+        buffer.put(recordKey, Pair.of(changeFlag, hoodieAvroPayloadHoodieRecord));
+    }
+
+    private boolean changeFlag(RowKind rowKind) {
+        switch (rowKind) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                return false;
+            case INSERT:
+            case UPDATE_AFTER:
+                return true;
+            default:
+                throw new UnsupportedOperationException("Unknown row kind: " + 
rowKind);
+        }
     }
 
     protected void checkFlushException() {
@@ -245,10 +226,7 @@ public class HudiRecordWriter implements Serializable {
         if (!closed) {
             closed = true;
             try {
-                flush(false);
-                if (!autoCommit) {
-                    commit();
-                }
+                flush();
             } catch (Exception e) {
                 LOG.warn("Flush records to Hudi failed.", e);
                 flushException =
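
The rewritten writer buffers the changelog per record key and, on flush, walks
the insertion-ordered buffer, emitting each run of consecutive upserts or
deletes as its own Hudi commit so cross-key ordering is preserved. A
self-contained sketch of that run-splitting (String keys stand in for
HoodieKey, booleans for the upsert/delete change flag):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ChangelogRunSketch {
        public static void main(String[] args) {
            LinkedHashMap<String, Boolean> buffer = new LinkedHashMap<>();
            buffer.put("k1", true);   // INSERT
            buffer.put("k2", true);   // UPDATE_AFTER
            buffer.put("k3", false);  // DELETE
            buffer.put("k4", true);   // INSERT

            List<String> run = new ArrayList<>();
            Boolean preFlag = null;
            for (Map.Entry<String, Boolean> e : buffer.entrySet()) {
                boolean cur = e.getValue();
                if (preFlag != null && preFlag != cur) {
                    // Flag flipped: emit the accumulated run as one commit.
                    System.out.println((preFlag ? "upsert " : "delete ") + run);
                    run.clear();
                }
                run.add(e.getKey());
                preFlag = cur;
            }
            if (preFlag != null) {
                System.out.println((preFlag ? "upsert " : "delete ") + run);
            }
            // Prints: upsert [k1, k2], then delete [k3], then upsert [k4].
        }
    }

Because the buffer is keyed by record key, a later event for the same key
within one batch overwrites the earlier one before the flush runs.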
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
index 317215861a..130a79adab 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
@@ -35,8 +35,6 @@ import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiSinkState;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 
 @Slf4j
@@ -60,27 +58,15 @@ public class HudiSinkWriter
             Context context,
             SeaTunnelRowType seaTunnelRowType,
             HudiSinkConfig sinkConfig,
-            HudiTableConfig tableConfig,
-            List<HudiSinkState> hudiSinkState) {
+            HudiTableConfig tableConfig) {
         this.sinkConfig = sinkConfig;
         this.tableConfig = tableConfig;
         this.seaTunnelRowType = seaTunnelRowType;
         this.writeClientProvider =
                 new HudiWriteClientProvider(
                         sinkConfig, tableConfig.getTableName(), seaTunnelRowType);
-        if (!hudiSinkState.isEmpty()) {
-            this.hudiRecordWriter =
-                    new HudiRecordWriter(
-                            sinkConfig,
-                            tableConfig,
-                            writeClientProvider,
-                            seaTunnelRowType,
-                            hudiSinkState.get(0).getHudiCommitInfo());
-        } else {
-            this.hudiRecordWriter =
-                    new HudiRecordWriter(
-                            sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
-        }
+        this.hudiRecordWriter =
+                new HudiRecordWriter(tableConfig, writeClientProvider, seaTunnelRowType);
     }
 
     @Override
@@ -89,16 +75,11 @@ public class HudiSinkWriter
         hudiRecordWriter.writeRecord(element);
     }
 
-    @Override
-    public List<HudiSinkState> snapshotState(long checkpointId) throws IOException {
-        return Collections.singletonList(
-                new HudiSinkState(checkpointId, hudiRecordWriter.snapshotState()));
-    }
-
     @Override
     public Optional<HudiCommitInfo> prepareCommit() throws IOException {
         tryOpen();
-        return hudiRecordWriter.prepareCommit();
+        hudiRecordWriter.flush();
+        return Optional.empty();
     }
 
     @Override
@@ -128,8 +109,7 @@ public class HudiSinkWriter
                         queueIndex,
                         tableConfig.getTableName());
         this.hudiRecordWriter =
-                new HudiRecordWriter(
-                        sinkConfig, tableConfig, writeClientProvider, seaTunnelRowType);
+                new HudiRecordWriter(tableConfig, writeClientProvider, seaTunnelRowType);
     }
 
     private void tryOpen() {
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
index fe6cbe3e20..ef49c28a21 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -173,7 +174,12 @@ public class HudiUtil {
                                         hudiSinkConfig.getTableDfsPath(),
                                         hudiTable.getDatabase(),
                                         hudiTable.getTableName()))
-                        .withSchema(convertToSchema(seaTunnelRowType).toString())
+                        .withSchema(
+                                convertToSchema(
+                                                seaTunnelRowType,
+                                                AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                        tableName))
+                                        .toString())
                         .withParallelism(
                                 hudiTable.getInsertShuffleParallelism(),
                                 hudiTable.getUpsertShuffleParallelism())
@@ -184,7 +190,6 @@ public class HudiUtil {
                                                 hudiTable.getMinCommitsToKeep(),
                                                 hudiTable.getMaxCommitsToKeep())
                                         .build())
-                        .withAutoCommit(hudiSinkConfig.isAutoCommit())
                         .withCleanConfig(
                                 HoodieCleanConfig.newBuilder()
                                         .withAutoClean(true)
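
Note: the schema handed to the write client is now registered under Hudi's qualified
record name, so the Avro schema SeaTunnel generates lines up with the one Hudi derives
for the table. A minimal sketch, assuming Hudi's AvroSchemaUtils and the two-argument
convertToSchema overload used above ("st_test" is an example table name):

    import org.apache.hudi.avro.AvroSchemaUtils;

    // Yields "hoodie.st_test.st_test_record", i.e. record name "st_test_record"
    // in namespace "hoodie.st_test".
    String qualifiedName = AvroSchemaUtils.getAvroRecordQualifiedName("st_test");
    String avroSchema = convertToSchema(seaTunnelRowType, qualifiedName).toString();
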
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
index 82e85fcf4e..7dbfc402b6 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hudi;
 
+import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -27,6 +28,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -52,6 +54,7 @@ import org.junit.jupiter.api.condition.OS;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalTime;
@@ -95,7 +98,8 @@ public class HudiTest {
                         "date",
                         "time",
                         "timestamp3",
-                        "map"
+                        "map",
+                        "decimal"
                     },
                     new SeaTunnelDataType[] {
                         BOOLEAN_TYPE,
@@ -107,16 +111,19 @@ public class HudiTest {
                         LocalTimeType.LOCAL_TIME_TYPE,
                         LocalTimeType.LOCAL_DATE_TIME_TYPE,
                         new MapType(STRING_TYPE, LONG_TYPE),
+                        new DecimalType(10, 5),
                     });
 
     private String getSchema() {
-        return convertToSchema(seaTunnelRowType).toString();
+        return convertToSchema(
+                        seaTunnelRowType, AvroSchemaUtils.getAvroRecordQualifiedName(tableName))
+                .toString();
     }
 
     @Test
     void testSchema() {
         Assertions.assertEquals(
-                "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"org.apache.seatunnel.avro.generated\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\": [...]
+                "{\"type\":\"record\",\"name\":\"hudi_record\",\"namespace\":\"hoodie.hudi\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\":[\"null\",{\"type\" [...]
                 getSchema());
     }
 
@@ -165,7 +172,8 @@ public class HudiTest {
             expected.setField(7, timestamp3.toLocalDateTime());
             Map<String, Long> map = new HashMap<>();
             map.put("element", 123L);
-            expected.setField(9, map);
+            expected.setField(8, map);
+            expected.setField(9, BigDecimal.valueOf(10.121));
             String instantTime = javaWriteClient.startCommit();
             List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>();
             hoodieRecords.add(convertRow(expected));
@@ -178,13 +186,23 @@ public class HudiTest {
     private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
         GenericRecord rec =
                 new GenericData.Record(
-                        new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString()));
+                        new Schema.Parser()
+                                .parse(
+                                        convertToSchema(
+                                                        seaTunnelRowType,
+                                                        AvroSchemaUtils.getAvroRecordQualifiedName(
+                                                                tableName))
+                                                .toString()));
         for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
             rec.put(
                     seaTunnelRowType.getFieldNames()[i],
                     createConverter(seaTunnelRowType.getFieldType(i))
                             .convert(
-                                    convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                    convertToSchema(
+                                            seaTunnelRowType.getFieldType(i),
+                                            AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
+                                                    + "."
+                                                    + seaTunnelRowType.getFieldNames()[i]),
                                     element.getField(i)));
         }
 
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
index 7be81e89ba..d3524c85c4 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
@@ -168,6 +168,7 @@ class HudiCatalogTest {
         TableSchema schema = builder.build();
         HashMap<String, String> options = new HashMap<>();
         options.put("record_key_fields", "id,boolean_col");
+        options.put("cdc_enabled", "false");
         options.put("table_type", "MERGE_ON_READ");
         return CatalogTable.of(
                 tableIdentifier, schema, options, Collections.singletonList("dt_col"), "null");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
index 583b16a162..7fe8cc8523 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
@@ -30,6 +30,18 @@
         <minio.version>8.5.6</minio.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>connector-jdbc</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <!-- minio containers -->
         <dependency>
@@ -60,5 +72,25 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-mysql</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainer.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
index 28f2eb3f53..642b94471d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
@@ -108,7 +108,7 @@ public class HudiIT extends TestSuiteBase {
             disabledReason = "FLINK do not support local file catalog in hudi.")
     public void testWriteHudi(TestContainer container)
             throws IOException, InterruptedException, URISyntaxException {
-        Container.ExecResult textWriteResult = container.executeJob("/fake_to_hudi.conf");
+        Container.ExecResult textWriteResult = container.executeJob("/hudi/fake_to_hudi.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
@@ -146,7 +146,7 @@ public class HudiIT extends TestSuiteBase {
     public void testWriteHudiWithOmitConfigItem(TestContainer container)
             throws IOException, InterruptedException, URISyntaxException {
         Container.ExecResult textWriteResult =
-                container.executeJob("/fake_to_hudi_with_omit_config_item.conf");
+                container.executeJob("/hudi/fake_to_hudi_with_omit_config_item.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
index 0a9c4555ad..c240b85da7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiMultiTableIT.java
@@ -105,7 +105,8 @@ public class HudiMultiTableIT extends TestSuiteBase {
             type = {EngineType.FLINK},
             disabledReason = "FLINK do not support local file catalog in hudi.")
     public void testMultiWrite(TestContainer container) throws IOException, InterruptedException {
-        Container.ExecResult textWriteResult = container.executeJob("/multi_fake_to_hudi.conf");
+        Container.ExecResult textWriteResult =
+                container.executeJob("/hudi/multi_fake_to_hudi.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
index 67f3e9e884..237fd100d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSeatunnelS3MultiTableIT.java
@@ -134,8 +134,8 @@ public class HudiSeatunnelS3MultiTableIT extends SeaTunnelContainer {
 
     @Test
     public void testS3MultiWrite() throws IOException, InterruptedException {
-        copyFileToContainer("/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
-        Container.ExecResult textWriteResult = executeSeaTunnelJob("/s3_fake_to_hudi.conf");
+        copyFileToContainer("/hudi/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
+        Container.ExecResult textWriteResult = executeSeaTunnelJob("/hudi/s3_fake_to_hudi.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java
new file mode 100644
index 0000000000..1610618fef
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSinkCDCIT.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hudi;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static java.lang.Thread.sleep;
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.FLINK, EngineType.SPARK},
+        disabledReason =
+                "FLINK do not support local file catalog in hudi and Currently SPARK do not support cdc")
+@Slf4j
+public class HudiSinkCDCIT extends TestSuiteBase implements TestResource {
+
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "st_user";
+    private static final String MYSQL_USER_PASSWORD = "seatunnel";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+    private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+
+    private static final String MYSQL_DRIVER =
+            "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+    private static final String DATABASE = "st";
+    private static final String TABLE_NAME = "st_test";
+    private static final String TABLE_PATH = "/tmp/hudi/";
+    private static final String NAMESPACE = "hudi";
+    private static final String NAMESPACE_TAR = "hudi.tar.gz";
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);
+
+    private final Map<Integer, Record> records = new HashMap<>();
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("mysql/server-gtids/my.cnf")
+                .withSetupSQL("mysql/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-mysql-image")));
+    }
+
+    protected final ContainerExtendedFactory containerExtendedFactory =
+            new ContainerExtendedFactory() {
+                @Override
+                public void extend(GenericContainer<?> container)
+                        throws IOException, InterruptedException {
+                    container.execInContainer(
+                            "sh",
+                            "-c",
+                            "cd /tmp" + " && tar -czvf " + NAMESPACE_TAR + " " + NAMESPACE);
+                    container.copyFileFromContainer(
+                            "/tmp/" + NAMESPACE_TAR, "/tmp/" + NAMESPACE_TAR);
+
+                    extractFiles();
+                }
+
+                private void extractFiles() {
+                    ProcessBuilder processBuilder = new ProcessBuilder();
+                    processBuilder.command(
+                            "sh", "-c", "cd /tmp" + " && tar -zxvf " + NAMESPACE_TAR);
+                    try {
+                        Process process = processBuilder.start();
+                        int exitCode = process.waitFor();
+                        if (exitCode == 0) {
+                            log.info("Extract files successful.");
+                        } else {
+                            log.error("Extract files failed with exit code " + exitCode);
+                        }
+                    } catch (IOException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                container.execInContainer("sh", "-c", "mkdir -p " + TABLE_PATH);
+                container.execInContainer("sh", "-c", "chmod -R 777 " + TABLE_PATH);
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/MySQL-CDC/lib && cd /tmp/seatunnel/plugins/MySQL-CDC/lib && wget "
+                                        + MYSQL_DRIVER);
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        log.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        log.info("Mysql ddl execution is complete");
+    }
+
+    private void insertRecord(Record record) {
+        Integer id = record.getId();
+        records.put(id, record);
+    }
+
+    private void deleteRecord(int id) {
+        records.remove(id);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        // close Container
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.close();
+        }
+    }
+
+    @TestTemplate
+    public void testMysqlCdc2Hudi(TestContainer container)
+            throws IOException, InterruptedException {
+        // Clear related content to ensure that multiple operations are not affected
+        clearTable(MYSQL_DATABASE, SOURCE_TABLE);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/hudi/mysql_cdc_to_hudi.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+        // insert data and check
+        insertAndCheckData(container);
+        // upsert/delete data and check
+        upsertAndCheckData(container);
+    }
+
+    private void insertAndCheckData(TestContainer container) throws InterruptedException {
+        // Init table data
+        initSourceTableData(MYSQL_DATABASE, SOURCE_TABLE);
+        // Waiting 30s for source capture data
+        sleep(30000);
+        Configuration configuration = new Configuration();
+        configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy hudi to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Path newestCommitFilePath =
+                                    getNewestCommitFilePath(
+                                            new File(
+                                                    TABLE_PATH
+                                                            + File.separator
+                                                            + DATABASE
+                                                            + File.separator
+                                                            + TABLE_NAME));
+                            ParquetReader<Group> reader =
+                                    ParquetReader.builder(
+                                                    new GroupReadSupport(), newestCommitFilePath)
+                                            .withConf(configuration)
+                                            .build();
+
+                            // Read data and count rows
+                            long rowCount = 0;
+                            Group read = reader.read();
+                            while (read != null) {
+                                checkData(read);
+                                read = reader.read();
+                                rowCount++;
+                            }
+                            Assertions.assertEquals(3, rowCount);
+                        });
+        FileUtils.deleteFile(TABLE_PATH);
+    }
+
+    private void upsertAndCheckData(TestContainer container)
+            throws InterruptedException, IOException {
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
+        // Waiting 30s for source capture data
+        sleep(30000);
+        Configuration configuration = new Configuration();
+        configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy hudi to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            Path newestCommitFilePath =
+                                    getNewestCommitFilePath(
+                                            new File(
+                                                    TABLE_PATH
+                                                            + File.separator
+                                                            + DATABASE
+                                                            + File.separator
+                                                            + TABLE_NAME));
+                            ParquetReader<Group> reader =
+                                    ParquetReader.builder(
+                                                    new GroupReadSupport(), newestCommitFilePath)
+                                            .withConf(configuration)
+                                            .build();
+                            // Read data and count rows
+                            long rowCount = 0;
+                            Group read = reader.read();
+                            while (read != null) {
+                                checkData(read);
+                                read = reader.read();
+                                rowCount++;
+                            }
+                            Assertions.assertEquals(4, rowCount);
+                        });
+        FileUtils.deleteFile(TABLE_PATH);
+    }
+
+    public static Path getNewestCommitFilePath(File tablePathDir) throws IOException {
+        File[] files = FileUtil.listFiles(tablePathDir);
+        Long newestCommitTime =
+                Arrays.stream(files)
+                        .filter(file -> file.getName().endsWith(".parquet"))
+                        .map(
+                                file ->
+                                        Long.parseLong(
+                                                file.getName()
+                                                        .substring(
+                                                                file.getName().lastIndexOf("_") + 1,
+                                                                file.getName()
+                                                                        .lastIndexOf(".parquet"))))
+                        .max(Long::compareTo)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Not found parquet file in " + tablePathDir));
+        for (File file : files) {
+            if (file.getName().endsWith(newestCommitTime + ".parquet")) {
+                return new Path(file.toURI());
+            }
+        }
+        throw new IllegalArgumentException("Not found parquet file in " + tablePathDir);
+    }
+
+    private void checkData(Group readRecord) {
+        Integer id = readRecord.getInteger("id", 0);
+        Record record = records.get(id);
+        Assertions.assertNotNull(record);
+        String f_json = readRecord.getString("f_json", 0);
+        Long f_bigint = readRecord.getLong("f_bigint", 0);
+        Assertions.assertEquals(
+                JsonUtils.parseObject(record.getJson()), (JsonUtils.parseObject(f_json)));
+        Assertions.assertEquals(record.getBigInt(), f_bigint);
+    }
+
+    private void clearTable(String database, String tableName) {
+        executeSql("truncate table " + database + "." + tableName);
+    }
+
+    // Execute SQL
+    private void executeSql(String sql) {
+        try (Connection connection = getJdbcConnection()) {
+            connection.createStatement().execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(),
+                MYSQL_CONTAINER.getPassword());
+    }
+
+    private void initSourceTableData(String database, String tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+                        + "                                         f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+                        + "                                         f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+                        + "                                         f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+                        + "                                         f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                        + "VALUES ( 1, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                        + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 2022 ),\n"
+                        + "       ( 2, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n"
+                        + "         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321,\n"
+                        + "         123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n"
+                        + "         'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n"
+                        + "         1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                        + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                        + "         112.345, '14:30:00', -128, 22, '{ \"key\": \"value\" }', 2013 ),\n"
+                        + "       ( 3, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 0x74696E79626C6F62,\n"
+                        + "         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321, 123456789, 987654321, 123,\n"
+                        + "         789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field', 'This is a text field',\n"
+                        + "         'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00', '2023-04-27 11:08:40',\n"
+                        + "         1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                        + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field', 112.345,\n"
+                        + "         '14:30:00', -128, 22, '{ \"key\": \"value\" }', 2021 )");
+        insertRecord(new Record(1, 123456789L, "{ \"key\": \"value\" }"));
+        insertRecord(new Record(2, 123456789L, "{ \"key\": \"value\" }"));
+        insertRecord(new Record(3, 123456789L, "{ \"key\": \"value\" }"));
+    }
+
+    private void upsertDeleteSourceTable(String database, String tableName) {
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+                        + "                                         f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+                        + "                                         f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+                        + "                                         f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+                        + "                                         f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                        + "VALUES ( 4, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+                        + "         1234567890, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                        + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1992 )");
+        insertRecord(new Record(4, 1234567890L, "{ \"key\": \"value\" }"));
+
+        executeSql(
+                "INSERT INTO "
+                        + database
+                        + "."
+                        + tableName
+                        + " ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,\n"
+                        + "                                         f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,\n"
+                        + "                                         f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,\n"
+                        + "                                         f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,\n"
+                        + "                                         f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,\n"
+                        + "                                         f_tinyint, f_tinyint_unsigned, f_json, f_year )\n"
+                        + "VALUES ( 5, 0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,\n"
+                        + "         0x68656C6C6F, 0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,\n"
+                        + "         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 7654321, 1234567, 7654321,\n"
+                        + "         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a medium text field',\n"
+                        + "         'This is a text field', 'This is a tiny text field', 'This is a varchar field', '2022-04-27', '2022-04-27 14:30:00',\n"
+                        + "         '2023-04-27 11:08:40', 1, b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 'enum2',\n"
+                        + "         0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 'This is a long varchar field',\n"
+                        + "         12.345, '14:30:00', -128, 255, '{ \"key\": \"value\" }', 1999 )");
+        insertRecord(new Record(5, 123456789L, "{ \"key\": \"value\" }"));
+
+        executeSql("DELETE FROM " + database + "." + tableName + " where id = 2");
+        deleteRecord(2);
+
+        executeSql(
+                "UPDATE "
+                        + database
+                        + "."
+                        + tableName
+                        + " SET f_bigint = 10000, f_json = '{ \"key\": \"value1\" }' where id = 3");
+        insertRecord(new Record(3, 10000L, "{ \"key\": \"value1\" }"));
+    }
+
+    @Data
+    @AllArgsConstructor
+    static class Record {
+        private Integer id;
+        private Long bigInt;
+        private String json;
+    }
+}
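
Note: getNewestCommitFilePath above depends on Hudi's base-file naming, which is
typically <fileId>_<writeToken>_<instantTime>.parquet, so the commit instant sits
between the last underscore and the extension. A hedged illustration with a made-up
file name:

    // Hypothetical file name following the usual Hudi base-file pattern.
    String name = "c1a2b3d4-0000-0000-0000-000000000000-0_0-2-0_20241018115423000.parquet";
    long instantTime = Long.parseLong(
            name.substring(name.lastIndexOf('_') + 1, name.lastIndexOf(".parquet")));
    // instantTime == 20241018115423000L, i.e. the Hudi commit timestamp.
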
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
index db43348aef..f91f340f3c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiSparkS3MultiTableIT.java
@@ -115,8 +115,8 @@ public class HudiSparkS3MultiTableIT extends TestSuiteBase implements TestResour
             disabledReason =
                     "The hadoop version in current flink image is not 
compatible with the aws version and default container of seatunnel not support 
s3.")
     public void testS3MultiWrite(TestContainer container) throws IOException, InterruptedException {
-        container.copyFileToContainer("/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
-        Container.ExecResult textWriteResult = container.executeJob("/s3_fake_to_hudi.conf");
+        container.copyFileToContainer("/hudi/core-site.xml", "/tmp/seatunnel/config/core-site.xml");
+        Container.ExecResult textWriteResult = container.executeJob("/hudi/s3_fake_to_hudi.conf");
         Assertions.assertEquals(0, textWriteResult.getExitCode());
         Configuration configuration = new Configuration();
         configuration.set("fs.defaultFS", LocalFileSystem.DEFAULT_FS);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql
new file mode 100644
index 0000000000..5f42375263
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -0,0 +1,75 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  mysql_cdc
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE mysql_cdc_e2e_source_table
+(
+    `id`                   int       NOT NULL AUTO_INCREMENT,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  AUTO_INCREMENT = 2
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
+
+truncate table mysql_cdc_e2e_source_table;
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/core-site.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/core-site.xml
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/core-site.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/core-site.xml
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi.conf
similarity index 100%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi_with_omit_config_item.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi_with_omit_config_item.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi_with_omit_config_item.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/fake_to_hudi_with_omit_config_item.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/multi_fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/multi_fake_to_hudi.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/multi_fake_to_hudi.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/multi_fake_to_hudi.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
similarity index 63%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
index 894f119114..310ee72ba8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/mysql_cdc_to_hudi.conf
@@ -14,36 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
 
 env {
-  execution.parallelism = 1
-  job.mode = "BATCH"
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
 }
 
 source {
-  FakeSource {
-    schema = {
-      fields {
-        c_map = "map<string, string>"
-        c_array = "array<int>"
-        c_string = string
-        c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
-        c_int = int
-        c_bigint = bigint
-        c_float = float
-        c_double = double
-        c_decimal = "decimal(30, 8)"
-        c_bytes = bytes
-        c_date = date
-        c_timestamp = timestamp
-      }
+  MySQL-CDC {
+    result_table_name="customer_result_table"
+    catalog {
+      factory = Mysql
     }
-    result_table_name = "fake"
+    database-names=["mysql_cdc"]
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+    format=DEFAULT
+    username = "st_user"
+    password = "seatunnel"
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
   }
 }
 
+transform {
+}
+
 sink {
   Hudi {
     op_type="UPSERT"
@@ -51,6 +49,8 @@ sink {
     database = "st"
     table_name = "st_test"
     table_type="COPY_ON_WRITE"
-    record_key_fields="c_bigint"
+    record_key_fields="id"
+    cdc_enabled = true
   }
 }
+
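
Note: the sink's cdc_enabled option corresponds to Hudi's table-level changelog
capture, exposed by Hudi (0.13+) as the property hoodie.table.cdc.enabled; the exact
wiring sketched below is an assumption for illustration, not taken from this commit:

    import java.util.Properties;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Hedged sketch: turning on Hudi CDC via the write config; the base path
    // here is illustrative, matching the table_dfs_path/database/table layout.
    Properties props = new Properties();
    props.setProperty("hoodie.table.cdc.enabled", "true"); // assumed mapping of cdc_enabled
    HoodieWriteConfig writeConfig =
            HoodieWriteConfig.newBuilder()
                    .withPath("/tmp/hudi/st/st_test")
                    .withProperties(props)
                    .build();
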
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/s3_fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/s3_fake_to_hudi.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/s3_fake_to_hudi.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/hudi/s3_fake_to_hudi.conf
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf
new file mode 100644
index 0000000000..a390897885
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/server-gtids/my.cnf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
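
Note: the row-format binlog and GTID settings above are exactly what the MySQL-CDC
source requires. A hedged sanity check over plain JDBC (URL and credentials as the
test container defines them) that the container picked the settings up:

    try (Connection conn = DriverManager.getConnection(jdbcUrl, "st_user", "seatunnel");
         ResultSet rs = conn.createStatement().executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
        if (rs.next()) {
            // Expect "ROW"; STATEMENT or MIXED format cannot feed CDC ingest.
            Assertions.assertEquals("ROW", rs.getString("Value"));
        }
    }
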
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql
new file mode 100644
index 0000000000..aa4534e0ad
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/mysql/setup.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly limit the replication user to the follower (replica) machine,
+-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'st_user' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, DROP, LOCK TABLES ON *.* TO 'st_user'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

Reply via email to