This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 89c92cc [SPARK-26504][SQL] Rope-wise dumping of Spark plans 89c92cc is described below commit 89c92ccc2046d068aea23ae7973a97c58cfdc966 Author: Maxim Gekk <maxim.g...@databricks.com> AuthorDate: Mon Dec 31 16:39:46 2018 +0100 [SPARK-26504][SQL] Rope-wise dumping of Spark plans ## What changes were proposed in this pull request? Proposed a new class, `StringConcat`, that converts a sequence of strings into a single string with only one memory allocation, performed in its `toString` method. `StringConcat` replaces `StringBuilderWriter` in the methods that dump Spark plans and generated code to strings. In all methods related to the stringification of Spark plans, `Writer` arguments are replaced by functions of type `String => Unit`. ## How was this patch tested? It was tested by the existing suites `QueryExecutionSuite` and `DebuggingSuite`, as well as by new tests for `StringConcat` in `StringUtilsSuite`. Closes #23406 from MaxGekk/rope-plan. Authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> --- .../spark/sql/catalyst/plans/QueryPlan.scala | 17 +++++ .../apache/spark/sql/catalyst/trees/TreeNode.scala | 38 +++++----- .../spark/sql/catalyst/util/StringUtils.scala | 32 ++++++++ .../spark/sql/catalyst/util/StringUtilsSuite.scala | 13 ++++ .../spark/sql/execution/QueryExecution.scala | 85 ++++++++++------------ .../sql/execution/WholeStageCodegenExec.scala | 8 +- .../apache/spark/sql/execution/debug/package.scala | 27 +++---- 7 files changed, 133 insertions(+), 87 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 125181f..8f5444e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans
+import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode} import org.apache.spark.sql.internal.SQLConf @@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper { Nil } } + + /** + * Converts the query plan to string and appends it via provided function. + */ + def append[T <: QueryPlan[T]]( + plan: => QueryPlan[T], + append: String => Unit, + verbose: Boolean, + addSuffix: Boolean, + maxFields: Int = SQLConf.get.maxToStringFields): Unit = { + try { + plan.treeString(append, verbose, addSuffix, maxFields) + } catch { + case e: AnalysisException => append(e.toString) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 21e59bb..570a019 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -17,13 +17,11 @@ package org.apache.spark.sql.catalyst.trees -import java.io.Writer import java.util.UUID import scala.collection.Map import scala.reflect.ClassTag -import org.apache.commons.io.output.StringBuilderWriter import org.apache.commons.lang3.ClassUtils import org.json4s.JsonAST._ import org.json4s.JsonDSL._ @@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { verbose: Boolean, addSuffix: Boolean = false, 
maxFields: Int = SQLConf.get.maxToStringFields): String = { - val writer = new StringBuilderWriter() - try { - treeString(writer, verbose, addSuffix, maxFields) - writer.toString - } finally { - writer.close() - } + val concat = new StringConcat() + + treeString(concat.append, verbose, addSuffix, maxFields) + concat.toString } def treeString( - writer: Writer, + append: String => Unit, verbose: Boolean, addSuffix: Boolean, maxFields: Int): Unit = { - generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields) + generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields) } /** @@ -558,7 +554,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - writer: Writer, + append: String => Unit, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, @@ -566,9 +562,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (depth > 0) { lastChildren.init.foreach { isLast => - writer.write(if (isLast) " " else ": ") + append(if (isLast) " " else ": ") } - writer.write(if (lastChildren.last) "+- " else ":- ") + append(if (lastChildren.last) "+- " else ":- ") } val str = if (verbose) { @@ -576,24 +572,24 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } else { simpleString(maxFields) } - writer.write(prefix) - writer.write(str) - writer.write("\n") + append(prefix) + append(str) + append("\n") if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose, + depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose, addSuffix = addSuffix, maxFields = maxFields)) innerChildren.last.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose, + depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose, addSuffix = addSuffix, maxFields = maxFields) } if 
(children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields)) + depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields)) children.last.generateTreeString( - depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields) + depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala index bc861a8..643b83b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util import java.util.regex.{Pattern, PatternSyntaxException} +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.AnalysisException import org.apache.spark.unsafe.types.UTF8String @@ -87,4 +89,34 @@ object StringUtils { } funcNames.toSeq } + + /** + * Concatenation of sequence of strings to final string with cheap append method + * and one memory allocation for the final string. + */ + class StringConcat { + private val strings = new ArrayBuffer[String] + private var length: Int = 0 + + /** + * Appends a string and accumulates its length to allocate a string buffer for all + * appended strings once in the toString method. + */ + def append(s: String): Unit = { + if (s != null) { + strings.append(s) + length += s.length + } + } + + /** + * The method allocates memory for all appended strings, writes them to the memory and + * returns concatenated string. 
+ */ + override def toString: String = { + val result = new java.lang.StringBuilder(length) + strings.foreach(result.append) + result.toString + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala index 78fee51..616ec12 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala @@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite { assert(filterPattern(names, " a. ") === Seq("a1", "a2")) assert(filterPattern(names, " d* ") === Nil) } + + test("string concatenation") { + def concat(seq: String*): String = { + seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString + } + + assert(new StringConcat().toString == "") + assert(concat("") == "") + assert(concat(null) == "") + assert(concat("a") == "a") + assert(concat("1", "2") == "12") + assert(concat("abc", "\n", "123") == "abc\n123") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9b8d2e8..7fccbf6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -17,20 +17,21 @@ package org.apache.spark.sql.execution -import java.io.{BufferedWriter, OutputStreamWriter, Writer} +import java.io.{BufferedWriter, OutputStreamWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import org.apache.commons.io.output.StringBuilderWriter import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} import 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} @@ -108,10 +109,6 @@ class QueryExecution( ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) - protected def stringOrError[A](f: => A): String = - try f.toString catch { case e: AnalysisException => e.toString } - - /** * Returns the result as a hive compatible sequence of strings. This is used in tests and * `SparkSQLDriver` for CLI applications. 
@@ -197,55 +194,53 @@ class QueryExecution( } def simpleString: String = withRedaction { - s"""== Physical Plan == - |${stringOrError(executedPlan.treeString(verbose = false))} - """.stripMargin.trim - } - - private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = { - try f(writer) - catch { - case e: AnalysisException => writer.write(e.toString) - } + val concat = new StringConcat() + concat.append("== Physical Plan ==\n") + QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) + concat.append("\n") + concat.toString } - private def writePlans(writer: Writer, maxFields: Int): Unit = { + private def writePlans(append: String => Unit, maxFields: Int): Unit = { val (verbose, addSuffix) = (true, false) - - writer.write("== Parsed Logical Plan ==\n") - writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields)) - writer.write("\n== Analyzed Logical Plan ==\n") - val analyzedOutput = stringOrError(truncatedString( - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)) - writer.write(analyzedOutput) - writer.write("\n") - writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields)) - writer.write("\n== Optimized Logical Plan ==\n") - writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields)) - writer.write("\n== Physical Plan ==\n") - writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields)) + append("== Parsed Logical Plan ==\n") + QueryPlan.append(logical, append, verbose, addSuffix, maxFields) + append("\n== Analyzed Logical Plan ==\n") + val analyzedOutput = try { + truncatedString( + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields) + } catch { + case e: AnalysisException => e.toString + } + append(analyzedOutput) + append("\n") + QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields) + append("\n== Optimized Logical Plan ==\n") + QueryPlan.append(optimizedPlan, append, 
verbose, addSuffix, maxFields) + append("\n== Physical Plan ==\n") + QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields) } override def toString: String = withRedaction { - val writer = new StringBuilderWriter() - try { - writePlans(writer, SQLConf.get.maxToStringFields) - writer.toString - } finally { - writer.close() - } + val concat = new StringConcat() + writePlans(concat.append, SQLConf.get.maxToStringFields) + concat.toString } def stringWithStats: String = withRedaction { + val concat = new StringConcat() + val maxFields = SQLConf.get.maxToStringFields + // trigger to compute stats for logical plans optimizedPlan.stats // only show optimized logical plan and physical plan - s"""== Optimized Logical Plan == - |${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))} - |== Physical Plan == - |${stringOrError(executedPlan.treeString(verbose = true))} - """.stripMargin.trim + concat.append("== Optimized Logical Plan ==\n") + QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields) + concat.append("\n== Physical Plan ==\n") + QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields) + concat.append("\n") + concat.toString } /** @@ -282,7 +277,7 @@ class QueryExecution( /** * Dumps debug information about query execution into the specified file. * - * @param maxFields maximim number of fields converted to string representation. + * @param maxFields maximum number of fields converted to string representation. 
*/ def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = { val filePath = new Path(path) @@ -290,9 +285,9 @@ class QueryExecution( val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) try { - writePlans(writer, maxFields) + writePlans(writer.write, maxFields) writer.write("\n== Whole Stage Codegen ==\n") - org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan) + org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan) } finally { writer.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index f4927de..3b0a996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - writer: Writer, + append: String => Unit, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, @@ -501,7 +501,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod child.generateTreeString( depth, lastChildren, - writer, + append, verbose, prefix = "", addSuffix = false, @@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - writer: Writer, + append: String => Unit, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, @@ -785,7 +785,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) child.generateTreeString( depth, lastChildren, - writer, + append, verbose, s"*($codegenStageId) ", false, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index ae8197f..53b74c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -17,13 +17,10 @@ package org.apache.spark.sql.execution -import java.io.Writer import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.commons.io.output.StringBuilderWriter - import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -32,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef +import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery @@ -73,24 +71,19 @@ package object debug { * @return single String containing all WholeStageCodegen subtrees and corresponding codegen */ def codegenString(plan: SparkPlan): String = { - val writer = new StringBuilderWriter() - - try { - writeCodegen(writer, plan) - writer.toString - } finally { - writer.close() - } + val concat = new StringConcat() + writeCodegen(concat.append, plan) + concat.toString } - def writeCodegen(writer: Writer, plan: SparkPlan): Unit = { + def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = { val codegenSeq = codegenStringSeq(plan) - writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n") + append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n") for (((subtree, code), i) <- codegenSeq.zipWithIndex) { - writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n") - 
writer.write(subtree) - writer.write("\nGenerated code:\n") - writer.write(s"${code}\n") + append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n") + append(subtree) + append("\nGenerated code:\n") + append(s"${code}\n") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org