This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 68f940a04df8 [HUDI-9665] Control the write status RDD parallelism for 
MDT record preparation (#13649)
68f940a04df8 is described below

commit 68f940a04df8e55df4af2bb3ca0cde6792691f5b
Author: Lin Liu <[email protected]>
AuthorDate: Tue Aug 5 09:18:58 2025 -0700

    [HUDI-9665] Control the write status RDD parallelism for MDT record 
preparation (#13649)
    
    
    
    * Address comments
---
 .../SparkHoodieBackedTableMetadataWriter.java      |  6 +++
 ...ieBackedTableMetadataWriterTableVersionSix.java |  6 +++
 .../hudi/common/config/HoodieMetadataConfig.java   | 13 +++++
 .../common/config/TestHoodieMetadataConfig.java    | 62 ++++++++++++++++++++++
 4 files changed, 87 insertions(+)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index dfa6f2b1514e..909d86b4d2c0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -187,6 +187,12 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   @Override
   protected void upsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String 
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
+    // When specified, reduce the parallelism of the input record RDD to improve 
write performance.
+    int parallelism = 
dataWriteConfig.getMetadataConfig().getRecordPreparationParallelism();
+    if (parallelism > 0 && preppedRecordInputs.getNumPartitions() > 
parallelism) {
+      preppedRecordInputs = preppedRecordInputs.coalesce(parallelism);
+    }
+
     JavaRDD<WriteStatus> writeStatusJavaRDD = 
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 6377d841cd26..587759829f83 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -141,6 +141,12 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
 
   @Override
   protected void upsertAndCommit(BaseHoodieWriteClient<?, 
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String 
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
+    // When specified, reduce the parallelism of the input record RDD to improve 
write performance.
+    int parallelism = 
dataWriteConfig.getMetadataConfig().getRecordPreparationParallelism();
+    if (parallelism > 0 && preppedRecordInputs.getNumPartitions() > 
parallelism) {
+      preppedRecordInputs = preppedRecordInputs.coalesce(parallelism);
+    }
+
     JavaRDD<WriteStatus> writeStatusJavaRDD = 
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), 
DELTA_COMMIT_ACTION, Collections.emptyMap());
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index e8d6de3d9e1f..ffac2fddc025 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -576,6 +576,15 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
           + "bloom filter row for the files in the metadata table. Only 
applies if the filter "
           + "type (" + BLOOM_FILTER_TYPE.key() + " ) is 
BloomFilterTypeCode.DYNAMIC_V0.");
 
+  public static final ConfigProperty<Integer> RECORD_PREPARATION_PARALLELISM = 
ConfigProperty
+      .key(METADATA_PREFIX + ".record.preparation.parallelism")
+      .defaultValue(0)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("When set to a positive number, metadata table record 
preparation stages "
+          + "honor the set value for the number of tasks. If not, the number of write 
statuses from data "
+          + "table writes will be used for metadata table record preparation.");
+
   public long getMaxLogFileSize() {
     return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
   }
@@ -845,6 +854,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getInt(METADATA_FILE_CACHE_MAX_SIZE_MB);
   }
 
+  public int getRecordPreparationParallelism() {
+    return getIntOrDefault(RECORD_PREPARATION_PARALLELISM);
+  }
+
   /**
    * Checks if a specific metadata index is marked for dropping based on the 
metadata configuration.
    * NOTE: Only applicable for secondary indexes (SI) or expression indexes 
(EI).
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
new file mode 100644
index 000000000000..eb97a4c4a345
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieMetadataConfig}.
+ */
+class TestHoodieMetadataConfig {
+  @Test
+  void testGetRecordPreparationParallelism() {
+    // Test default value
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
+    assertEquals(0, config.getRecordPreparationParallelism());
+
+    // Test custom value
+    Properties props = new Properties();
+    props.put(HoodieMetadataConfig.RECORD_PREPARATION_PARALLELISM.key(), 
"100");
+    HoodieMetadataConfig configWithCustomValue = 
HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+    assertEquals(100, configWithCustomValue.getRecordPreparationParallelism());
+
+    // Test zero value
+    Properties propsZero = new Properties();
+    propsZero.put(HoodieMetadataConfig.RECORD_PREPARATION_PARALLELISM.key(), 
"0");
+    HoodieMetadataConfig configWithZeroValue = 
HoodieMetadataConfig.newBuilder()
+        .fromProperties(propsZero)
+        .build();
+    assertEquals(0, configWithZeroValue.getRecordPreparationParallelism());
+
+    // Test negative value
+    Properties propsNegative = new Properties();
+    
propsNegative.put(HoodieMetadataConfig.RECORD_PREPARATION_PARALLELISM.key(), 
"-50");
+    HoodieMetadataConfig configWithNegativeValue = 
HoodieMetadataConfig.newBuilder()
+        .fromProperties(propsNegative)
+        .build();
+    assertEquals(-50, 
configWithNegativeValue.getRecordPreparationParallelism());
+  }
+}

Reply via email to