This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c92cc  [SPARK-26504][SQL] Rope-wise dumping of Spark plans
89c92cc is described below

commit 89c92ccc2046d068aea23ae7973a97c58cfdc966
Author: Maxim Gekk <maxim.g...@databricks.com>
AuthorDate: Mon Dec 31 16:39:46 2018 +0100

    [SPARK-26504][SQL] Rope-wise dumping of Spark plans
    
    ## What changes were proposed in this pull request?
    
    Proposed a new class `StringConcat` for converting a sequence of strings into a
    single string with one memory allocation in the `toString` method. `StringConcat`
    replaces `StringBuilderWriter` in the methods that dump Spark plans and generated
    code to strings.
    
    All `Writer` arguments are replaced by `String => Unit` in the methods related
    to Spark plan stringification.
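
    For illustration, a minimal usage sketch of the new `StringConcat`, modeled on
    how `QueryExecution.simpleString` uses it in this patch (`plan` below is a
    placeholder for any `TreeNode`, not a name introduced by the change):

        import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
        import org.apache.spark.sql.internal.SQLConf

        // Cheap appends: fragments are collected and only their total length is tracked.
        val concat = new StringConcat()
        concat.append("== Physical Plan ==\n")
        // treeString now takes a String => Unit sink instead of a java.io.Writer.
        plan.treeString(concat.append, verbose = false, addSuffix = false,
          maxFields = SQLConf.get.maxToStringFields)
        // A single buffer of the accumulated length is allocated here.
        val dump: String = concat.toString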
    
    ## How was this patch tested?
    
    It was tested by the existing suites `QueryExecutionSuite` and `DebuggingSuite`,
    as well as by new tests for `StringConcat` in `StringUtilsSuite`.
    
    Closes #23406 from MaxGekk/rope-plan.
    
    Authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
---
 .../spark/sql/catalyst/plans/QueryPlan.scala       | 17 +++++
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 38 +++++-----
 .../spark/sql/catalyst/util/StringUtils.scala      | 32 ++++++++
 .../spark/sql/catalyst/util/StringUtilsSuite.scala | 13 ++++
 .../spark/sql/execution/QueryExecution.scala       | 85 ++++++++++------------
 .../sql/execution/WholeStageCodegenExec.scala      |  8 +-
 .../apache/spark/sql/execution/debug/package.scala | 27 +++----
 7 files changed, 133 insertions(+), 87 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 125181f..8f5444e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
 import org.apache.spark.sql.internal.SQLConf
@@ -301,4 +302,20 @@ object QueryPlan extends PredicateHelper {
       Nil
     }
   }
+
+  /**
+   * Converts the query plan to a string and appends it via the provided function.
+   */
+  def append[T <: QueryPlan[T]](
+      plan: => QueryPlan[T],
+      append: String => Unit,
+      verbose: Boolean,
+      addSuffix: Boolean,
+      maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
+    try {
+      plan.treeString(append, verbose, addSuffix, maxFields)
+    } catch {
+      case e: AnalysisException => append(e.toString)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 21e59bb..570a019 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import java.io.Writer
 import java.util.UUID
 
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
@@ -37,6 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -481,21 +480,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val writer = new StringBuilderWriter()
-    try {
-      treeString(writer, verbose, addSuffix, maxFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+
+    treeString(concat.append, verbose, addSuffix, maxFields)
+    concat.toString
   }
 
   def treeString(
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int): Unit = {
-    generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields)
+    generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields)
   }
 
   /**
@@ -558,7 +554,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -566,9 +562,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
-        writer.write(if (isLast) "   " else ":  ")
+        append(if (isLast) "   " else ":  ")
       }
-      writer.write(if (lastChildren.last) "+- " else ":- ")
+      append(if (lastChildren.last) "+- " else ":- ")
     }
 
     val str = if (verbose) {
@@ -576,24 +572,24 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     } else {
       simpleString(maxFields)
     }
-    writer.write(prefix)
-    writer.write(str)
-    writer.write("\n")
+    append(prefix)
+    append(str)
+    append("\n")
 
     if (innerChildren.nonEmpty) {
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields))
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+        depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields)
     }
 
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields))
+        depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix, maxFields))
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields)
+        depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index bc861a8..643b83b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.regex.{Pattern, PatternSyntaxException}
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -87,4 +89,34 @@ object StringUtils {
     }
     funcNames.toSeq
   }
+
+  /**
+   * Concatenation of sequence of strings to final string with cheap append method
+   * and one memory allocation for the final string.
+   */
+  class StringConcat {
+    private val strings = new ArrayBuffer[String]
+    private var length: Int = 0
+
+    /**
+     * Appends a string and accumulates its length to allocate a string buffer for all
+     * appended strings once in the toString method.
+     */
+    def append(s: String): Unit = {
+      if (s != null) {
+        strings.append(s)
+        length += s.length
+      }
+    }
+
+    /**
+     * The method allocates memory for all appended strings, writes them to the memory and
+     * returns concatenated string.
+     */
+    override def toString: String = {
+      val result = new java.lang.StringBuilder(length)
+      strings.foreach(result.append)
+      result.toString
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 78fee51..616ec12 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -43,4 +43,17 @@ class StringUtilsSuite extends SparkFunSuite {
     assert(filterPattern(names, " a. ") === Seq("a1", "a2"))
     assert(filterPattern(names, " d* ") === Nil)
   }
+
+  test("string concatenation") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+    }
+
+    assert(new StringConcat().toString == "")
+    assert(concat("") == "")
+    assert(concat(null) == "")
+    assert(concat("a") == "a")
+    assert(concat("1", "2") == "12")
+    assert(concat("abc", "\n", "123") == "abc\n123")
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9b8d2e8..7fccbf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
+import java.io.{BufferedWriter, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.commons.io.output.StringBuilderWriter
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -108,10 +109,6 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def stringOrError[A](f: => A): String =
-    try f.toString catch { case e: AnalysisException => e.toString }
-
-
   /**
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
@@ -197,55 +194,53 @@ class QueryExecution(
   }
 
   def simpleString: String = withRedaction {
-    s"""== Physical Plan ==
-       |${stringOrError(executedPlan.treeString(verbose = false))}
-      """.stripMargin.trim
-  }
-
-  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-    try f(writer)
-    catch {
-      case e: AnalysisException => writer.write(e.toString)
-    }
+    val concat = new StringConcat()
+    concat.append("== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
+    concat.append("\n")
+    concat.toString
   }
 
-  private def writePlans(writer: Writer, maxFields: Int): Unit = {
+  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
-
-    writer.write("== Parsed Logical Plan ==\n")
-    writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Analyzed Logical Plan ==\n")
-    val analyzedOutput = stringOrError(truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))
-    writer.write(analyzedOutput)
-    writer.write("\n")
-    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Optimized Logical Plan ==\n")
-    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields))
-    writer.write("\n== Physical Plan ==\n")
-    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields))
+    append("== Parsed Logical Plan ==\n")
+    QueryPlan.append(logical, append, verbose, addSuffix, maxFields)
+    append("\n== Analyzed Logical Plan ==\n")
+    val analyzedOutput = try {
+      truncatedString(
+        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)
+    } catch {
+      case e: AnalysisException => e.toString
+    }
+    append(analyzedOutput)
+    append("\n")
+    QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
+    append("\n== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
+    append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
   }
 
   override def toString: String = withRedaction {
-    val writer = new StringBuilderWriter()
-    try {
-      writePlans(writer, SQLConf.get.maxToStringFields)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+    writePlans(concat.append, SQLConf.get.maxToStringFields)
+    concat.toString
   }
 
   def stringWithStats: String = withRedaction {
+    val concat = new StringConcat()
+    val maxFields = SQLConf.get.maxToStringFields
+
     // trigger to compute stats for logical plans
     optimizedPlan.stats
 
     // only show optimized logical plan and physical plan
-    s"""== Optimized Logical Plan ==
-        |${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}
-        |== Physical Plan ==
-        |${stringOrError(executedPlan.treeString(verbose = true))}
-    """.stripMargin.trim
+    concat.append("== Optimized Logical Plan ==\n")
+    QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
+    concat.append("\n== Physical Plan ==\n")
+    QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
+    concat.append("\n")
+    concat.toString
   }
 
   /**
@@ -282,7 +277,7 @@ class QueryExecution(
     /**
      * Dumps debug information about query execution into the specified file.
      *
-     * @param maxFields maximim number of fields converted to string representation.
+     * @param maxFields maximum number of fields converted to string representation.
      */
     def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
       val filePath = new Path(path)
@@ -290,9 +285,9 @@ class QueryExecution(
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
 
       try {
-        writePlans(writer, maxFields)
+        writePlans(writer.write, maxFields)
         writer.write("\n== Whole Stage Codegen ==\n")
-        org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+        org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
       } finally {
         writer.close()
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index f4927de..3b0a996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -501,7 +501,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       prefix = "",
       addSuffix = false,
@@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      writer: Writer,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -785,7 +785,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     child.generateTreeString(
       depth,
       lastChildren,
-      writer,
+      append,
       verbose,
       s"*($codegenStageId) ",
       false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index ae8197f..53b74c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.Writer
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.io.output.StringBuilderWriter
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -32,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQuery
@@ -73,24 +71,19 @@ package object debug {
   * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    */
   def codegenString(plan: SparkPlan): String = {
-    val writer = new StringBuilderWriter()
-
-    try {
-      writeCodegen(writer, plan)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    val concat = new StringConcat()
+    writeCodegen(concat.append, plan)
+    concat.toString
   }
 
-  def writeCodegen(writer: Writer, plan: SparkPlan): Unit = {
+  def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
-    writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
+    append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
-      writer.write(subtree)
-      writer.write("\nGenerated code:\n")
-      writer.write(s"${code}\n")
+      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+      append(subtree)
+      append("\nGenerated code:\n")
+      append(s"${code}\n")
     }
   }
 

