Repository: spark
Updated Branches:
refs/heads/master 252417fa2 -> 63b7f127c
[SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer
## What changes were proposed in this pull request?
This issue adds a new optimizer rule, `ReorderAssociativeOperator`, which takes
advantage of the associative property of integral addition and multiplication.
Currently, Spark works like the following.
1) Can optimize `1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + a` into `45 + a`.
2) Cannot optimize `a + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9`.
This PR can handle Case 2 for **Add/Multiply** expressions whose data type is
`ByteType`, `ShortType`, `IntegerType`, or `LongType`. The following is a
comparison of the plans `before` and `after` this change.
**Before**
```scala
scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1))
a)").explain
== Physical Plan ==
WholeStageCodegen
: +- Project [(((((((((a#7 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9) AS
(((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8]
: +- INPUT
+- Generate explode([1]), false, false, [a#7]
+- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1))
a)").explain
== Physical Plan ==
*Project [(((((((((a#18 * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9) AS
(((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19]
+- Generate explode([1]), false, false, [a#18]
+- Scan OneRowRelation[]
```
**After**
```scala
scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1))
a)").explain
== Physical Plan ==
WholeStageCodegen
: +- Project [(a#7 + 45) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8)
+ 9)#8]
: +- INPUT
+- Generate explode([1]), false, false, [a#7]
+- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1))
a)").explain
== Physical Plan ==
*Project [(a#18 * 362880) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8)
* 9)#19]
+- Generate explode([1]), false, false, [a#18]
+- Scan OneRowRelation[]
```
This PR was greatly generalized thanks to cloud-fan's key ideas; he should be
credited for the work he did.
## How was this patch tested?
Passes the Jenkins tests, including the new test suite.
Author: Dongjoon Hyun <[email protected]>
Closes #12850 from dongjoon-hyun/SPARK-15076.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63b7f127
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63b7f127
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63b7f127
Branch: refs/heads/master
Commit: 63b7f127caf2fdf96eeb8457afd6c96bc8309a58
Parents: 252417f
Author: Dongjoon Hyun <[email protected]>
Authored: Thu Jun 2 09:48:58 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Thu Jun 2 09:48:58 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 39 ++++++++++++
.../ReorderAssociativeOperatorSuite.scala | 63 ++++++++++++++++++++
2 files changed, 102 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/63b7f127/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 93762ad..11cd84b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -94,6 +94,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog,
conf: CatalystConf)
FoldablePropagation,
OptimizeIn(conf),
ConstantFolding,
+ ReorderAssociativeOperator,
LikeSimplification,
BooleanSimplification,
SimplifyConditionals,
@@ -738,6 +739,44 @@ object InferFiltersFromConstraints extends
Rule[LogicalPlan] with PredicateHelpe
}
/**
+ * Reorder associative integral-type operators and fold all constants into one.
+ *
+ * Only [[Add]] and [[Multiply]] expressions that are deterministic and whose data type is an
+ * [[IntegralType]] are rewritten. The nested operator chain is flattened into its operands, the
+ * foldable operands are evaluated into a single [[Literal]], and that literal is placed to the
+ * right of the remaining operands. For example, `a + 1 + 2 + 3` becomes `a + 6` and
+ * `b * 1 * 2 * 3 * 4` becomes `b * 24`. An expression with fewer than two foldable operands is
+ * returned unchanged.
+ */
+object ReorderAssociativeOperator extends Rule[LogicalPlan] {
+ private def flattenAdd(e: Expression): Seq[Expression] = e match {
+ case Add(l, r) => flattenAdd(l) ++ flattenAdd(r) // keep descending through nested Adds
+ case other => other :: Nil // anything that is not an Add is a single operand
+ }
+
+ private def flattenMultiply(e: Expression): Seq[Expression] = e match {
+ case Multiply(l, r) => flattenMultiply(l) ++ flattenMultiply(r) // same as flattenAdd, for Multiply
+ case other => other :: Nil // anything that is not a Multiply is a single operand
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case q: LogicalPlan => q transformExpressionsDown {
+ case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType]
=>
+ val (foldables, others) = flattenAdd(a).partition(_.foldable) // foldable operands vs. the rest
+ if (foldables.size > 1) { // with zero or one foldable operand there is nothing to combine
+ val foldableExpr = foldables.reduce((x, y) => Add(x, y))
+ val c = Literal.create(foldableExpr.eval(EmptyRow), a.dataType) // evaluated here, against EmptyRow
+ if (others.isEmpty) c else Add(others.reduce((x, y) => Add(x, y)), c) // folded literal goes last
+ } else {
+ a
+ }
+ case m: Multiply if m.deterministic &&
m.dataType.isInstanceOf[IntegralType] =>
+ val (foldables, others) = flattenMultiply(m).partition(_.foldable) // foldable operands vs. the rest
+ if (foldables.size > 1) { // with zero or one foldable operand there is nothing to combine
+ val foldableExpr = foldables.reduce((x, y) => Multiply(x, y))
+ val c = Literal.create(foldableExpr.eval(EmptyRow), m.dataType) // evaluated here, against EmptyRow
+ if (others.isEmpty) c else Multiply(others.reduce((x, y) =>
Multiply(x, y)), c)
+ } else {
+ m
+ }
+ }
+ }
+}
+
+/**
* Replaces [[Expression Expressions]] that can be statically evaluated with
* equivalent [[Literal]] values.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/63b7f127/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
new file mode 100644
index 0000000..05e15e9
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderAssociativeOperatorSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReorderAssociativeOperatorSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("ReorderAssociativeOperator", Once, // single batch holding only the rule under test
+ ReorderAssociativeOperator) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("Reorder associative operators") {
+ val originalQuery =
+ testRelation
+ .select(
+ (Literal(3) + ((Literal(1) + 'a) + 2)) + 4, // literals on both sides of 'a; expected 'a + 10
+ 'b * 1 * 2 * 3 * 4, // expected 'b * 24
+ ('b + 1) * 2 * 3 * 4, // inner Add has one literal only; expected ('b + 1) * 24
+ 'a + 1 + 'b + 2 + 'c + 3, // literals interleaved with attributes; expected 'a + 'b + 'c + 6
+ 'a + 1 + 'b * 2 + 'c + 3, // 'b * 2 stays one operand of the Add chain; expected ... + 4
+ Rand(0) * 1 * 2 * 3 * 4) // expected unchanged; presumably Rand is non-deterministic, which the rule skips
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val correctAnswer =
+ testRelation
+ .select(
+ ('a + 10).as("((3 + ((1 + a) + 2)) + 4)"), // alias is the text of the original expression
+ ('b * 24).as("((((b * 1) * 2) * 3) * 4)"),
+ (('b + 1) * 24).as("((((b + 1) * 2) * 3) * 4)"),
+ ('a + 'b + 'c + 6).as("(((((a + 1) + b) + 2) + c) + 3)"),
+ ('a + 'b * 2 + 'c + 4).as("((((a + 1) + (b * 2)) + c) + 3)"),
+ Rand(0) * 1 * 2 * 3 * 4) // identical to the input expression
+ .analyze
+
+ comparePlans(optimized, correctAnswer) // checks the optimized plan against the hand-written expectation
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]