This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b711e39c9400c8595733d57e889eecb0e88a4b99
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Feb 7 07:41:28 2023 -0800

    [MINOR] Added safety-net check to catch any potential issues in deducing
    parallelism from the incoming `Dataset` appropriately (#7873)
---
 .../scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index a6488b07b51..e239db1b5a5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -203,6 +203,17 @@ object HoodieDatasetBulkInsertHelper
       .values
   }
 
+  override protected def deduceShuffleParallelism(input: DataFrame, configuredParallelism: Int): Int = {
+    val deduceParallelism = super.deduceShuffleParallelism(input, configuredParallelism)
+    // NOTE: In case parallelism deduction fails to accurately deduce the parallelism level of the
+    //       incoming dataset, we fall back to the default parallelism level set for this Spark session
+    if (deduceParallelism > 0) {
+      deduceParallelism
+    } else {
+      input.sparkSession.sparkContext.defaultParallelism
+    }
+  }
+
   private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
     val partitionPathFields = getPartitionPathFields(config).toSet
     val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))

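For context, the pattern in this patch is a guard around a deduced value: if the superclass's deduction returns a non-positive partition count, the writer falls back to the Spark session's `defaultParallelism`. Below is a minimal standalone sketch of the same idea; the `deduceShuffleParallelism` override above is the actual code, while the `fallbackParallelism` helper here is hypothetical and shown only to illustrate the safety-net:

    import org.apache.spark.sql.DataFrame

    // Hypothetical helper mirroring the safety-net in the patch: prefer the
    // deduced parallelism, but never return a non-positive partition count,
    // since a shuffle repartitioned to 0 (or fewer) partitions is invalid.
    def fallbackParallelism(input: DataFrame, deduced: Int): Int =
      if (deduced > 0) deduced
      else input.sparkSession.sparkContext.defaultParallelism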