This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a08bdc3f97 [HUDI-5363] Removing default value for shuffle parallelism configs (#7723)
3a08bdc3f97 is described below

commit 3a08bdc3f971b3534e8fb6f34772340cfdf055a9
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Jan 25 19:29:42 2023 -0800

    [HUDI-5363] Removing default value for shuffle parallelism configs (#7723)
    
    Currently, we always override the parallelism (which translates into the number of partitions in Spark, for example) of the incoming dataset, regardless of whether the user requested it:
    
     1. If the user specified shuffle parallelism explicitly, we'd use it to override the original one.
     2. If the user did NOT specify shuffle parallelism, we'd use the default value of 200.
    
    The second case is problematic: we're blindly overriding the parallelism deduced by Spark (determined based on the source of the data), replacing it with a _static_ value that has nothing to do with the data itself.
    
    Instead, we should only override the parallelism when the corresponding configuration has been explicitly provided by the user.
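    
    For illustration, a minimal sketch (not part of this change) of how a caller now opts in explicitly; it assumes the standard HoodieWriteConfig builder methods, and the base path and table name are hypothetical:
    
        // Parallelism is only overridden when set explicitly by the user:
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath(basePath)                 // hypothetical base path
            .forTable("my_table")               // hypothetical table name
            .withParallelism(1500, 1500)        // insert / upsert shuffle parallelism
            .withBulkInsertParallelism(1500)    // bulk_insert shuffle parallelism
            .withDeleteParallelism(1500)        // delete shuffle parallelism
            .build();
    
        // If these configs are left unset (they now default to "0"), the write helpers
        // keep the partitioning of the incoming dataset instead of repartitioning it
        // to the previous static default of 200.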
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 10 +++---
 .../table/action/commit/BaseBulkInsertHelper.java  |  7 +++-
 .../hudi/table/action/commit/BaseDeleteHelper.java |  7 +++-
 .../hudi/table/action/commit/BaseWriteHelper.java  | 14 ++++++--
 .../table/action/commit/HoodieDeleteHelper.java    | 35 ++++++++++--------
 .../table/action/commit/HoodieWriteHelper.java     |  3 +-
 .../table/action/commit/ParallelismHelper.java     | 42 ++++++++++++++++++++++
 .../table/action/commit/FlinkDeleteHelper.java     |  1 +
 .../hudi/table/action/commit/FlinkWriteHelper.java |  3 +-
 .../table/action/commit/JavaBulkInsertHelper.java  | 12 ++++---
 .../hudi/table/action/commit/JavaDeleteHelper.java |  1 +
 .../hudi/table/action/commit/JavaWriteHelper.java  |  1 +
 .../table/action/commit/SparkBulkInsertHelper.java | 24 ++++++++-----
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 16 ++++++---
 .../scala/org/apache/hudi/util/JFunction.scala     |  7 +++-
 .../org/apache/spark/sql/HoodieUnsafeUtils.scala   | 11 ++++++
 16 files changed, 148 insertions(+), 46 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fcee1b4b0d6..2f36aa725e3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -228,12 +228,12 @@ public class HoodieWriteConfig extends HoodieConfig {
 
  public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.insert.shuffle.parallelism")
-      .defaultValue("200")
+      .defaultValue("0")
       .withDocumentation("Parallelism for inserting records into the table. 
Inserts can shuffle data before writing to tune file sizes and optimize the 
storage layout.");
 
  public static final ConfigProperty<String> BULKINSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.bulkinsert.shuffle.parallelism")
-      .defaultValue("200")
+      .defaultValue("0")
       .withDocumentation("For large initial imports using bulk_insert 
operation, controls the parallelism to use for sort modes or custom 
partitioning done"
           + "before writing records to the table.");
 
@@ -252,13 +252,13 @@ public class HoodieWriteConfig extends HoodieConfig {
 
  public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.upsert.shuffle.parallelism")
-      .defaultValue("200")
+      .defaultValue("0")
       .withDocumentation("Parallelism to use for upsert operation on the 
table. Upserts can shuffle data to perform index lookups, file sizing, bin 
packing records optimally"
           + "into file groups.");
 
  public static final ConfigProperty<String> DELETE_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.delete.shuffle.parallelism")
-      .defaultValue("200")
+      .defaultValue("0")
       .withDocumentation("Parallelism used for “delete” operation. Delete 
operations also performs shuffles, similar to upsert operation.");
 
  public static final ConfigProperty<String> ROLLBACK_PARALLELISM_VALUE = ConfigProperty
@@ -1156,7 +1156,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getDeleteShuffleParallelism() {
-    return Math.max(getInt(DELETE_PARALLELISM_VALUE), 1);
+    return getInt(DELETE_PARALLELISM_VALUE);
   }
 
   public int getRollbackParallelism() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index b5599385679..451feab4e23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.WriteHandleFactory;
@@ -25,7 +26,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-public abstract class BaseBulkInsertHelper<T, I, K, O, R> {
+public abstract class BaseBulkInsertHelper<T, I, K, O, R> extends ParallelismHelper<I> {
+
+  protected BaseBulkInsertHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
+    super(partitionNumberExtractor);
+  }
 
   /**
   * Mark instant as inflight, write input records, update index and return result.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
index ceeb2aeb70d..527320a6b6a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -28,7 +29,11 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
  *
  * @param <T>
  */
-public abstract class BaseDeleteHelper<T, I, K, O, R> {
+public abstract class BaseDeleteHelper<T, I, K, O, R> extends ParallelismHelper<I> {
+
+  protected BaseDeleteHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
+    super(partitionNumberExtractor);
+  }
 
   /**
    * Deduplicate Hoodie records, using the given deduplication function.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index adef1c44591..6e1c1f8d9ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -33,20 +34,27 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
 
-public abstract class BaseWriteHelper<T, I, K, O, R> {
+public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> {
+
+  protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
+    super(partitionNumberExtractor);
+  }
 
   public HoodieWriteMetadata<O> write(String instantTime,
                                       I inputRecords,
                                       HoodieEngineContext context,
                                       HoodieTable<T, I, K, O> table,
                                       boolean shouldCombine,
-                                      int shuffleParallelism,
+                                      int configuredShuffleParallelism,
                                      BaseCommitActionExecutor<T, I, K, O, R> executor,
                                       WriteOperationType operationType) {
     try {
+      int targetParallelism =
+          deduceShuffleParallelism(inputRecords, configuredShuffleParallelism);
+
       // De-dupe/merge if needed
       I dedupedRecords =
-          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
+          combineOnCondition(shouldCombine, inputRecords, targetParallelism, table);
 
       Instant lookupBegin = Instant.now();
       I taggedRecords = dedupedRecords;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 0d212555ab2..917c1858b45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -48,7 +48,9 @@ import java.util.HashMap;
 @SuppressWarnings("checkstyle:LineLength")
 public class HoodieDeleteHelper<T, R> extends
    BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+
   private HoodieDeleteHelper() {
+    super(HoodieData::getNumPartitions);
   }
 
   private static class DeleteHelperHolder {
@@ -77,24 +79,18 @@ public class HoodieDeleteHelper<T, R> extends
                                                              HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                                              BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
     try {
-      HoodieData<HoodieKey> dedupedKeys = keys;
-      final int parallelism = config.getDeleteShuffleParallelism();
-      if (config.shouldCombineBeforeDelete()) {
-        // De-dupe/merge if needed
-        dedupedKeys = deduplicateKeys(keys, table, parallelism);
-      } else if (!keys.isEmpty()) {
-        dedupedKeys = keys.repartition(parallelism);
-      }
+      int targetParallelism =
+          deduceShuffleParallelism((HoodieData) keys, config.getDeleteShuffleParallelism());
 
-      HoodieData dedupedRecords;
-      HoodieRecordType recordType = config.getRecordMerger().getRecordType();
-      if (recordType == HoodieRecordType.AVRO) {
-        // For BWC, will remove when HoodieRecordPayload removed
-        dedupedRecords =
-            dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      HoodieData<HoodieKey> dedupedKeys;
+      if (config.shouldCombineBeforeDelete()) {
+        dedupedKeys = deduplicateKeys(keys, table, targetParallelism);
       } else {
-        dedupedRecords = dedupedKeys.map(key -> new HoodieEmptyRecord<>(key, recordType));
+        dedupedKeys = keys.repartition(targetParallelism);
       }
+
+      HoodieData dedupedRecords = createPhonyRecords(config, dedupedKeys);
+
       Instant beginTag = Instant.now();
       // perform index loop up to get existing location of records
      HoodieData<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
@@ -122,4 +118,13 @@ public class HoodieDeleteHelper<T, R> extends
     }
   }
 
+  private static HoodieData createPhonyRecords(HoodieWriteConfig config, HoodieData<HoodieKey> keys) {
+    HoodieRecordType recordType = config.getRecordMerger().getRecordType();
+    if (recordType == HoodieRecordType.AVRO) {
+      return keys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+    } else {
+      return keys.map(key -> new HoodieEmptyRecord<>(key, recordType));
+    }
+  }
+
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 6557f83b241..b8e761acb02 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -35,7 +35,9 @@ import java.io.IOException;
 
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+
   private HoodieWriteHelper() {
+    super(HoodieData::getNumPartitions);
   }
 
   private static class WriteHelperHolder {
@@ -79,5 +81,4 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       return reducedRecord.newInstance(reducedKey);
     }, reduceParallelism).map(Pair::getRight);
   }
-
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java
new file mode 100644
index 00000000000..20ec4b61fc0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+
+public abstract class ParallelismHelper<I> {
+
+  private final SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor;
+
+  protected ParallelismHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) {
+    this.partitionNumberExtractor = partitionNumberExtractor;
+  }
+
+  protected int deduceShuffleParallelism(I input, int configuredParallelism) {
+    // NOTE: In case parallelism was configured by the user we will always
+    //       honor that setting.
+    //       Otherwise, we'd keep parallelism of the incoming dataset
+    if (configuredParallelism > 0) {
+      return configuredParallelism;
+    }
+
+    return partitionNumberExtractor.apply(input);
+  }
+
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index f6b172e125b..a8e2937ed3b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -50,6 +50,7 @@ public class FlinkDeleteHelper<R> extends
    BaseDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
 
   private FlinkDeleteHelper() {
+    super(ignored -> -1);
   }
 
   private static class DeleteHelperHolder {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 88554576849..6cdb17715de 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -55,6 +55,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private FlinkWriteHelper() {
+    super(ignored -> -1);
   }
 
   private static class WriteHelperHolder {
@@ -67,7 +68,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
-                                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int shuffleParallelism,
+                                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int configuredShuffleParallelism,
                                                       BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, WriteOperationType operationType) {
     try {
       Instant lookupBegin = Instant.now();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 0c76ea168a7..c10b7d1752d 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -48,6 +48,7 @@ public class JavaBulkInsertHelper<T, R> extends BaseBulkInsertHelper<T, List<Hoo
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private JavaBulkInsertHelper() {
+    super(ignored -> -1);
   }
 
   private static class BulkInsertHelperHolder {
@@ -94,18 +95,21 @@ public class JavaBulkInsertHelper<T, R> extends BaseBulkInsertHelper<T, List<Hoo
                                       boolean performDedupe,
                                       BulkInsertPartitioner partitioner,
                                       boolean useWriterSchema,
-                                      int parallelism,
+                                      int configuredParallelism,
                                       WriteHandleFactory writeHandleFactory) {
 
     // De-dupe/merge if needed
     List<HoodieRecord<T>> dedupedRecords = inputRecords;
 
+    int targetParallelism = deduceShuffleParallelism(inputRecords, configuredParallelism);
+
     if (performDedupe) {
-      dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table);
+      dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance()
+          .combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, targetParallelism, table);
     }
 
-    final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+    final List<HoodieRecord<T>> repartitionedRecords =
+        (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, targetParallelism);
 
     FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
         config.getFileIdPrefixProviderClassName(),
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
index 57d796c9252..4cee88f6bf4 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -47,6 +47,7 @@ public class JavaDeleteHelper<R> extends
    BaseDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
 
   private JavaDeleteHelper() {
+    super(ignored -> -1);
   }
 
   private static class DeleteHelperHolder {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index dc109f81030..f3046bf22e2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -41,6 +41,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private JavaWriteHelper() {
+    super(ignored -> -1);
   }
 
   private static class WriteHelperHolder {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index 51407eb026d..fc4b8bf1006 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -48,6 +48,7 @@ public class SparkBulkInsertHelper<T, R> extends BaseBulkInsertHelper<T, HoodieD
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
 
   private SparkBulkInsertHelper() {
+    super(HoodieData::getNumPartitions);
   }
 
   private static class BulkInsertHelperHolder {
@@ -68,17 +69,19 @@ public class SparkBulkInsertHelper<T, R> extends BaseBulkInsertHelper<T, HoodieD
                                                                 final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
-    //transition bulk_insert state to inflight
+    // Transition bulk_insert state to inflight
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
             executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
 
     BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(table, config));
 
-    // write new files
-    HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
-        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
-    //update index
+    // Write new files
+    HoodieData<WriteStatus> writeStatuses =
+        bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+            config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+
+    // Update index
     ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
   }
@@ -105,19 +108,22 @@ public class SparkBulkInsertHelper<T, R> extends BaseBulkInsertHelper<T, HoodieD
                                             boolean performDedupe,
                                             BulkInsertPartitioner partitioner,
                                             boolean useWriterSchema,
-                                            int parallelism,
+                                            int configuredParallelism,
                                             WriteHandleFactory writeHandleFactory) {
 
     // De-dupe/merge if needed
     HoodieData<HoodieRecord<T>> dedupedRecords = inputRecords;
 
+    int targetParallelism = deduceShuffleParallelism(inputRecords, configuredParallelism);
+
     if (performDedupe) {
-      dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
-          parallelism, table);
+      dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance()
+          .combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, targetParallelism, table);
     }
 
     // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
-    final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
+    final HoodieData<HoodieRecord<T>> repartitionedRecords =
+        HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), targetParallelism));
 
     JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
         .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index ffd2d6071d1..238fed526a1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -22,26 +22,29 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.engine.TaskContextSupplier
-import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.index.SparkHoodieIndexFactory
 import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
+import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ParallelismHelper}
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
-import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper
+import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
 
-object HoodieDatasetBulkInsertHelper extends Logging {
+object HoodieDatasetBulkInsertHelper
+  extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getOutputPartitioning(df).numPartitions)) with Logging {
 
   /**
   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
@@ -115,7 +118,10 @@ object HoodieDatasetBulkInsertHelper extends Logging {
       updatedDF
     }
 
-    partitioner.repartitionRecords(trimmedDF, config.getBulkInsertShuffleParallelism)
+    val targetParallelism =
+      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+
+    partitioner.repartitionRecords(trimmedDF, targetParallelism)
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
index 0c379c27dfc..3517d641448 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi.util
 
-import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction}
+import org.apache.hudi.common.function.{SerializableFunction, SerializableFunctionUnchecked, SerializablePairFunction}
 import org.apache.hudi.common.util.collection
 
 import scala.language.implicitConversions
@@ -53,6 +53,11 @@ object JFunction {
       override def apply(t: T): R = f.apply(t)
     }
 
+  implicit def toJavaSerializableFunctionUnchecked[T, R](f: Function[T, R]): SerializableFunctionUnchecked[T, R] =
+    new SerializableFunctionUnchecked[T, R] {
+      override def apply(t: T): R = f.apply(t)
+    }
+
   implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
     new SerializablePairFunction[T, K, V] {
       override def call(t: T): collection.Pair[K, V] = f.apply(t)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index c981cd8113c..dfd416b6f52 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieUnsafeRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 
@@ -30,6 +31,16 @@ import org.apache.spark.util.MutablePair
  */
 object HoodieUnsafeUtils {
 
+  /**
+   * Fetches expected output [[Partitioning]] of the provided [[DataFrame]]
+   *
+   * NOTE: Invoking [[QueryExecution#executedPlan]] wouldn't actually execute the query (ie start pumping the data)
+   *       but instead will just execute Spark resolution, optimization and actual execution planning stages
+   *       returning instance of [[SparkPlan]] ready for execution
+   */
+  def getOutputPartitioning(df: DataFrame): Partitioning =
+    df.queryExecution.executedPlan.outputPartitioning
+
   /**
    * Creates [[DataFrame]] from provided [[plan]]
    *
