This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4609ed156b7 [MINOR] Added safety-net check to catch any potential
issue to deduce parallelism from the incoming `Dataset` appropriately (#7873)
4609ed156b7 is described below
commit 4609ed156b75f2ee9f55b56669a233bcf5a60e7e
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Feb 7 07:41:28 2023 -0800
[MINOR] Added safety-net check to catch any potential issue to deduce
parallelism from the incoming `Dataset` appropriately (#7873)
---
.../scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index a6488b07b51..e239db1b5a5 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -203,6 +203,17 @@ object HoodieDatasetBulkInsertHelper
.values
}
+ override protected def deduceShuffleParallelism(input: DataFrame,
configuredParallelism: Int): Int = {
+ val deduceParallelism = super.deduceShuffleParallelism(input,
configuredParallelism)
+ // NOTE: In case parallelism deduction failed to accurately deduce
parallelism level of the
+ // incoming dataset we fallback to default parallelism level set for
this Spark session
+ if (deduceParallelism > 0) {
+ deduceParallelism
+ } else {
+ input.sparkSession.sparkContext.defaultParallelism
+ }
+ }
+
private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig):
DataFrame = {
val partitionPathFields = getPartitionPathFields(config).toSet
val nestedPartitionPathFields = partitionPathFields.filter(f =>
f.contains('.'))