Repository: spark
Updated Branches:
  refs/heads/master 3c96937c7 -> 87ca7396c
[SPARK-24161][SS] Enable debug package feature on structured streaming

## What changes were proposed in this pull request?

Currently, the debug package has an implicit class "DebugQuery" that matches Dataset to provide debug features on the Dataset class. It doesn't work with structured streaming: it requires the query to have already been started, and the information must be retrieved from StreamingQuery, not Dataset. I guess that's why "explain" had to be placed on StreamingQuery even though it already exists on Dataset.

This patch adds a new implicit class "DebugStreamQuery" that matches StreamingQuery to provide similar debug features on the StreamingQuery class.

## How was this patch tested?

Added relevant unit tests.

Author: Jungtaek Lim <kabh...@gmail.com>

Closes #21222 from HeartSaVioR/SPARK-24161.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87ca7396
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87ca7396
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87ca7396

Branch: refs/heads/master
Commit: 87ca7396c7b21a87874d8ceb32e53119c609002c
Parents: 3c96937
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Mon Aug 6 15:23:47 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Mon Aug 6 15:23:47 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/debug/package.scala |  59 +++++++++-
 .../spark/sql/streaming/StreamSuite.scala   | 116 +++++++++++++++++++
 2 files changed, 173 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
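For orientation before the diff, here is a minimal sketch of the user-facing API this patch enables. It is illustrative only: the "rate" source, the query shape, and the session setup are assumptions, not part of the patch; `debugCodegen()` and the "No physical plan. Waiting for data." message come from the change below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._  // brings the new DebugStreamQuery implicit into scope

object DebugStreamDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("debug-stream-demo").getOrCreate()

  // Any streaming source works; "rate" is just a convenient built-in generator.
  val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

  val query = df.selectExpr("value * 2 AS doubled").writeStream
    .format("console")
    .start()

  // Typically no batch has run yet, so this prints "No physical plan. Waiting for data."
  query.debugCodegen()

  Thread.sleep(1500)            // let the rate source produce some data
  query.processAllAvailable()   // wait until a micro-batch has executed
  query.debugCodegen()          // now prints WholeStageCodegen subtrees with generated code

  query.stop()
  spark.stop()
}
```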
http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a717cbd..366e1fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -40,6 +43,16 @@ import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
  *   sql("SELECT 1").debug()
  *   sql("SELECT 1").debugCodegen()
  * }}}
+ *
+ * or for the streaming case (structured streaming):
+ * {{{
+ *   import org.apache.spark.sql.execution.debug._
+ *   val query = df.writeStream.<...>.start()
+ *   query.debugCodegen()
+ * }}}
+ *
+ * Note that debug() is not supported in structured streaming, because it doesn't make sense
+ * for a streaming query to execute the batch once while the main query is running concurrently.
  */
 package object debug {
 
@@ -89,13 +102,49 @@ package object debug {
   }
 
   /**
+   * Get WholeStageCodegenExec subtrees and their generated code in a query plan,
+   * combined into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+    val w = asStreamExecution(query)
+    if (w.lastExecution != null) {
+      codegenString(w.lastExecution.executedPlan)
+    } else {
+      "No physical plan. Waiting for data."
+    }
+  }
+
+  /**
+   * Get WholeStageCodegenExec subtrees and their generated code in a query plan
+   *
+   * @param query the streaming query for codegen
+   * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+    val w = asStreamExecution(query)
+    if (w.lastExecution != null) {
+      codegenStringSeq(w.lastExecution.executedPlan)
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def asStreamExecution(query: StreamingQuery): StreamExecution = query match {
+    case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
+    case q: StreamExecution => q
+    case _ => throw new IllegalArgumentException("Parameter should be an instance of " +
+      "StreamExecution!")
+  }
+
+  /**
    * Augments [[Dataset]]s with debug methods.
    */
   implicit class DebugQuery(query: Dataset[_]) extends Logging {
     def debug(): Unit = {
-      val plan = query.queryExecution.executedPlan
       val visited = new collection.mutable.HashSet[TreeNodeRef]()
-      val debugPlan = plan transform {
+      val debugPlan = query.queryExecution.executedPlan transform {
         case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
           visited += new TreeNodeRef(s)
           DebugExec(s)
@@ -116,6 +165,12 @@ package object debug {
     }
   }
 
+  implicit class DebugStreamQuery(query: StreamingQuery) extends Logging {
+    def debugCodegen(): Unit = {
+      debugPrint(codegenString(query))
+    }
+  }
+
   case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     def output: Seq[Attribute] = child.output
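The patch also exposes `codegenString(query: StreamingQuery)` and `codegenStringSeq(query: StreamingQuery)` as plain functions, so the implicit class is optional. A REPL-style sketch of calling them directly; the `printCodegen` helper is hypothetical, and `query` is assumed to be an already started StreamingQuery:

```scala
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: dumps codegen info for a running streaming query.
def printCodegen(query: StreamingQuery): Unit = {
  // Safe to call before any batch has run: the helper then returns the
  // sentinel "No physical plan. Waiting for data." instead of throwing.
  println(codegenString(query))

  // Per-subtree view: pairs of (WholeStageCodegen subtree, generated Java source).
  codegenStringSeq(query).zipWithIndex.foreach { case ((subtree, code), i) =>
    println(s"Subtree #$i:\n$subtree\nGenerated code:\n$code")
  }
}
```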
http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ca38f04..bf509b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -27,6 +27,7 @@ import scala.util.control.ControlThrowable
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -513,6 +515,120 @@ class StreamSuite extends StreamTest {
     }
   }
 
+  test("explain-continuous") {
+    val inputData = ContinuousMemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test `df.explain`
+    val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+    val explainString =
+      spark.sessionState
+        .executePlan(explain)
+        .executedPlan
+        .executeCollect()
+        .map(_.getString(0))
+        .mkString("\n")
+    assert(explainString.contains("Filter"))
+    assert(explainString.contains("MapElements"))
+    assert(!explainString.contains("LocalTableScan"))
+
+    // Test StreamingQuery.display
+    val q = df.writeStream.queryName("memory_continuous_explain")
+      .outputMode(OutputMode.Update()).format("memory")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+      .asInstanceOf[StreamingQueryWrapper]
+      .streamingQuery
+    try {
+      // in continuous mode, the query runs even when there's no data;
+      // wait a bit to ensure initialization
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        assert(q.lastExecution != null)
+      }
+
+      val explainWithoutExtended = q.explainInternal(false)
+
+      // `extended = false` only displays the physical plan.
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 1)
+
+      val explainWithExtended = q.explainInternal(true)
+      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
+      // plan.
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 3)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 1)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-microbatch") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_microbatch_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.ProcessingTime("1 seconds"))
+      .start()
+
+    try {
+      import org.apache.spark.sql.execution.debug._
+      assert("No physical plan. Waiting for data." === codegenString(q))
+      assert(codegenStringSeq(q).isEmpty)
+
+      inputData.addData(1, 2, 3, 4, 5)
+      q.processAllAvailable()
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-continuous") {
+    val inputData = ContinuousMemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_continuous_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+
+    try {
+      // in continuous mode, the query runs even when there's no data;
+      // wait a bit to ensure initialization
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        assert(q.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution != null)
+      }
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
+  private def assertDebugCodegenResult(query: StreamingQuery): Unit = {
+    import org.apache.spark.sql.execution.debug._
+
+    val codegenStr = codegenString(query)
+    assert(codegenStr.contains("Found 1 WholeStageCodegen subtrees."))
+    // assuming that code is generated for the test query
+    assert(codegenStr.contains("Generated code:"))
+
+    val codegenStrSeq = codegenStringSeq(query)
+    assert(codegenStrSeq.nonEmpty)
+    assert(codegenStrSeq.head._1.contains("*(1)"))
+    assert(codegenStrSeq.head._2.contains("codegenStageId=1"))
+  }
+
   test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
     withTempPath { testPath =>
       val data = Seq((1, 2), (2, 3), (3, 4))
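To try the micro-batch path outside the test suite, here is a rough standalone sketch mirroring the "codegen-microbatch" test above. The session setup and object name are assumptions; note that MemoryStream lives under `org.apache.spark.sql.execution.streaming`, which is internal API, and requires an implicit SQLContext:

```scala
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object CodegenMicrobatchDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("codegen-demo").getOrCreate()
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext  // MemoryStream.apply needs this

  val input = MemoryStream[Int]
  val query = input.toDS().map(_ * 2).filter(_ > 5).writeStream
    .queryName("codegen_demo")
    .outputMode(OutputMode.Update)
    .format("memory")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()

  // No data yet, so there is no physical plan and the helper returns the
  // sentinel string from the patch instead of throwing.
  assert(codegenString(query) == "No physical plan. Waiting for data.")

  input.addData(1, 2, 3, 4, 5)
  query.processAllAvailable()  // wait until the micro-batch has executed

  // Now the combined string contains the WholeStageCodegen subtree summary
  // and the generated Java source.
  println(codegenString(query))

  query.stop()
  spark.stop()
}
```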