This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f24f8489f80 [SPARK-41106][SQL] Reduce collection conversion when create AttributeMap f24f8489f80 is described below commit f24f8489f8096c5324a6e7084437ee2238311103 Author: YangJie <yangji...@baidu.com> AuthorDate: Thu Nov 17 10:53:04 2022 -0600 [SPARK-41106][SQL] Reduce collection conversion when create AttributeMap ### What changes were proposed in this pull request? This PR aims to reduce collection conversions when creating an `AttributeMap`, in the following ways: 1. Add a new `apply` method to `AttributeMap` ``` def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } ``` and use it where applicable to avoid additional collection conversions. Although the new `apply` method is more generic, I did not delete the old ones, to preserve backward compatibility. 2. In the following 2 scenarios, `leftStats.attributeStats ++ rightStats.attributeStats` is `AttributeMap ++ AttributeMap`, which already returns a new `AttributeMap`, so this PR removes the redundant collection conversion. https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L86 https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L148 3. In the following scenario, `attributePercentiles` is a `Map` and there is a corresponding `apply` method that accepts a `Map`, so this PR removes the redundant `toSeq`: https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L323 ### Why are the changes needed? Minor performance improvement from fewer collection conversions. ### Does this PR introduce _any_ user-facing change? 
No ### How was this patch tested? - Pass GitHub Actions - Manual test ``` dev/change-scala-version.sh 2.13 build/mvn clean install -Phadoop-3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -fn ``` All tests passed Closes #38610 from LuciferYang/AttributeMap. Lead-authored-by: YangJie <yangji...@baidu.com> Co-authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../org/apache/spark/sql/catalyst/expressions/AttributeMap.scala | 4 ++++ .../org/apache/spark/sql/catalyst/expressions/AttributeMap.scala | 4 ++++ .../org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala | 4 ++-- .../spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala | 2 +- .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala | 2 +- .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../org/apache/spark/sql/catalyst/optimizer/expressions.scala | 4 ++-- .../scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala | 4 ++-- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 +- .../catalyst/plans/logical/statsEstimation/FilterEstimation.scala | 2 +- .../catalyst/plans/logical/statsEstimation/JoinEstimation.scala | 8 +++----- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +- .../org/apache/spark/sql/execution/command/CommandUtils.scala | 2 +- 13 files changed, 24 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index 3a424574b97..c55c542d957 100644 --- a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -31,6 +31,10 @@ object AttributeMap { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } + 
def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = { + new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) + } + def empty[A]: AttributeMap[A] = new AttributeMap(Map.empty) } diff --git a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala index 1f1df2d2e1d..3d5d6471d26 100644 --- a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala +++ b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala @@ -31,6 +31,10 @@ object AttributeMap { new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) } + def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = { + new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) + } + def empty[A]: AttributeMap[A] = new AttributeMap(Map.empty) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 2217d783767..08847fbe958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -1223,13 +1223,13 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging { // Check if the inputs have changed. val references = AttributeMap(plan.references.collect { case a if a.resolved => a -> a - }.toSeq) + }) def sameButDifferent(a: Attribute): Boolean = { references.get(a).exists(b => b.dataType != a.dataType || b.nullable != a.nullable) } val inputMap = AttributeMap(plan.inputSet.collect { case a if a.resolved && sameButDifferent(a) => a -> a - }.toSeq) + }) if (inputMap.isEmpty) { // Nothing changed. 
plan diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index 53c09a3f68d..60bfc6873e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -209,7 +209,7 @@ object DecorrelateInnerQuery extends PredicateHelper { if (duplicates.nonEmpty) { val aliasMap = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)() - }.toSeq) + }) val aliasedExpressions = innerPlan.output.map { ref => aliasMap.getOrElse(ref, ref) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 977e9b1ab13..579afa0439a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -150,7 +150,7 @@ object NestedColumnAliasing { // A reference attribute can have multiple aliases for nested fields. 
val attrToAliases = - AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2)).toSeq) + AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2))) plan match { case Project(projectList, child) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2bef03d633a..ecb93f6b239 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -587,7 +587,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { newChild } - val mapping = AttributeMap(currentNextAttrPairs.toSeq) + val mapping = AttributeMap(currentNextAttrPairs) // Create a an expression cleaning function for nodes that can actually produce redundant // aliases, use identity otherwise. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 4542ecb95fb..a59fe36ef42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -954,7 +954,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { case j: Join => val (newChildren, foldableMaps) = j.children.map(propagateFoldables).unzip val foldableMap = AttributeMap( - foldableMaps.foldLeft(Iterable.empty[(Attribute, Alias)])(_ ++ _.baseMap.values).toSeq) + foldableMaps.foldLeft(Iterable.empty[(Attribute, Alias)])(_ ++ _.baseMap.values)) val newJoin = replaceFoldable(j.withNewChildren(newChildren).asInstanceOf[Join], foldableMap) val missDerivedAttrsSet: AttributeSet = AttributeSet(newJoin.joinType match { @@ -966,7 +966,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { }) val newFoldableMap = 
AttributeMap(foldableMap.baseMap.values.filterNot { case (attr, _) => missDerivedAttrsSet.contains(attr) - }.toSeq) + }) (newJoin, newFoldableMap) // For other plans, they are not safe to apply foldable propagation, and they should not diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 37331362efa..52619b38098 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -85,7 +85,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } val rewrites = AttributeMap(duplicates.map { dup => dup -> Alias(dup, dup.toString)() - }.toSeq) + }) val aliasedExpressions = subplan.output.map { ref => rewrites.getOrElse(ref, ref) } @@ -644,7 +644,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe } } } - (newChild, AttributeMap(subqueryAttrMapping.toSeq)) + (newChild, AttributeMap(subqueryAttrMapping)) } private def updateAttrs[E <: Expression]( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index cc62c81b101..0942919b176 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -301,7 +301,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] .exists(_._2.map(_._2.exprId).distinct.length > 1), "Found duplicate rewrite attributes") - val attributeRewrites = AttributeMap(attrMappingForCurrentPlan.toSeq) + val attributeRewrites = AttributeMap(attrMappingForCurrentPlan) // Using attrMapping from the children plans to rewrite their parent node. 
// Note that we shouldn't rewrite a node using attrMapping from its sibling nodes. newPlan = newPlan.rewriteAttrs(attributeRewrites) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 840a475e67a..24d64399a2a 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -922,6 +922,6 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) { attr -> oriColStat.updateCountStats( rowsBeforeFilter, rowsAfterFilter, updatedMap.get(attr.exprId).map(_._2)) } - AttributeMap(newColumnStats.toSeq) + AttributeMap(newColumnStats) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index f2b74904f91..30f10970d1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -83,8 +83,7 @@ case class JoinEstimation(join: Join) extends Logging { } // 3. 
Update statistics based on the output of join - val inputAttrStats = AttributeMap( - leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq) + val inputAttrStats = leftStats.attributeStats ++ rightStats.attributeStats val attributesWithStat = join.output.filter(a => inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false)) val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_)) @@ -145,8 +144,7 @@ case class JoinEstimation(join: Join) extends Logging { case _ => // When there is no equi-join condition, we do estimation like cartesian product. - val inputAttrStats = AttributeMap( - leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq) + val inputAttrStats = leftStats.attributeStats ++ rightStats.attributeStats // Propagate the original column stats val outputRows = leftStats.rowCount.get * rightStats.rowCount.get Some(Statistics( @@ -213,7 +211,7 @@ case class JoinEstimation(join: Join) extends Logging { } i += 1 } - (joinCard, AttributeMap(keyStatsAfterJoin.toSeq)) + (joinCard, AttributeMap(keyStatsAfterJoin)) } /** Returns join cardinality and the column stat for this pair of join keys. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 0ace24777b7..98f4a164a22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -394,7 +394,7 @@ case class InMemoryRelation( newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized { val newStats = statsOfPlanToCache.copy( rowCount = Some(rowCount), - attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq) + attributeStats = AttributeMap(statsOfPlanToCache.attributeStats ++ newColStats) ) statsOfPlanToCache = newStats } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 6883f93523b..d847868c0ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -320,7 +320,7 @@ object CommandUtils extends Logging { } } } - AttributeMap(attributePercentiles.toSeq) + AttributeMap(attributePercentiles) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org