Repository: spark
Updated Branches:
refs/heads/branch-1.0 26f6b9893 -> 086ca9c86
[SPARK-2176][SQL] Extra unnecessary exchange operator in the result of an
explain command
```
hql("explain select * from src group by key").collect().foreach(println)
[ExplainCommand [plan#27:0]]
[  Aggregate false, [key#25], [key#25,value#26]]
[   Exchange (HashPartitioning [key#25:0], 200)]
[   Exchange (HashPartitioning [key#25:0], 200)]
[    Aggregate true, [key#25], [key#25]]
[     HiveTableScan [key#25,value#26], (MetastoreRelation default, src, None), None]
```
There are two `Exchange` operators. However, if we do not use `explain`:
```
hql("select * from src group by key")
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[8] at RDD at SchemaRDD.scala:100
== Query Plan ==
Aggregate false, [key#8], [key#8,value#9]
 Exchange (HashPartitioning [key#8:0], 200)
  Aggregate true, [key#8], [key#8]
   HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None
```
The plan is fine.
The cause of this bug is explained below.
When we create an `execution.ExplainCommand`, we use the `executedPlan` as the
child of this `ExplainCommand`. However, that `executedPlan` is prepared for
execution again when we generate the `executedPlan` for the `ExplainCommand`
itself, so `prepareForExecution` is effectively applied twice to the same
physical plan. Because the first pass has already bound the attribute
references (rewriting them into `BoundReference`s), `AddExchange` can no
longer tell that the existing `Exchange` already provides the required
partitioning (the `ExchangeOperator` is created with `AttributeReference`s,
which are turned into `BoundReference`s once `prepareForExecution` runs), so
an extra `ExchangeOperator` is inserted. I think `CommandStrategy` should
initialize the `ExplainCommand` with the `sparkPlan` (`sparkPlan` is the input
of `prepareForExecution`) rather than the `executedPlan`.
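The double-preparation mechanism can be sketched with a toy model. This is a hypothetical sketch, not the real Spark classes: `Attr`, `Bound`, `addExchange`, and `bind` below only stand in for `AttributeReference`, `BoundReference`, the `AddExchange` rule, and reference binding, respectively. Once partitioning keys have been bound to ordinals, the exchange-insertion rule no longer recognizes the existing exchange, so a second one appears:

```scala
// Toy model of the bug: NOT the real Spark 1.0 classes.
sealed trait Key
case class Attr(name: String) extends Key    // stands in for AttributeReference
case class Bound(ordinal: Int) extends Key   // stands in for BoundReference

sealed trait Plan
case object Scan extends Plan
case class Exchange(key: Key, child: Plan) extends Plan
case class Aggregate(key: Attr, child: Plan) extends Plan

// Insert an Exchange below an Aggregate unless the child is already an
// Exchange partitioned on the same *attribute* the Aggregate needs.
def addExchange(p: Plan): Plan = p match {
  case Aggregate(k, Exchange(Attr(n), c)) if n == k.name =>
    Aggregate(k, Exchange(Attr(n), addExchange(c)))
  case Aggregate(k, c) => Aggregate(k, Exchange(k, addExchange(c)))
  case Exchange(k, c)  => Exchange(k, addExchange(c))
  case Scan            => Scan
}

// Bind attribute references in Exchange partitionings to ordinals, the way
// prepareForExecution rewrites references across the whole plan.
def bind(p: Plan): Plan = p match {
  case Exchange(Attr(_), c) => Exchange(Bound(0), bind(c))
  case Exchange(k, c)       => Exchange(k, bind(c))
  case Aggregate(k, c)      => Aggregate(k, bind(c))
  case Scan                 => Scan
}

def prepareForExecution(p: Plan): Plan = bind(addExchange(p))

def countExchanges(p: Plan): Int = p match {
  case Exchange(_, c)  => 1 + countExchanges(c)
  case Aggregate(_, c) => countExchanges(c)
  case Scan            => 0
}

val sparkPlan = Aggregate(Attr("key"), Scan)
val once  = prepareForExecution(sparkPlan)  // one Exchange, key now Bound(0)
val twice = prepareForExecution(once)       // the bound Exchange is no longer
                                            // recognized: a duplicate appears

println(countExchanges(once))   // 1
println(countExchanges(twice))  // 2
```

In this model, preparing the already-prepared plan a second time inserts a duplicate `Exchange` for exactly the reason described above: the recognition check matches on attributes, which binding has already rewritten away.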
The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2176
Author: Yin Huai <[email protected]>
Closes #1116 from yhuai/SPARK-2176 and squashes the following commits:
197c19c [Yin Huai] Use sparkPlan to initialize a Physical Explain Command instead of using executedPlan.
(cherry picked from commit 587d32012ceeec1e80cec1878312f164cdb76ec8)
Signed-off-by: Reynold Xin <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/086ca9c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/086ca9c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/086ca9c8
Branch: refs/heads/branch-1.0
Commit: 086ca9c86f36ca57668ccbc33f329f495f5caa7e
Parents: 26f6b98
Author: Yin Huai <[email protected]>
Authored: Wed Jun 18 10:51:32 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jun 18 10:51:40 2014 -0700
----------------------------------------------------------------------
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 ++
.../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/086ca9c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f7e0332..1617ec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -307,6 +307,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
+    // executedPlan should not be used to initialize any SparkPlan. It should be
+    // only used for execution.
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

     /** Internal version of the RDD. Avoids copies and has no schema */
http://git-wip-us.apache.org/repos/asf/spark/blob/086ca9c8/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2233216..70c1171 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -251,8 +251,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
+        val sparkPlan = context.executePlan(child).sparkPlan
+        Seq(execution.ExplainCommand(sparkPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
         Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil