Repository: spark
Updated Branches:
refs/heads/master 8640cdb83 -> c8fb776d4
[SPARK-15692][SQL] Improves the explain output of several physical plans by
displaying embedded logical plan in tree style
## What changes were proposed in this pull request?
Improves the explain output of several physical plans by displaying the
embedded logical plan in tree style.
Some physical plans contain an embedded logical plan; for example, `cache
tableName query` maps to:
```
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
isLazy: Boolean)
extends RunnableCommand
```
It is easier to read the explain output if we can display the `plan` in tree
style.
**Before change:**
Everything is crammed onto one line.
```
scala> Seq((1,2)).toDF().createOrReplaceTempView("testView")
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand CacheTableCommand testView2, Some('Project [*]
+- 'UnresolvedRelation `testView`, None
), false
```
**After change:**
```
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand
   :  +- CacheTableCommand testView2, false
   :     :  +- 'Project [*]
   :     :     +- 'UnresolvedRelation `testView`, None
```
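The nested display comes from the `innerChildren` hook: a command keeps its embedded logical plan as a constructor argument but exposes it through `innerChildren`, which the tree printer renders as an indented, `:`-prefixed subtree (and `argString` now skips those nodes so the plan is not printed twice). Below is a minimal, self-contained sketch of that pattern; `Node`, `CacheCommand`, `Project`, and `Relation` are hypothetical stand-ins, not Spark's actual `TreeNode`/`QueryPlan` classes.
```
// Toy sketch of the innerChildren idea; not Spark's real implementation.
abstract class Node(val label: String) {
  def children: Seq[Node] = Nil
  // Embedded plans that are not operator children but should still be
  // rendered as a nested subtree (mirrors TreeNode.innerChildren).
  def innerChildren: Seq[Node] = Nil

  def treeString: String = lines(branch = "", cont = "").mkString("\n")

  // branch: prefix for this node's own line; cont: prefix for descendant lines.
  def lines(branch: String, cont: String): Seq[String] = {
    val self  = branch + label
    val inner = innerChildren.flatMap(_.lines(cont + ":  +- ", cont + ":     "))
    val kids  = children.flatMap(_.lines(cont + "+- ", cont + "   "))
    self +: (inner ++ kids)
  }
}

case class Relation(table: String) extends Node(s"'UnresolvedRelation `$table`")

case class Project(child: Node) extends Node("'Project [*]") {
  override def children: Seq[Node] = Seq(child)
}

// Like CacheTableCommand: the logical plan is a field, shown only via innerChildren.
case class CacheCommand(view: String, plan: Node)
  extends Node(s"CacheTableCommand $view, false") {
  override def innerChildren: Seq[Node] = Seq(plan)
}

object TreeDemo {
  def main(args: Array[String]): Unit = {
    val cmd = CacheCommand("testView2", Project(Relation("testView")))
    println(cmd.treeString)
    // CacheTableCommand testView2, false
    // :  +- 'Project [*]
    // :     +- 'UnresolvedRelation `testView`
  }
}
```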
## How was this patch tested?
Manual test.
Author: Sean Zhong <[email protected]>
Closes #13433 from clockfly/verbose_breakdown_3_2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8fb776d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8fb776d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8fb776d
Branch: refs/heads/master
Commit: c8fb776d4a0134c47f90272c4bd5e4bba902aae5
Parents: 8640cdb
Author: Sean Zhong <[email protected]>
Authored: Wed Jun 1 17:03:39 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jun 1 17:03:39 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 +-
.../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 10 +++++++---
.../sql/execution/columnar/InMemoryTableScanExec.scala | 5 +++++
.../org/apache/spark/sql/execution/command/cache.scala | 6 +++++-
.../org/apache/spark/sql/execution/command/commands.scala | 6 +++---
.../sql/execution/command/createDataSourceTables.scala | 3 +++
.../org/apache/spark/sql/execution/command/views.scala | 3 +++
.../datasources/InsertIntoDataSourceCommand.scala | 3 +++
8 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index d4447ca..6784c3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -264,7 +264,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
}
- override def innerChildren: Seq[PlanType] = subqueries
+ override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
/**
* Canonicalized copy of this query plan.
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d87e6c7..3ebd815 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -424,9 +424,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
*/
protected def stringArgs: Iterator[Any] = productIterator
+ private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
+
/** Returns a string representing the arguments to this node, minus any children */
def argString: String = productIterator.flatMap {
- case tn: TreeNode[_] if containsChild(tn) => Nil
+ case tn: TreeNode[_] if allChildren.contains(tn) => Nil
+ case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
case tn: TreeNode[_] => s"${tn.simpleString}" :: Nil
case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
@@ -467,9 +470,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
/**
- * All the nodes that are parts of this node, this is used by subquries.
+ * All the nodes that should be shown as a inner nested tree of this node.
+ * For example, this can be used to show sub-queries.
*/
- protected def innerChildren: Seq[BaseType] = Nil
+ protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
/**
* Appends the string represent of this node and its children to the given StringBuilder.
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index ba61940..7ccc9de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -70,6 +71,8 @@ private[sql] case class InMemoryRelation(
private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
extends logical.LeafNode with MultiInstanceRelation {
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
override def producedAttributes: AttributeSet = outputSet
private[sql] val batchStats: ListAccumulator[InternalRow] =
@@ -222,6 +225,8 @@ private[sql] case class InMemoryTableScanExec(
@transient relation: InMemoryRelation)
extends LeafExecNode {
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index b1290a4..3e5eed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
isLazy: Boolean)
extends RunnableCommand {
+ override protected def innerChildren: Seq[QueryPlan[_]] = {
+ plan.toSeq
+ }
+
override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 642a95a..38bb6e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
@@ -57,6 +58,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
}
+ override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
override def output: Seq[Attribute] = cmd.output
override def children: Seq[SparkPlan] = Nil
@@ -68,11 +71,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
-
- override def argString: String = cmd.toString
}
-
/**
* An explain command for users to see how a command will be executed.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9956c5b..66753fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
@@ -138,6 +139,8 @@ case class CreateDataSourceTableAsSelectCommand(
query: LogicalPlan)
extends RunnableCommand {
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
override def run(sparkSession: SparkSession): Seq[Row] = {
// Since we are saving metadata to metastore, we need to check if metastore supports
// the table name and database name we have for this query. MetaStoreUtils.validateName
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 20c0278..b56c200 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -50,6 +51,8 @@ case class CreateViewCommand(
isTemporary: Boolean)
extends RunnableCommand {
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
// different from Hive and may not work for some cases like create view on self join.
http://git-wip-us.apache.org/repos/asf/spark/blob/c8fb776d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 25b901f..8549ae9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.sources.InsertableRelation
@@ -32,6 +33,8 @@ private[sql] case class InsertIntoDataSourceCommand(
overwrite: Boolean)
extends RunnableCommand {
+ override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
override def run(sparkSession: SparkSession): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
val data = Dataset.ofRows(sparkSession, query)