This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b711e39c9400c8595733d57e889eecb0e88a4b99
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Feb 7 07:41:28 2023 -0800

    [MINOR] Added safety-net check to catch any potential issue to deduce parallelism from the incoming `Dataset` appropriately (#7873)
---
 .../scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index a6488b07b51..e239db1b5a5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -203,6 +203,17 @@ object HoodieDatasetBulkInsertHelper
       .values
   }
 
+  override protected def deduceShuffleParallelism(input: DataFrame, configuredParallelism: Int): Int = {
+    val deduceParallelism = super.deduceShuffleParallelism(input, configuredParallelism)
+    // NOTE: In case parallelism deduction failed to accurately deduce parallelism level of the
+    //       incoming dataset we fallback to default parallelism level set for this Spark session
+    if (deduceParallelism > 0) {
+      deduceParallelism
+    } else {
+      input.sparkSession.sparkContext.defaultParallelism
+    }
+  }
+
   private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
     val partitionPathFields = getPartitionPathFields(config).toSet
     val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
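For context, the change boils down to a simple fallback pattern: prefer the parallelism deduced from the incoming Dataset, and only if that deduction yields a non-positive value fall back to the Spark session's default parallelism. The standalone sketch below illustrates that pattern; it is not part of the commit, the object name is hypothetical, and the deduction step is simplified to the configured value or the incoming DataFrame's partition count.

    // Illustrative sketch only, not the committed code: object/method names here
    // are hypothetical and the deduction step is deliberately simplified.
    import org.apache.spark.sql.DataFrame

    object ParallelismFallbackSketch {
      def deduceShuffleParallelism(input: DataFrame, configuredParallelism: Int): Int = {
        // Simplified "deduction": honor an explicitly configured parallelism,
        // otherwise use the number of partitions of the incoming DataFrame.
        val deduced =
          if (configuredParallelism > 0) configuredParallelism
          else input.rdd.getNumPartitions

        // Safety net mirroring the commit: if deduction still produced a
        // non-positive value, fall back to the session's default parallelism.
        if (deduced > 0) deduced
        else input.sparkSession.sparkContext.defaultParallelism
      }
    }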
