Repository: spark
Updated Branches:
  refs/heads/master 3c96937c7 -> 87ca7396c


[SPARK-24161][SS] Enable debug package feature on structured streaming

## What changes were proposed in this pull request?

Currently, the debug package has an implicit class "DebugQuery" that matches 
Dataset to provide debug features on the Dataset class. It doesn't work with 
structured streaming: it requires the query to already be started, and the 
information must be retrieved from StreamingQuery, not Dataset. I guess that's 
why "explain" had to be placed on StreamingQuery even though it already exists 
on Dataset.
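
For reference, the existing batch-only entry points look like this (mirroring 
the package's scaladoc; assumes a spark-shell session, where `sql` is in scope):

```scala
import org.apache.spark.sql.execution.debug._

sql("SELECT 1").debug()        // wraps each physical operator in DebugExec and re-executes the plan
sql("SELECT 1").debugCodegen() // prints the WholeStageCodegen subtrees with their generated code
```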

This patch adds a new implicit class "DebugStreamQuery" that matches 
StreamingQuery to provide similar debug features on the StreamingQuery class.
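
A minimal sketch of the new usage (the built-in "rate" source here is just an 
illustrative stand-in; any streaming source works):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

val spark = SparkSession.builder().master("local[2]").appName("debug-codegen").getOrCreate()

// Start any streaming query; the "rate" source needs no external setup.
val query = spark.readStream.format("rate").load()
  .writeStream.format("console").start()

// New in this patch: prints the WholeStageCodegen subtrees and generated code
// of the last executed plan, or "No physical plan. Waiting for data." if no
// batch has run yet.
query.debugCodegen()

query.stop()
```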

## How was this patch tested?

Added relevant unit tests.

Author: Jungtaek Lim <kabh...@gmail.com>

Closes #21222 from HeartSaVioR/SPARK-24161.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87ca7396
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87ca7396
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87ca7396

Branch: refs/heads/master
Commit: 87ca7396c7b21a87874d8ceb32e53119c609002c
Parents: 3c96937
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Mon Aug 6 15:23:47 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Mon Aug 6 15:23:47 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/debug/package.scala     |  59 +++++++++-
 .../spark/sql/streaming/StreamSuite.scala       | 116 +++++++++++++++++++
 2 files changed, 173 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a717cbd..366e1fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -40,6 +43,16 @@ import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
  *   sql("SELECT 1").debug()
  *   sql("SELECT 1").debugCodegen()
  * }}}
+ *
+ * or for streaming case (structured streaming):
+ * {{{
+ *   import org.apache.spark.sql.execution.debug._
+ *   val query = df.writeStream.<...>.start()
+ *   query.debugCodegen()
+ * }}}
+ *
+ * Note that debug() is not supported in structured streaming, because it doesn't make
+ * sense for streaming to execute a batch once while the main query is running concurrently.
  */
 package object debug {
 
@@ -89,13 +102,49 @@ package object debug {
   }
 
   /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+    val w = asStreamExecution(query)
+    if (w.lastExecution != null) {
+      codegenString(w.lastExecution.executedPlan)
+    } else {
+      "No physical plan. Waiting for data."
+    }
+  }
+
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
+   *
+   * @param query the streaming query for codegen
+   * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+    val w = asStreamExecution(query)
+    if (w.lastExecution != null) {
+      codegenStringSeq(w.lastExecution.executedPlan)
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def asStreamExecution(query: StreamingQuery): StreamExecution = query match {
+    case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
+    case q: StreamExecution => q
+    case _ => throw new IllegalArgumentException("Parameter should be an instance of " +
+      "StreamExecution!")
+  }
+
+  /**
    * Augments [[Dataset]]s with debug methods.
    */
   implicit class DebugQuery(query: Dataset[_]) extends Logging {
     def debug(): Unit = {
-      val plan = query.queryExecution.executedPlan
       val visited = new collection.mutable.HashSet[TreeNodeRef]()
-      val debugPlan = plan transform {
+      val debugPlan = query.queryExecution.executedPlan transform {
         case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
           visited += new TreeNodeRef(s)
           DebugExec(s)
@@ -116,6 +165,12 @@ package object debug {
     }
   }
 
+  implicit class DebugStreamQuery(query: StreamingQuery) extends Logging {
+    def debugCodegen(): Unit = {
+      debugPrint(codegenString(query))
+    }
+  }
+
  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     def output: Seq[Attribute] = child.output
 

http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ca38f04..bf509b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -27,6 +27,7 @@ import scala.util.control.ControlThrowable
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -513,6 +515,120 @@ class StreamSuite extends StreamTest {
     }
   }
 
+  test("explain-continuous") {
+    val inputData = ContinuousMemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test `df.explain`
+    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explainString =
+      spark.sessionState
+        .executePlan(explain)
+        .executedPlan
+        .executeCollect()
+        .map(_.getString(0))
+        .mkString("\n")
+    assert(explainString.contains("Filter"))
+    assert(explainString.contains("MapElements"))
+    assert(!explainString.contains("LocalTableScan"))
+
+    // Test StreamingQuery.display
+    val q = df.writeStream.queryName("memory_continuous_explain")
+      .outputMode(OutputMode.Update()).format("memory")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+      .asInstanceOf[StreamingQueryWrapper]
+      .streamingQuery
+    try {
+      // in continuous mode, the query runs even when there's no data;
+      // sleep a bit to ensure initialization
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        assert(q.lastExecution != null)
+      }
+
+      val explainWithoutExtended = q.explainInternal(false)
+
+      // `extended = false` only displays the physical plan.
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 1)
+
+      val explainWithExtended = q.explainInternal(true)
+      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+      // plan.
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 3)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 1)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-microbatch") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_microbatch_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.ProcessingTime("1 seconds"))
+      .start()
+
+    try {
+      import org.apache.spark.sql.execution.debug._
+      assert("No physical plan. Waiting for data." === codegenString(q))
+      assert(codegenStringSeq(q).isEmpty)
+
+      inputData.addData(1, 2, 3, 4, 5)
+      q.processAllAvailable()
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-continuous") {
+    val inputData = ContinuousMemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_continuous_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+
+    try {
+      // in continuous mode, the query runs even when there's no data;
+      // sleep a bit to ensure initialization
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        assert(q.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution != null)
+      }
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
+  private def assertDebugCodegenResult(query: StreamingQuery): Unit = {
+    import org.apache.spark.sql.execution.debug._
+
+    val codegenStr = codegenString(query)
+    assert(codegenStr.contains("Found 1 WholeStageCodegen subtrees."))
+    // assuming that code is generated for the test query
+    assert(codegenStr.contains("Generated code:"))
+
+    val codegenStrSeq = codegenStringSeq(query)
+    assert(codegenStrSeq.nonEmpty)
+    assert(codegenStrSeq.head._1.contains("*(1)"))
+    assert(codegenStrSeq.head._2.contains("codegenStageId=1"))
+  }
+
   test("SPARK-19065: dropDuplicates should not create expressions using the 
same id") {
     withTempPath { testPath =>
       val data = Seq((1, 2), (2, 3), (3, 4))

