This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 9f095b71ca2f [SPARK-46609][SQL] Avoid exponential explosion in PartitioningPreservingUnaryExecNode 9f095b71ca2f is described below commit 9f095b71ca2fab7211f84fbf3a16d2f9ffb3d957 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Fri Jan 5 11:23:23 2024 -0800 [SPARK-46609][SQL] Avoid exponential explosion in PartitioningPreservingUnaryExecNode ### What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/37525. When expanding the output partitioning/ordering with aliases, we have a threshold to avoid exponential explosion. However, we failed to apply this threshold in one place: the alias candidates generated for each individual child partitioning were capped, but the combined list produced by flattening all child partitionings was not. This PR fixes it. ### Why are the changes needed? To avoid out-of-memory (OOM) errors caused by an unbounded number of partitioning candidates. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44614 from cloud-fan/oom. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit f8115da1a2bb33e6344dd69cc38ca7a68c3654b1) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/execution/AliasAwareOutputExpression.scala | 4 +-- .../ProjectedOrderingAndPartitioningSuite.scala | 30 ++++++++++++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index e1dcab80af30..428fe65501fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -30,7 +30,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { val 
partitionings: Seq[Partitioning] = if (hasAlias) { - flattenPartitioning(child.outputPartitioning).flatMap { + flattenPartitioning(child.outputPartitioning).iterator.flatMap { case e: Expression => // We need unique partitionings but if the input partitioning is // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after @@ -44,7 +44,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode .take(aliasCandidateLimit) .asInstanceOf[Stream[Partitioning]] case o => Seq(o) - } + }.take(aliasCandidateLimit).toSeq } else { // Filter valid partitiongs (only reference output attributes of the current plan node) val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index f5839e997560..ec13d48d45f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StringType class ProjectedOrderingAndPartitioningSuite extends 
SharedSparkSession with AdaptiveSparkPlanHelper { @@ -101,6 +104,22 @@ class ProjectedOrderingAndPartitioningSuite } } + test("SPARK-46609: Avoid exponential explosion in PartitioningPreservingUnaryExecNode") { + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> "2") { + val output = Seq(AttributeReference("a", StringType)(), AttributeReference("b", StringType)()) + val plan = ProjectExec( + Seq( + Alias(output(0), "a1")(), + Alias(output(0), "a2")(), + Alias(output(1), "b1")(), + Alias(output(1), "b2")() + ), + DummyLeafPlanExec(output) + ) + assert(plan.outputPartitioning.asInstanceOf[PartitioningCollection].partitionings.length == 2) + } + } + test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " + "expressions") { val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as a", "id + id as b") @@ -192,3 +211,10 @@ class ProjectedOrderingAndPartitioningSuite assert(outputOrdering.head.sameOrderExpressions.size == 0) } } + +private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = null + override def outputPartitioning: Partitioning = { + PartitioningCollection(output.map(attr => HashPartitioning(Seq(attr), 4))) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org