Repository: spark
Updated Branches:
refs/heads/master 91b1ef28d -> 0e4bdebec
[SPARK-15443][SQL] Fix 'explain' for streaming Dataset
## What changes were proposed in this pull request?
- Fix the `explain` command for a streaming Dataset/DataFrame (one created by `spark.readStream`). Example output:
```
== Parsed Logical Plan ==
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- 'MapElements <function1>, obj#6: java.lang.String
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, StructField(value,StringType,true))), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#0.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- StreamingRelation FileSource[/Users/zsx/stream], [value#0]
```
- Add `StreamingQuery.explain` to display the plan of the query's most recent execution (it prints `N/A` if nothing has been executed yet). Example output:
```
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text
== Analyzed Logical Plan ==
value: string
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text
== Optimized Logical Plan ==
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- MapElements <function1>, obj#6: java.lang.String
   +- DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- Filter <function1>.apply
         +- Relation[value#12] text
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#7]
+- *MapElements <function1>, obj#6: java.lang.String
   +- *DeserializeToObject createexternalrow(value#12.toString, StructField(value,StringType,true)), obj#5: org.apache.spark.sql.Row
      +- *Filter <function1>.apply
         +- *Scan text [value#12] Format: org.apache.spark.sql.execution.datasources.text.TextFileFormat@1836ab91, InputPaths: file:/Users/zsx/stream/a.txt, file:/Users/zsx/stream/b.txt, file:/Users/zsx/stream/c.txt, PushedFilters: [], ReadSchema: struct<value:string>
```
## How was this patch tested?
New `explain` unit tests in `FileStreamSourceSuite` and `StreamSuite`.
Author: Shixiong Zhu <[email protected]>
Closes #13815 from zsxwing/sdf-explain.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e4bdebe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e4bdebe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e4bdebe
Branch: refs/heads/master
Commit: 0e4bdebece892edb126fa443f67c846e44e7367e
Parents: 91b1ef2
Author: Shixiong Zhu <[email protected]>
Authored: Thu Jun 23 16:04:16 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Thu Jun 23 16:04:16 2016 -0700
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 18 +++++++++-
.../spark/sql/execution/command/commands.scala | 11 +++++-
.../streaming/IncrementalExecution.scala | 1 +
.../execution/streaming/StreamExecution.scala | 20 +++++++++++
.../execution/streaming/StreamingRelation.scala | 14 ++++++++
.../spark/sql/streaming/StreamingQuery.scala | 15 ++++++++
.../sql/streaming/FileStreamSourceSuite.scala | 36 ++++++++++++++++++++
.../spark/sql/streaming/StreamSuite.scala | 29 ++++++++++++++++
8 files changed, 142 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8e2f2ed..b619d4e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
-import org.apache.spark.sql.execution.streaming.MemoryPlan
+import org.apache.spark.sql.execution.streaming.{MemoryPlan,
StreamingExecutionRelation, StreamingRelation, StreamingRelationExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQuery
@@ -307,6 +307,22 @@ private[sql] abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
}
}
+ /**
+ * This strategy is just for explaining `Dataset/DataFrame` created by
`spark.readStream`.
+ * It won't affect the execution, because `StreamingRelation` will be
replaced with
+ * `StreamingExecutionRelation` in `StreamingQueryManager` and
`StreamingExecutionRelation` will
+ * be replaced with the real relation using the `Source` in
`StreamExecution`.
+ */
+ object StreamingRelationStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case s: StreamingRelation =>
+ StreamingRelationExec(s.sourceName, s.output) :: Nil
+ case s: StreamingExecutionRelation =>
+ StreamingRelationExec(s.toString, s.output) :: Nil
+ case _ => Nil
+ }
+ }
+
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def numPartitions: Int = self.numPartitions
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 38bb6e4..7eaad81 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.streaming.IncrementalExecution
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
/**
@@ -98,7 +100,14 @@ case class ExplainCommand(
// Run through the optimizer to generate the physical plan.
override def run(sparkSession: SparkSession): Seq[Row] = try {
- val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
+ val queryExecution =
+ if (logicalPlan.isStreaming) {
+ // This is used only by explaining `Dataset/DataFrame` created by
`spark.readStream`, so the
+ // output mode does not matter since there is no `Sink`.
+ new IncrementalExecution(sparkSession, logicalPlan,
OutputMode.Append(), "<unknown>", 0)
+ } else {
+ sparkSession.sessionState.executePlan(logicalPlan)
+ }
val outputString =
if (codegen) {
codegenString(queryExecution.executedPlan)
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bc0e443..0ce0055 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -37,6 +37,7 @@ class IncrementalExecution private[sql](
// TODO: make this always part of planning.
val stateStrategy =
sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+ sparkSession.sessionState.planner.StreamingRelationStrategy +:
sparkSession.sessionState.experimentalMethods.extraStrategies
// Modified planner with stateful operations.
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1428b97..f1af79e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -473,6 +474,25 @@ class StreamExecution(
}
}
+ /** Expose for tests */
+ def explainInternal(extended: Boolean): String = {
+ if (lastExecution == null) {
+ "N/A"
+ } else {
+ val explain = ExplainCommand(lastExecution.logical, extended = extended)
+
sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect()
+ .map(_.getString(0)).mkString("\n")
+ }
+ }
+
+ override def explain(extended: Boolean): Unit = {
+ // scalastyle:off println
+ println(explainInternal(extended))
+ // scalastyle:on println
+ }
+
+ override def explain(): Unit = explain(extended = false)
+
override def toString: String = {
s"Streaming Query - $name [state = $state]"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 4d65d2f..e8b0009 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.execution.streaming
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
object StreamingRelation {
@@ -50,6 +53,17 @@ case class StreamingExecutionRelation(source: Source,
output: Seq[Attribute]) ex
override def toString: String = source.toString
}
+/**
+ * A dummy physical plan for [[StreamingRelation]] to support
+ * [[org.apache.spark.sql.Dataset.explain]]
+ */
+case class StreamingRelationExec(sourceName: String, output: Seq[Attribute])
extends LeafExecNode {
+ override def toString: String = sourceName
+ override protected def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException("StreamingRelationExec cannot be
executed")
+ }
+}
+
object StreamingExecutionRelation {
def apply(source: Source): StreamingExecutionRelation = {
StreamingExecutionRelation(source, source.schema.toAttributes)
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index dc81a5b..19d1ecf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -107,6 +107,7 @@ trait StreamingQuery {
* method may block forever. Additionally, this method is only guaranteed to
block until data that
* has been synchronously appended data to a
[[org.apache.spark.sql.execution.streaming.Source]]
* prior to invocation. (i.e. `getOffset` must immediately reflect the
addition).
+ * @since 2.0.0
*/
def processAllAvailable(): Unit
@@ -116,4 +117,18 @@ trait StreamingQuery {
* @since 2.0.0
*/
def stop(): Unit
+
+ /**
+ * Prints the physical plan to the console for debugging purposes.
+ * @since 2.0.0
+ */
+ def explain(): Unit
+
+ /**
+ * Prints the plans (logical and physical) to the console for debugging purposes.
+ *
+ * @param extended whether to do extended explain or not
+ * @since 2.0.0
+ */
+ def explain(extended: Boolean): Unit
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6971f93..0eade71 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -592,6 +592,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("explain") {
+ withTempDirs { case (src, tmp) =>
+ src.mkdirs()
+
+ val df =
spark.readStream.format("text").load(src.getCanonicalPath).map(_ + "-x")
+ // Test `explain` not throwing errors
+ df.explain()
+
+ val q = df.writeStream.queryName("file_explain").format("memory").start()
+ .asInstanceOf[StreamExecution]
+ try {
+ assert("N/A" === q.explainInternal(false))
+ assert("N/A" === q.explainInternal(true))
+
+ val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+ val finalFile = new File(src, tempFile.getName)
+ require(stringToFile(tempFile, "foo").renameTo(finalFile))
+
+ q.processAllAvailable()
+
+ val explainWithoutExtended = q.explainInternal(false)
+ // `extended = false` only displays the physical plan.
+ assert("Relation.*text".r.findAllMatchIn(explainWithoutExtended).size
=== 0)
+ assert("TextFileFormat".r.findAllMatchIn(explainWithoutExtended).size
=== 1)
+
+ val explainWithExtended = q.explainInternal(true)
+ // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+ // plan.
+ assert("Relation.*text".r.findAllMatchIn(explainWithExtended).size ===
3)
+ assert("TextFileFormat".r.findAllMatchIn(explainWithExtended).size ===
1)
+ } finally {
+ q.stop()
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4bdebe/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b8e40e7..c4a894b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -242,6 +242,35 @@ class StreamSuite extends StreamTest {
val o2 = OutputMode.Complete
assert(o2 === InternalOutputModes.Complete)
}
+
+ test("explain") {
+ val inputData = MemoryStream[String]
+ val df = inputData.toDS().map(_ + "foo")
+ // Test `explain` not throwing errors
+ df.explain()
+ val q = df.writeStream.queryName("memory_explain").format("memory").start()
+ .asInstanceOf[StreamExecution]
+ try {
+ assert("N/A" === q.explainInternal(false))
+ assert("N/A" === q.explainInternal(true))
+
+ inputData.addData("abc")
+ q.processAllAvailable()
+
+ val explainWithoutExtended = q.explainInternal(false)
+ // `extended = false` only displays the physical plan.
+ assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size ===
0)
+ assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size
=== 1)
+
+ val explainWithExtended = q.explainInternal(true)
+ // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+ // plan.
+ assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3)
+ assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1)
+ } finally {
+ q.stop()
+ }
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]