This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f134d7e129 [Improve][Connector-Hudi] Add pre-combine field option for
hudi sink (#9496)
f134d7e129 is described below
commit f134d7e129a5c2b26435ffdfe23106bdc5e79125
Author: misi <[email protected]>
AuthorDate: Sun Jul 20 22:21:40 2025 +0800
[Improve][Connector-Hudi] Add pre-combine field option for hudi sink (#9496)
---
docs/en/connector-v2/sink/Hudi.md | 5 +++++
docs/zh/connector-v2/sink/Hudi.md | 5 +++++
.../connectors/seatunnel/hudi/catalog/HudiCatalog.java | 5 +++++
.../connectors/seatunnel/hudi/config/HudiSinkOptions.java | 6 ++++++
.../connectors/seatunnel/hudi/config/HudiTableConfig.java | 5 +++++
.../connectors/seatunnel/hudi/sink/HudiSinkFactory.java | 5 +++++
.../connectors/seatunnel/hudi/catalog/HudiCatalogTest.java | 14 ++++++++++++--
7 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hudi.md
b/docs/en/connector-v2/sink/Hudi.md
index f491fccb25..c0962448c7 100644
--- a/docs/en/connector-v2/sink/Hudi.md
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -36,6 +36,7 @@ Table list configuration:
| op_type | enum | no | insert |
| record_key_fields | string | no | - |
| partition_fields | string | no | - |
+| precombine_field | string | no | - |
| batch_interval_ms | Int | no | 1000 |
| batch_size | Int | no | 1000 |
| insert_shuffle_parallelism | Int | no | 2 |
@@ -73,6 +74,10 @@ Note: When this configuration corresponds to a single table,
you can flatten the
`partition_fields` The partition key fields of hudi table, its are used to
generate partition.
+### precombine_field [string]
+
+`precombine_field` The precombine field of the Hudi table; it is used for
pre-combining records before the actual write.
+
### index_type [string]
`index_type` The index type of hudi table. Currently, `BLOOM`, `SIMPLE`, and
`GLOBAL SIMPLE` are supported.
diff --git a/docs/zh/connector-v2/sink/Hudi.md
b/docs/zh/connector-v2/sink/Hudi.md
index 4052abd979..9488a9ac15 100644
--- a/docs/zh/connector-v2/sink/Hudi.md
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -36,6 +36,7 @@ import ChangeLog from '../changelog/connector-hudi.md';
| op_type | enum | no | insert |
| record_key_fields | string | no | - |
| partition_fields | string | no | - |
+| precombine_field | string | no | - |
| batch_interval_ms | Int | no | 1000 |
| batch_size | Int | no | 1000 |
| insert_shuffle_parallelism | Int | no | 2 |
@@ -73,6 +74,10 @@ import ChangeLog from '../changelog/connector-hudi.md';
`partition_fields` Hudi 表的分区字段.
+### precombine_field [string]
+
+`precombine_field` Hudi 表的预合并字段,它用于在写入前进行预合并.
+
### index_type [string]
`index_type` Hudi 表的索引类型. 当前只支持`BLOOM`, `SIMPLE`, `GLOBAL SIMPLE`三种类型.
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
index e87cfc31c6..9653443441 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
import static
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.CDC_ENABLED;
+import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PRECOMBINE_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -198,6 +199,9 @@ public class HudiCatalog implements Catalog {
RECORD_KEY_FIELDS.key(),
String.join(",", tableConfig.getRecordKeyFields().get()));
}
+ if (StringUtils.isNoneBlank(tableConfig.getPreCombineField())) {
+ options.put(PRECOMBINE_FIELD.key(),
tableConfig.getPreCombineField());
+ }
options.put(TABLE_TYPE.key(), tableType.name());
options.put(CDC_ENABLED.key(),
String.valueOf(tableConfig.isCDCEnabled()));
return CatalogTable.of(
@@ -233,6 +237,7 @@ public class HudiCatalog implements Catalog {
.setPayloadClassName(HoodieAvroPayload.class.getName())
.setCDCEnabled(
Boolean.parseBoolean(table.getOptions().get(CDC_ENABLED.key())))
+
.setPreCombineField(table.getOptions().get(PRECOMBINE_FIELD.key()))
.initTable(new HadoopStorageConfiguration(hadoopConf),
tablePathStr);
}
} catch (IOException e) {
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
index cb19ad5211..446c97c058 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java
@@ -160,4 +160,10 @@ public class HudiSinkOptions {
.intType()
.defaultValue(30)
.withDescription("hoodie.keep.max.commits");
+
+ public static Option<String> PRECOMBINE_FIELD =
+ Options.key("precombine_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the precombine field of hudi table");
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
index d5546bb871..de5b766774 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java
@@ -49,6 +49,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOpti
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.MIN_COMMITS_TO_KEEP;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.OP_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PARTITION_FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PRECOMBINE_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_BYTE_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_NAME;
@@ -82,6 +83,9 @@ public class HudiTableConfig implements Serializable {
@JsonProperty("partition_fields")
private String partitionFields;
+ @JsonProperty("precombine_field")
+ private String preCombineField;
+
@JsonProperty("index_type")
private HoodieIndex.IndexType indexType;
@@ -125,6 +129,7 @@ public class HudiTableConfig implements Serializable {
.opType(connectorConfig.get(OP_TYPE))
.recordKeyFields(connectorConfig.get(RECORD_KEY_FIELDS))
.partitionFields(connectorConfig.get(PARTITION_FIELDS))
+
.preCombineField(connectorConfig.get(PRECOMBINE_FIELD))
.indexType(connectorConfig.get(INDEX_TYPE))
.indexClassName(connectorConfig.get(INDEX_CLASS_NAME))
.recordByteSize(connectorConfig.get(RECORD_BYTE_SIZE))
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
index d382e0a14a..da150a7dc9 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -115,6 +115,11 @@ public class HudiSinkFactory implements TableSinkFactory {
.put(
HudiSinkOptions.CDC_ENABLED.key(),
String.valueOf(hudiTableConfig.isCdcEnabled()));
+
+ catalogTable
+ .getOptions()
+ .put(HudiSinkOptions.PRECOMBINE_FIELD.key(),
hudiTableConfig.getPreCombineField());
+
catalogTable =
CatalogTable.of(
newTableId,
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
index d3524c85c4..f0b9c15d18 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogTest.java
@@ -31,11 +31,12 @@ import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +46,7 @@ import static
org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TIME_
import static
org.apache.seatunnel.api.table.type.LocalTimeType.LOCAL_DATE_TYPE;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-@Disabled
+@DisabledOnOs(OS.WINDOWS)
class HudiCatalogTest {
private static final String CATALOG_NAME = "seatunnel";
private static final String CATALOG_DIR = "/tmp/seatunnel/hudi";
@@ -124,6 +125,14 @@ class HudiCatalogTest {
@Test
@Order(8)
+ void testPrecombineField() {
+ CatalogTable table = hudicatalog.getTable(tablePath);
+ CatalogTable templateTable = buildAllTypesTable(tableIdentifier);
+ Assertions.assertEquals(table.toString(), templateTable.toString());
+ }
+
+ @Test
+ @Order(9)
void dropTable() {
hudicatalog.dropTable(tablePath, false);
Assertions.assertFalse(hudicatalog.tableExists(tablePath));
@@ -170,6 +179,7 @@ class HudiCatalogTest {
options.put("record_key_fields", "id,boolean_col");
options.put("cdc_enabled", "false");
options.put("table_type", "MERGE_ON_READ");
+ options.put("precombine_field", "integer_col");
return CatalogTable.of(
tableIdentifier, schema, options,
Collections.singletonList("dt_col"), "null");
}