This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 68f940a04df8 [HUDI-9665] Control the write status RDD parallelism for
MDT record preparation (#13649)
68f940a04df8 is described below
commit 68f940a04df8e55df4af2bb3ca0cde6792691f5b
Author: Lin Liu <[email protected]>
AuthorDate: Tue Aug 5 09:18:58 2025 -0700
[HUDI-9665] Control the write status RDD parallelism for MDT record
preparation (#13649)
* Address comments
---
.../SparkHoodieBackedTableMetadataWriter.java | 6 +++
...ieBackedTableMetadataWriterTableVersionSix.java | 6 +++
.../hudi/common/config/HoodieMetadataConfig.java | 13 +++++
.../common/config/TestHoodieMetadataConfig.java | 62 ++++++++++++++++++++++
4 files changed, 87 insertions(+)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index dfa6f2b1514e..909d86b4d2c0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -187,6 +187,12 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
@Override
protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
+ // If a positive cap is configured and the prepped records span more partitions than it, coalesce them down to the cap (never up).
+ int parallelism =
dataWriteConfig.getMetadataConfig().getRecordPreparationParallelism();
+ if (parallelism > 0 && preppedRecordInputs.getNumPartitions() >
parallelism) {
+ preppedRecordInputs = preppedRecordInputs.coalesce(parallelism);
+ }
+
JavaRDD<WriteStatus> writeStatusJavaRDD =
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 6377d841cd26..587759829f83 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -141,6 +141,12 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
@Override
protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
+ // If a positive cap is configured and the prepped records span more partitions than it, coalesce them down to the cap (never up).
+ int parallelism =
dataWriteConfig.getMetadataConfig().getRecordPreparationParallelism();
+ if (parallelism > 0 && preppedRecordInputs.getNumPartitions() >
parallelism) {
+ preppedRecordInputs = preppedRecordInputs.coalesce(parallelism);
+ }
+
JavaRDD<WriteStatus> writeStatusJavaRDD =
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index e8d6de3d9e1f..ffac2fddc025 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -576,6 +576,15 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
+ "bloom filter row for the files in the metadata table. Only
applies if the filter "
+ "type (" + BLOOM_FILTER_TYPE.key() + " ) is
BloomFilterTypeCode.DYNAMIC_V0.");
+ public static final ConfigProperty<Integer> RECORD_PREPARATION_PARALLELISM = ConfigProperty
+ .key(METADATA_PREFIX + ".record.preparation.parallelism")
+ .defaultValue(0)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("When set to a positive number, caps the number of partitions (and hence tasks) used for metadata "
+ + "table record preparation: prepared records spread over more partitions are coalesced down to this value. "
+ + "When zero or negative (the default), the partitioning inherited from the data table write statuses is kept.");
+
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
@@ -845,6 +854,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(METADATA_FILE_CACHE_MAX_SIZE_MB);
}
+ public int getRecordPreparationParallelism() { // raw, unvalidated value (default 0); metadata writers treat <= 0 as "no cap"
+ return getIntOrDefault(RECORD_PREPARATION_PARALLELISM);
+ }
+
/**
* Checks if a specific metadata index is marked for dropping based on the
metadata configuration.
* NOTE: Only applicable for secondary indexes (SI) or expression indexes
(EI).
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
new file mode 100644
index 000000000000..eb97a4c4a345
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieMetadataConfig}.
+ */
+class TestHoodieMetadataConfig {
+
+ /**
+ * Builds a metadata config whose only explicitly supplied property is the
+ * record preparation parallelism, given as the raw string {@code rawValue}.
+ */
+ private static HoodieMetadataConfig withParallelism(String rawValue) {
+ Properties props = new Properties();
+ props.put(HoodieMetadataConfig.RECORD_PREPARATION_PARALLELISM.key(), rawValue);
+ return HoodieMetadataConfig.newBuilder()
+ .fromProperties(props)
+ .build();
+ }
+
+ @Test
+ void testGetRecordPreparationParallelism() {
+ // Nothing configured: the getter falls back to the declared default of 0,
+ // which the metadata writers interpret as "do not coalesce".
+ HoodieMetadataConfig defaultConfig = HoodieMetadataConfig.newBuilder().build();
+ assertEquals(0, defaultConfig.getRecordPreparationParallelism());
+
+ // A positive value is returned exactly as configured.
+ assertEquals(100, withParallelism("100").getRecordPreparationParallelism());
+
+ // An explicit zero is returned as zero.
+ assertEquals(0, withParallelism("0").getRecordPreparationParallelism());
+
+ // Negative values are neither rejected nor clamped by the getter; the raw
+ // value comes back unchanged and callers treat it like zero.
+ assertEquals(-50, withParallelism("-50").getRecordPreparationParallelism());
+ }
+}