This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new efca188b7b3 [SPARK-44485][SQL] Optimize TreeNode.generateTreeString
efca188b7b3 is described below

commit efca188b7b33d0c0fe3cf9ca83286741f7f57dc7
Author: Ziqi Liu <[email protected]>
AuthorDate: Sat Jul 22 16:43:31 2023 +0900

    [SPARK-44485][SQL] Optimize TreeNode.generateTreeString
    
    ### What changes were proposed in this pull request?
    
    Optimize several critical code paths in `TreeNode.generateTreeString`
    
    ### Why are the changes needed?
    
    In `TreeNode.generateTreeString`, we observed inefficiency in Scala 
collection operations and virtual function calls in our internal workload.
    
    This inefficiency becomes significant in large plans (we hit an example of 
more than 1000 nodes). So it’s worth optimizing this super-hot code path. By 
rewriting it in plain Java code (not as sweet as Scala syntax sugar though), we 
should be able to get rid of most of the overhead.
    
    - `ArrayBuffer.append`
    <img width="440" alt="itable1" 
src="https://github.com/apache/spark/assets/22358241/3e1d2e5e-1eeb-46ef-ab7a-20f4cb75f602";>
    
    - `Seq.last`
    <img width="302" alt="itable2" 
src="https://github.com/apache/spark/assets/22358241/23f29695-8a01-4c8e-b75a-148a92278c2b";>
    
    - `SeqLike.$colon$plus`
    <img width="281" alt="itable3" 
src="https://github.com/apache/spark/assets/22358241/f0526746-62d0-4556-99be-04a24ab805d2";>
    
    - `StringOps.$times`
    <img width="334" alt="itable4" 
src="https://github.com/apache/spark/assets/22358241/3a46f18e-7027-421e-aa5a-130d02e1c19c";>
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UTs
    
    Closes #42095 from liuzqt/SPARK-44485.
    
    Authored-by: Ziqi Liu <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 09c44fd05d10dccc131885f94a61da0885de0ad0)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../spark/sql/catalyst/util/StringUtils.scala      |  8 ++---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 35 ++++++++++++++++------
 .../spark/sql/catalyst/util/StringUtils.scala      |  2 +-
 .../sql/execution/WholeStageCodegenExec.scala      |  4 +--
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  8 +++--
 .../sql/execution/adaptive/QueryStageExec.scala    |  6 ++--
 .../sql/execution/basicPhysicalOperators.scala     |  2 +-
 7 files changed, 42 insertions(+), 23 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index c12a1f50daa..20fb8bb94bd 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -18,8 +18,6 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.unsafe.array.ByteArrayUtils
 
@@ -29,7 +27,7 @@ import org.apache.spark.unsafe.array.ByteArrayUtils
  * the string.
  */
 class StringConcat(val maxLength: Int = 
ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH) {
-  protected val strings = new ArrayBuffer[String]
+  protected val strings = new java.util.ArrayList[String]
   protected var length: Int = 0
 
   def atLimit: Boolean = length >= maxLength
@@ -45,7 +43,7 @@ class StringConcat(val maxLength: Int = 
ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH)
       if (!atLimit) {
         val available = maxLength - length
         val stringToAppend = if (available >= sLen) s else s.substring(0, 
available)
-        strings.append(stringToAppend)
+        strings.add(stringToAppend)
       }
 
       // Keeps the total length of appended strings. Note that we need to cap 
the length at
@@ -62,7 +60,7 @@ class StringConcat(val maxLength: Int = 
ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH)
   override def toString: String = {
     val finalLength = if (atLimit) maxLength else length
     val result = new java.lang.StringBuilder(finalLength)
-    strings.foreach(result.append)
+    strings.forEach(s => result.append(s))
     result.toString
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 8d3f81666f8..9e605a45414 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -936,7 +936,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       addSuffix: Boolean,
       maxFields: Int,
       printOperatorId: Boolean): Unit = {
-    generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields, 
printOperatorId, 0)
+    generateTreeString(0, new java.util.ArrayList(), append, verbose, "", 
addSuffix, maxFields,
+      printOperatorId, 0)
   }
 
   /**
@@ -998,7 +999,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
    */
   def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
@@ -1006,12 +1007,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       maxFields: Int,
       printNodeId: Boolean,
       indent: Int = 0): Unit = {
-    append("   " * indent)
+    (0 until indent).foreach(_ => append("   "))
     if (depth > 0) {
-      lastChildren.init.foreach { isLast =>
+      val iter = lastChildren.iterator
+      (0 until lastChildren.size - 1).foreach { i =>
+        val isLast = iter.next
         append(if (isLast) "   " else ":  ")
       }
-      append(if (lastChildren.last) "+- " else ":- ")
+      append(if (lastChildren.get(lastChildren.size() - 1)) "+- " else ":- ")
     }
 
     val str = if (verbose) {
@@ -1028,22 +1031,36 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
     append("\n")
 
     if (innerChildren.nonEmpty) {
+      lastChildren.add(children.isEmpty)
+      lastChildren.add(false)
       innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, append, verbose,
+        depth + 2, lastChildren, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields, printNodeId = 
printNodeId, indent = indent))
+      lastChildren.remove(lastChildren.size() - 1)
+      lastChildren.remove(lastChildren.size() - 1)
+
+      lastChildren.add(children.isEmpty)
+      lastChildren.add(true)
       innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, append, verbose,
+        depth + 2, lastChildren, append, verbose,
         addSuffix = addSuffix, maxFields = maxFields, printNodeId = 
printNodeId, indent = indent)
+      lastChildren.remove(lastChildren.size() - 1)
+      lastChildren.remove(lastChildren.size() - 1)
     }
 
     if (children.nonEmpty) {
+      lastChildren.add(false)
       children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, append, verbose, prefix, addSuffix,
+        depth + 1, lastChildren, append, verbose, prefix, addSuffix,
         maxFields, printNodeId = printNodeId, indent = indent)
       )
+      lastChildren.remove(lastChildren.size() - 1)
+
+      lastChildren.add(true)
       children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, append, verbose, prefix,
+        depth + 1, lastChildren, append, verbose, prefix,
         addSuffix, maxFields, printNodeId = printNodeId, indent = indent)
+      lastChildren.remove(lastChildren.size() - 1)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index ccf6e5b57ac..f6d76bd139d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -145,7 +145,7 @@ object StringUtils extends Logging {
           s"... ${length - maxLength} more characters"
         }
         val result = new java.lang.StringBuilder(maxLength + 
truncateMsg.length)
-        strings.foreach(result.append)
+        strings.forEach(s => result.append(s))
         result.append(truncateMsg)
         result.toString
       } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ddc2cfb56d4..5fc51cc6e31 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -534,7 +534,7 @@ case class InputAdapter(child: SparkPlan) extends 
UnaryExecNode with InputRDDCod
 
   override def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
@@ -800,7 +800,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val 
codegenStageId: Int)
 
   override def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 26c95c0fedd..36895b17aa8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -408,7 +408,7 @@ case class AdaptiveSparkPlanExec(
 
   override def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
@@ -427,9 +427,10 @@ case class AdaptiveSparkPlanExec(
       printNodeId,
       indent)
     if (currentPhysicalPlan.fastEquals(initialPlan)) {
+      lastChildren.add(true)
       currentPhysicalPlan.generateTreeString(
         depth + 1,
-        lastChildren :+ true,
+        lastChildren,
         append,
         verbose,
         prefix = "",
@@ -437,6 +438,7 @@ case class AdaptiveSparkPlanExec(
         maxFields,
         printNodeId,
         indent)
+      lastChildren.remove(lastChildren.size() - 1)
     } else {
       generateTreeStringWithHeader(
         if (isFinalPlan) "Final Plan" else "Current Plan",
@@ -470,7 +472,7 @@ case class AdaptiveSparkPlanExec(
     append(s"+- == $header ==\n")
     plan.generateTreeString(
       0,
-      Nil,
+      new java.util.ArrayList(),
       append,
       verbose,
       prefix = "",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index c6234a40726..b941feb12fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -110,7 +110,7 @@ abstract class QueryStageExec extends LeafExecNode {
 
   override def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
@@ -127,8 +127,10 @@ abstract class QueryStageExec extends LeafExecNode {
       maxFields,
       printNodeId,
       indent)
+    lastChildren.add(true)
     plan.generateTreeString(
-      depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, 
printNodeId, indent)
+      depth + 1, lastChildren, append, verbose, "", false, maxFields, 
printNodeId, indent)
+    lastChildren.remove(lastChildren.size() - 1)
   }
 
   override protected[sql] def cleanupResources(): Unit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 68f056d894b..2fd79935507 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -783,7 +783,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
 
   override def generateTreeString(
       depth: Int,
-      lastChildren: Seq[Boolean],
+      lastChildren: java.util.ArrayList[Boolean],
       append: String => Unit,
       verbose: Boolean,
       prefix: String = "",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to