This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4771638b3a68 [SPARK-50229] Reduce memory usage on driver for wide schemas by reducing the lifetime of AttributeReference objects created during logical planning
4771638b3a68 is described below
commit 4771638b3a6873e329bb3bdde5abeeeac059acb8
Author: Utkarsh <[email protected]>
AuthorDate: Tue Nov 5 20:01:21 2024 -0800
[SPARK-50229] Reduce memory usage on driver for wide schemas by reducing the lifetime of AttributeReference objects created during logical planning
### What changes were proposed in this pull request?
This PR changes `allAttributes` in `QueryPlan` from a `lazy val` to a `def`, so the `QueryPlan` object no longer holds on to a copy of the `AttributeReference`s. This should not incur a performance penalty, as `allAttributes` is used only once, during canonicalization in `doCanonicalize` (which itself runs only once, because `canonicalized` is a `lazy val`).
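The retention difference can be sketched with a toy example (the `Node`-like classes below are hypothetical stand-ins, not Spark's actual `QueryPlan`):

```scala
// Toy sketch of the lazy-val-vs-def trade-off; these classes are
// hypothetical and only illustrate object retention, not Spark internals.
object LazyValVsDef {
  final case class Attr(name: String)

  class WithLazyVal(children: Seq[Seq[Attr]]) {
    // Computed once, then the concatenated Seq stays reachable for as
    // long as this node lives, even if it is never read again.
    lazy val allAttributes: Seq[Attr] = children.flatten
  }

  class WithDef(children: Seq[Seq[Attr]]) {
    // Recomputed on each call; the result is garbage-collectable as soon
    // as the caller drops it. Fine when it is only needed once.
    def allAttributes: Seq[Attr] = children.flatten
  }

  def main(args: Array[String]): Unit = {
    val wide = Seq.fill(3)(Seq.tabulate(100)(i => Attr(s"c$i")))
    val node = new WithDef(wide)
    // Mirror the PR's change in doCanonicalize: bind the result to a
    // local once for the duration of the method body, then drop it.
    val allAttributesSeq = node.allAttributes
    println(allAttributesSeq.length)
  }
}
```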
### Context
The `allAttributes` method in `QueryPlan` ([code](https://github.com/apache/spark/blob/57f6824e78e2e615778827ddebce9d7fcaae1698/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L2101)) unions the output of all of its children. Although this is fine in an optimized plan, in a pre-optimized analyzed plan these attributes add up multiplicatively with the size of the plan. Because `output` is usually defined as a `def`, each node's `allAttributes` [...]
Here is a simple example with TPC-DS Q42. SQL:
```
select dt.d_year, item.i_category_id, item.i_category,
sum(ss_ext_sales_price)
from date_dim dt, store_sales, item
where dt.d_date_sk = store_sales.ss_sold_date_sk
and store_sales.ss_item_sk = item.i_item_sk
and item.i_manager_id = 1
and dt.d_moy=11
and dt.d_year=2000
group by dt.d_year
,item.i_category_id
,item.i_category
order by sum(ss_ext_sales_price) desc,dt.d_year
,item.i_category_id
,item.i_category
limit 100
```
If we print out the size of each operator’s output and the size of its
allAttributes:
```
GlobalLimit: allAttrs: 4, output: 4
LocalLimit: allAttrs: 4, output: 4
Sort: allAttrs: 4, output: 4
Aggregate: allAttrs: 73, output: 4
Filter: allAttrs: 73, output: 73
Join: allAttrs: 73, output: 73
Join: allAttrs: 51, output: 51
SubqueryAlias: allAttrs: 28, output: 28
SubqueryAlias: allAttrs: 28, output: 28
LogicalRelation: allAttrs: 0, output: 28
SubqueryAlias: allAttrs: 23, output: 23
LogicalRelation: allAttrs: 0, output: 23
SubqueryAlias: allAttrs: 22, output: 22
LogicalRelation: allAttrs: 0, output: 22
```
Note how the joins and the aggregate have 73 attributes each, obtained by summing the widths of their input relations. For production queries with wide schemas, this issue is much worse. Optimized plans look far better after column pruning:
```
Aggregate: allAttrs: 0, output: 1
Project: allAttrs: 2, output: 0
Join: allAttrs: 2, output: 2
Project: allAttrs: 3, output: 1
Join: allAttrs: 3, output: 3
Project: allAttrs: 4, output: 2
Join: allAttrs: 4, output: 4
Project: allAttrs: 5, output: 3
Join: allAttrs: 5, output: 5
Project: allAttrs: 23, output: 4
Filter: allAttrs: 23, output: 23
LogicalRelation: allAttrs: 0, output: 23
Project: allAttrs: 29, output: 1
Filter: allAttrs: 29, output: 29
LogicalRelation: allAttrs: 0, output: 29
Project: allAttrs: 13, output: 1
Filter: allAttrs: 13, output: 13
LogicalRelation: allAttrs: 0, output: 13
Project: allAttrs: 22, output: 1
Filter: allAttrs: 22, output: 22
LogicalRelation: allAttrs: 0, output: 22
Project: allAttrs: 18, output: 1
Filter: allAttrs: 18, output: 18
LogicalRelation: allAttrs: 0, output: 18
```
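The per-operator counts above can be reproduced in miniature with a toy plan node (the `Node` class and the traversal below are hypothetical illustrations, not Spark code):

```scala
// Toy plan node exposing the two quantities compared above. A join's
// allAttributes is the union of its children's outputs, so wide
// relations inflate every ancestor in the analyzed plan.
object AttrCounts {
  final case class Node(name: String, output: Seq[String], children: Seq[Node] = Nil) {
    // Mirrors QueryPlan.allAttributes: union of the children's outputs.
    def allAttributes: Seq[String] = children.flatMap(_.output)
  }

  def printCounts(n: Node, depth: Int = 0): Unit = {
    println(s"${"  " * depth}${n.name}: allAttrs: ${n.allAttributes.length}, " +
      s"output: ${n.output.length}")
    n.children.foreach(printCounts(_, depth + 1))
  }

  def main(args: Array[String]): Unit = {
    // Widths match the Q42 analyzed plan above: 28 + 23 + 22 columns.
    val dates = Node("date_dim", Seq.tabulate(28)(i => s"d$i"))
    val sales = Node("store_sales", Seq.tabulate(23)(i => s"s$i"))
    val item  = Node("item", Seq.tabulate(22)(i => s"i$i"))
    val join1 = Node("Join", dates.output ++ sales.output, Seq(dates, sales))
    val join2 = Node("Join", join1.output ++ item.output, Seq(join1, item))
    printCounts(join2)
  }
}
```

Running this prints `allAttrs: 73` for the outer join and `allAttrs: 51` for the inner one, matching the counts in the analyzed plan above.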
### Why are the changes needed?
Reduce driver's heap usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48762 from utkarsh39/SPARK-50229.
Authored-by: Utkarsh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 15 +++++++++++----
.../catalyst/plans/logical/basicLogicalOperators.scala | 2 +-
.../spark/sql/execution/aggregate/HashAggregateExec.scala | 2 +-
.../sql/execution/aggregate/ObjectHashAggregateExec.scala | 2 +-
4 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 9418bf298b29..23813d94c549 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -637,22 +637,23 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
   protected def doCanonicalize(): PlanType = {
     val canonicalizedChildren = children.map(_.canonicalized)
     var id = -1
+    val allAttributesSeq = this.allAttributes
     mapExpressions {
       case a: Alias =>
         id += 1
         // As the root of the expression, Alias will always take an arbitrary exprId, we need to
         // normalize that for equality testing, by assigning expr id from 0 incrementally. The
         // alias name doesn't matter and should be erased.
-        val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributes)
+        val normalizedChild = QueryPlan.normalizeExpressions(a.child, allAttributesSeq)
         Alias(normalizedChild, "")(ExprId(id), a.qualifier)

-      case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
+      case ar: AttributeReference if allAttributesSeq.indexOf(ar.exprId) == -1 =>
         // Top level `AttributeReference` may also be used for output like `Alias`, we should
         // normalize the exprId too.
         id += 1
         ar.withExprId(ExprId(id)).canonicalized

-      case other => QueryPlan.normalizeExpressions(other, allAttributes)
+      case other => QueryPlan.normalizeExpressions(other, allAttributesSeq)
     }.withNewChildren(canonicalizedChildren)
   }
@@ -678,8 +679,14 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]

   /**
    * All the attributes that are used for this plan.
+   *
+   * A `def` instead of a `lazy val` to avoid holding references to a large number of
+   * attributes, thereby reducing memory pressure on the driver. The number of attributes
+   * referenced here can be very large, especially for logical plans with wide schemas where
+   * column pruning hasn't happened yet. Holding references to all of them can lead to
+   * significant memory overhead on the driver.
    */
-  lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
+  def allAttributes: AttributeSeq = children.flatMap(_.output)
 }
object QueryPlan extends PredicateHelper {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 58c57a1692d8..15f52e856bef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -2098,7 +2098,7 @@ case class LateralJoin(
     joinType: JoinType,
     condition: Option[Expression]) extends UnaryNode {

-  override lazy val allAttributes: AttributeSeq = left.output ++ right.plan.output
+  override def allAttributes: AttributeSeq = left.output ++ right.plan.output

   require(Seq(Inner, LeftOuter, Cross).contains(joinType),
     s"Unsupported lateral join type $joinType")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 750b74aab384..469f42dcc0af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,7 +63,7 @@ case class HashAggregateExec(
   require(Aggregate.supportsHashAggregate(aggregateBufferAttributes, groupingExpressions))

-  override lazy val allAttributes: AttributeSeq =
+  override def allAttributes: AttributeSeq =
     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index e6530e94701f..7e8ce3e884a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -69,7 +69,7 @@ case class ObjectHashAggregateExec(
     child: SparkPlan)
   extends BaseAggregateExec {

-  override lazy val allAttributes: AttributeSeq =
+  override def allAttributes: AttributeSeq =
     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]