Repository: spark
Updated Branches:
  refs/heads/branch-1.0 26f6b9893 -> 086ca9c86


[SPARK-2176][SQL] Extra unnecessary exchange operator in the result of an explain command

```
hql("explain select * from src group by key").collect().foreach(println)

[ExplainCommand [plan#27:0]]
[ Aggregate false, [key#25], [key#25,value#26]]
[  Exchange (HashPartitioning [key#25:0], 200)]
[   Exchange (HashPartitioning [key#25:0], 200)]
[    Aggregate true, [key#25], [key#25]]
[     HiveTableScan [key#25,value#26], (MetastoreRelation default, src, None), None]
```

There are two Exchange operators in the plan.

However, if we run the same query without explain...
```
hql("select * from src group by key")

res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[8] at RDD at SchemaRDD.scala:100
== Query Plan ==
Aggregate false, [key#8], [key#8,value#9]
 Exchange (HashPartitioning [key#8:0], 200)
  Aggregate true, [key#8], [key#8]
   HiveTableScan [key#8,value#9], (MetastoreRelation default, src, None), None
```
The plan is fine.

The cause of this bug is explained below.

When we create an `execution.ExplainCommand`, we use the `executedPlan` as the
child of this `ExplainCommand`. But this `executedPlan` is then prepared for
execution a second time when we generate the `executedPlan` for the
`ExplainCommand` itself. Basically, `prepareForExecution` is called twice on the
same physical plan. Because the first call to `prepareForExecution` has already
bound the references (turning `AttributeReference`s into `BoundReference`s),
`AddExchange` cannot tell that the plan already uses the required partitioning
(we use `AttributeReference`s to create an `Exchange` operator, and those
references are rewritten to `BoundReference`s once `prepareForExecution` is
called). So, an extra `Exchange` operator is inserted.
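The failure mode can be illustrated with a small self-contained sketch. The types and functions below (`Attr`, `Bound`, `Exchange`, `addExchange`, `bind`) are simplified stand-ins, not Spark's real classes: an idempotence check that compares attribute keys stops matching once binding has rewritten them, so a second preparation pass wraps the plan in a second exchange.

```scala
// Hypothetical, simplified model of the bug (not Spark's actual API).
sealed trait Expr
case class Attr(name: String) extends Expr    // stands in for AttributeReference
case class Bound(ordinal: Int) extends Expr   // stands in for BoundReference

sealed trait Plan
case class Scan(output: Seq[Attr]) extends Plan
case class Exchange(keys: Seq[Expr], child: Plan) extends Plan

// Like AddExchange: insert an Exchange only when the plan is not already
// partitioned by the required keys.
def addExchange(required: Seq[Expr], plan: Plan): Plan = plan match {
  case Exchange(keys, _) if keys == required => plan  // requirement already satisfied
  case _                                     => Exchange(required, plan)
}

// Like reference binding: attribute keys are rewritten into ordinal-bound
// references, so they no longer compare equal to the original attributes.
def bind(plan: Plan): Plan = plan match {
  case Exchange(keys, child) =>
    Exchange(keys.map { case Attr(_) => Bound(0); case other => other }, bind(child))
  case other => other
}

def prepareForExecution(required: Seq[Expr], plan: Plan): Plan =
  bind(addExchange(required, plan))

def countExchanges(p: Plan): Int = p match {
  case Exchange(_, child) => 1 + countExchanges(child)
  case _                  => 0
}

val required: Seq[Expr] = Seq(Attr("key"))
val once  = prepareForExecution(required, Scan(Seq(Attr("key"))))
val twice = prepareForExecution(required, once)  // second pass, as ExplainCommand caused

println(countExchanges(once))   // 1
println(countExchanges(twice))  // 2 -- the extra Exchange seen in the explain output
```

After the first pass, the existing `Exchange`'s keys are `Bound(0)` while the requirement is still expressed as `Attr("key")`, so the equality check in `addExchange` fails and a redundant `Exchange` is added, mirroring the doubled operator above.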

I think that in `CommandStrategy`, we should use the `sparkPlan` (the input of
`prepareForExecution`) to initialize the `ExplainCommand`, instead of the
`executedPlan`.

The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2176

Author: Yin Huai <[email protected]>

Closes #1116 from yhuai/SPARK-2176 and squashes the following commits:

197c19c [Yin Huai] Use sparkPlan to initialize a Physical Explain Command instead of using executedPlan.

(cherry picked from commit 587d32012ceeec1e80cec1878312f164cdb76ec8)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/086ca9c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/086ca9c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/086ca9c8

Branch: refs/heads/branch-1.0
Commit: 086ca9c86f36ca57668ccbc33f329f495f5caa7e
Parents: 26f6b98
Author: Yin Huai <[email protected]>
Authored: Wed Jun 18 10:51:32 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jun 18 10:51:40 2014 -0700

----------------------------------------------------------------------
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala    | 2 ++
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala   | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/086ca9c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f7e0332..1617ec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -307,6 +307,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
+    // executedPlan should not be used to initialize any SparkPlan. It should be
+    // only used for execution.
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */

http://git-wip-us.apache.org/repos/asf/spark/blob/086ca9c8/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2233216..70c1171 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -251,8 +251,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
+        val sparkPlan = context.executePlan(child).sparkPlan
+        Seq(execution.ExplainCommand(sparkPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
         Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil
