Repository: spark
Updated Branches:
  refs/heads/branch-1.0 086ca9c86 -> 0e39c884c
[SPARK-2184][SQL] AddExchange isn't idempotent ...redPartitioning. Author: Michael Armbrust <[email protected]> Closes #1122 from marmbrus/fixAddExchange and squashes the following commits: 3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning. (cherry picked from commit 5ff75c748a27bcfae71759d0e509218f0c5d0200) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e39c884 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e39c884 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e39c884 Branch: refs/heads/branch-1.0 Commit: 0e39c884cf840e4213fabfeebce0ad83ea7816a7 Parents: 086ca9c Author: Michael Armbrust <[email protected]> Authored: Wed Jun 18 17:52:42 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Jun 18 17:52:51 2014 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/expressions/BoundAttribute.scala | 4 ++-- .../org/apache/spark/sql/catalyst/expressions/Row.scala | 3 +++ .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 4ebf6c4..655d4a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -68,7 +68,7 
@@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] { } object BindReferences extends Logging { - def bindReference(expression: Expression, input: Seq[Attribute]): Expression = { + def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = { expression.transform { case a: AttributeReference => attachTree(a, "Binding attribute") { val ordinal = input.indexWhere(_.exprId == a.exprId) @@ -83,6 +83,6 @@ object BindReferences extends Logging { BoundReference(ordinal, a) } } - } + }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible. } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index 77b5429..74ae723 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] { + def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = + this(ordering.map(BindReferences.bindReference(_, inputSchema))) + def compare(a: Row, b: Row): Int = { var i = 0 while (i < ordering.size) { http://git-wip-us.apache.org/repos/asf/spark/blob/0e39c884/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index cef2941..05dfb85 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.{SQLConf, SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering} +import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, RowOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair @@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair * :: DeveloperApi :: */ @DeveloperApi -case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { +case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode with NoBind { override def outputPartitioning = newPartitioning @@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. val rdd = child.execute().mapPartitions { iter => - val hashExpressions = new MutableProjection(expressions) + val hashExpressions = new MutableProjection(expressions, child.output) val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } @@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case RangePartitioning(sortingExpressions, numPartitions) => // TODO: RangePartitioner should take an Ordering. 
-        implicit val ordering = new RowOrdering(sortingExpressions)
+        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
         val rdd = child.execute().mapPartitions { iter =>
           val mutablePair = new MutablePair[Row, Null](null, null)
