This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f134d7e129 [Improve][Connector-Hudi] Add pre-combine field option for 
hudi sink (#9496)
f134d7e129 is described below

commit f134d7e129a5c2b26435ffdfe23106bdc5e79125
Author: misi <[email protected]>
AuthorDate: Sun Jul 20 22:21:40 2025 +0800

    [Improve][Connector-Hudi] Add pre-combine field option for hudi sink (#9496)
---
 docs/en/connector-v2/sink/Hudi.md                          |  5 +++++
 docs/zh/connector-v2/sink/Hudi.md                          |  5 +++++
 .../connectors/seatunnel/hudi/catalog/HudiCatalog.java     |  5 +++++
 .../connectors/seatunnel/hudi/config/HudiSinkOptions.java  |  6 ++++++
 .../connectors/seatunnel/hudi/config/HudiTableConfig.java  |  5 +++++
 .../connectors/seatunnel/hudi/sink/HudiSinkFactory.java    |  5 +++++
 .../connectors/seatunnel/hudi/catalog/HudiCatalogTest.java | 14 ++++++++++++--
 7 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hudi.md 
b/docs/en/connector-v2/sink/Hudi.md
index f491fccb25..c0962448c7 100644
--- a/docs/en/connector-v2/sink/Hudi.md
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -36,6 +36,7 @@ Table list configuration:
 | op_type                    | enum   | no       | insert        |
 | record_key_fields          | string | no       | -             |
 | partition_fields           | string | no       | -             |
+| precombine_field           | string | no       | -             |
 | batch_interval_ms          | Int    | no       | 1000          |
 | batch_size                 | Int    | no       | 1000          |
 | insert_shuffle_parallelism | Int    | no       | 2             |
@@ -73,6 +74,10 @@ Note: When this configuration corresponds to a single table, 
you can flatten the
 
 `partition_fields` The partition key fields of hudi table, its are used to 
generate partition.
 
+### precombine_field [string]
+
+`precombine_field` The precombine field of the Hudi table; it is used to 
pre-combine records before the actual write.
+
 ### index_type [string]
 
 `index_type` The index type of hudi table. Currently, `BLOOM`, `SIMPLE`, and 
`GLOBAL SIMPLE` are supported.
diff --git a/docs/zh/connector-v2/sink/Hudi.md 
b/docs/zh/connector-v2/sink/Hudi.md
index 4052abd979..9488a9ac15 100644
--- a/docs/zh/connector-v2/sink/Hudi.md
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -36,6 +36,7 @@ import ChangeLog from '../changelog/connector-hudi.md';
 | op_type                    | enum   | no       | insert        |
 | record_key_fields          | string | no       | -             |
 | partition_fields           | string | no       | -             |
+| precombine_field           | string | no       | -             |
 | batch_interval_ms          | Int    | no       | 1000          |
 | batch_size                 | Int    | no       | 1000          |
 | insert_shuffle_parallelism | Int    | no       | 2             |
@@ -73,6 +74,10 @@ import ChangeLog from '../changelog/connector-hudi.md';
 
 `partition_fields` Hudi 表的分区字段.
 
+### precombine_field [string]
+
+`precombine_field` Hudi 表的预合并字段,它用于在写入前进行预合并.
+
 ### index_type [string]
 
 `index_type` Hudi 表的索引类型. 当前只支持`BLOOM`, `SIMPLE`, `GLOBAL SIMPLE`三种类型.
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
index e87cfc31c6..9653443441 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.CDC_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PRECOMBINE_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -198,6 +199,9 @@ public class HudiCatalog implements Catalog {
                     RECORD_KEY_FIELDS.key(),
                     String.join(",", tableConfig.getRecordKeyFields().get()));
         }
+        if (StringUtils.isNoneBlank(tableConfig.getPreCombineField())) {
+            options.put(PRECOMBINE_FIELD.key(), 
tableConfig.getPreCombineField());
+        }
         options.put(TABLE_TYPE.key(), tableType.name());
         options.put(CDC_ENABLED.key(), 
String.valueOf(tableConfig.isCDCEnabled()));
         return CatalogTable.of(
@@ -233,6 +237,7 @@ public class HudiCatalog implements Catalog {
                         .setPayloadClassName(HoodieAvroPayload.class.getName())
                         .setCDCEnabled(
                                 
Boolean.parseBoolean(table.getOptions().get(CDC_ENABLED.key())))
+                        
.setPreCombineField(table.getOptions().get(PRECOMBINE_FIELD.key()))
                         .initTable(new HadoopStorageConfiguration(hadoopConf), 
tablePathStr);
             }
         } catch (IOException e) {
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
index cb19ad5211..446c97c058 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
@@ -160,4 +160,10 @@ public class HudiSinkOptions {
                     .intType()
                     .defaultValue(30)
                     .withDescription("hoodie.keep.max.commits");
+
+    public static Option<String> PRECOMBINE_FIELD =
+            Options.key("precombine_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the precombine field of hudi table");
 }
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
index d5546bb871..de5b766774 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
@@ -49,6 +49,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOpti
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.MIN_COMMITS_TO_KEEP;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.OP_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PARTITION_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PRECOMBINE_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_BYTE_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_NAME;
@@ -82,6 +83,9 @@ public class HudiTableConfig implements Serializable {
     @JsonProperty("partition_fields")
     private String partitionFields;
 
+    @JsonProperty("precombine_field")
+    private String preCombineField;
+
     @JsonProperty("index_type")
     private HoodieIndex.IndexType indexType;
 
@@ -125,6 +129,7 @@ public class HudiTableConfig implements Serializable {
                             .opType(connectorConfig.get(OP_TYPE))
                             
.recordKeyFields(connectorConfig.get(RECORD_KEY_FIELDS))
                             
.partitionFields(connectorConfig.get(PARTITION_FIELDS))
+                            
.preCombineField(connectorConfig.get(PRECOMBINE_FIELD))
                             .indexType(connectorConfig.get(INDEX_TYPE))
                             
.indexClassName(connectorConfig.get(INDEX_CLASS_NAME))
                             
.recordByteSize(connectorConfig.get(RECORD_BYTE_SIZE))
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index d382e0a14a..da150a7dc9 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -115,6 +115,11 @@ public class HudiSinkFactory implements TableSinkFactory {
                 .put(
                         HudiSinkOptions.CDC_ENABLED.key(),
                         String.valueOf(hudiTableConfig.isCdcEnabled()));
+
+        catalogTable
+                .getOptions()
+                .put(HudiSinkOptions.PRECOMBINE_FIELD.key(), 
hudiTableConfig.getPreCombineField());
+
         catalogTable =
                 CatalogTable.of(
                         newTableId,
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
 
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
index d3524c85c4..f0b9c15d18 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
@@ -31,11 +31,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,7 +46,7 @@ import static 
org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TIME_
 import static 
org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TYPE;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-@Disabled
+@DisabledOnOs(OS.WINDOWS)
 class HudiCatalogTest {
     private static final String CATALOG_NAME = "seatunnel";
     private static final String CATALOG_DIR = "/tmp/seatunnel/hudi";
@@ -124,6 +125,14 @@ class HudiCatalogTest {
 
     @Test
     @Order(8)
+    void testPrecombineField() {
+        CatalogTable table = hudicatalog.getTable(tablePath);
+        CatalogTable templateTable = buildAllTypesTable(tableIdentifier);
+        Assertions.assertEquals(table.toString(), templateTable.toString());
+    }
+
+    @Test
+    @Order(9)
     void dropTable() {
         hudicatalog.dropTable(tablePath, false);
         Assertions.assertFalse(hudicatalog.tableExists(tablePath));
@@ -170,6 +179,7 @@ class HudiCatalogTest {
         options.put("record_key_fields", "id,boolean_col");
         options.put("cdc_enabled", "false");
         options.put("table_type", "MERGE_ON_READ");
+        options.put("precombine_field", "integer_col");
         return CatalogTable.of(
                 tableIdentifier, schema, options, 
Collections.singletonList("dt_col"), "null");
     }

Reply via email to