This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 812ad55 [SPARK-26103][SQL] Limit the length of debug strings for query plans
812ad55 is described below
commit 812ad5546148d2194ab0e4230ee85b8f6a5be2fb
Author: Dave DeCaprio <[email protected]>
AuthorDate: Wed Mar 13 09:58:43 2019 -0700
[SPARK-26103][SQL] Limit the length of debug strings for query plans
## What changes were proposed in this pull request?
The PR puts a limit on the size of the debug string generated for a tree
node, which helps to fix out-of-memory errors when large plans have huge debug
strings. In addition to SPARK-26103, this should also address SPARK-23904 and
SPARK-25380. An alternative solution was proposed in #23076, but that solution
doesn't address all the cases that can produce a large query string. This limit
only applies to calls to treeString that don't pass a Writer, which makes it
play nicely with #22429, #23018 and #230 [...]
- A new configuration parameter called spark.sql.maxPlanStringLength was
added to control the maximum length of plan strings (a usage sketch follows
this list).
- When plans are truncated, "..." is printed to indicate that it isn't a
full plan
- A warning is printed out the first time a truncated plan is displayed.
The warning explains what happened and how to adjust the limit.
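
As a quick usage sketch (not part of the patch itself; it assumes an existing
`spark` SparkSession and an illustrative query), the new limit could be
tightened for a session like this:

```scala
// Hedged sketch only: cap plan strings at 8 KB for the current session.
// The config name and the "... N more characters" suffix come from this commit;
// the SparkSession and the query below are illustrative assumptions.
spark.conf.set("spark.sql.maxPlanStringLength", "8k")

val df = spark.range(1000).selectExpr("id + 1 AS x").filter("x % 2 = 0")

// If the rendered plan exceeds the limit, its text ends with
// "... N more characters" and a warning is logged the first time.
println(df.queryExecution.toString)
```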
## How was this patch tested?
Unit tests were created for the new size-limited StringConcat. A unit test
for TreeNode was also created that checks that a long plan is correctly truncated.
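
For reference, here is a minimal sketch of the truncation behaviour those tests
cover (mirroring the StringUtilsSuite cases in the diff below; the 7-character
limit is purely illustrative):

```scala
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat

// Sketch: a StringConcat capped at 7 characters keeps "under" whole,
// truncates "over" to "ov", and then reports that it has hit its limit.
val concat = new StringConcat(7)
concat.append("under")
concat.append("over")
assert(concat.toString == "underov")
assert(concat.atLimit)
```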
Closes #23169 from DaveDeCaprio/text-plan-size.
Lead-authored-by: Dave DeCaprio <[email protected]>
Co-authored-by: David DeCaprio <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../apache/spark/sql/catalyst/trees/TreeNode.scala | 4 +-
.../spark/sql/catalyst/util/StringUtils.scala | 58 ++++++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++
.../spark/sql/catalyst/trees/TreeNodeSuite.scala | 29 ++++++++++-
.../spark/sql/catalyst/util/StringUtilsSuite.scala | 33 +++++++++---
.../spark/sql/execution/QueryExecution.scala | 14 +++---
6 files changed, 126 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d214ebb..72b1931 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
treeString(concat.append, verbose, addSuffix, maxFields)
concat.toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 643b83b..6118d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -17,14 +17,18 @@
package org.apache.spark.sql.catalyst.util
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.regex.{Pattern, PatternSyntaxException}
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.UTF8String
-object StringUtils {
+object StringUtils extends Logging {
/**
* Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,20 +96,29 @@ object StringUtils {
/**
* Concatenation of sequence of strings to final string with cheap append method
- * and one memory allocation for the final string.
+ * and one memory allocation for the final string. Can also bound the final size of
+ * the string.
*/
- class StringConcat {
- private val strings = new ArrayBuffer[String]
- private var length: Int = 0
+ class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+ protected val strings = new ArrayBuffer[String]
+ protected var length: Int = 0
+
+ def atLimit: Boolean = length >= maxLength
/**
* Appends a string and accumulates its length to allocate a string buffer for all
- * appended strings once in the toString method.
+ * appended strings once in the toString method. Returns true if the string still
+ * has room for further appends before it hits its max limit.
*/
def append(s: String): Unit = {
if (s != null) {
- strings.append(s)
- length += s.length
+ val sLen = s.length
+ if (!atLimit) {
+ val available = maxLength - length
+ val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+ strings.append(stringToAppend)
+ }
+ length += sLen
}
}
@@ -114,9 +127,36 @@ object StringUtils {
* returns concatenated string.
*/
override def toString: String = {
- val result = new java.lang.StringBuilder(length)
+ val finalLength = if (atLimit) maxLength else length
+ val result = new java.lang.StringBuilder(finalLength)
strings.foreach(result.append)
result.toString
}
}
+
+ /**
+ * A string concatenator for plan strings. Uses length from a configured value, and
+ * prints a warning the first time a plan is truncated.
+ */
+ class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+ override def toString: String = {
+ if (atLimit) {
+ logWarning(
+ "Truncated the string representation of a plan since it was too
long. The " +
+ s"plan had length ${length} and the maximum is ${maxLength}. This
behavior " +
+ "can be adjusted by setting
'${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+ val truncateMsg = if (maxLength == 0) {
+ s"Truncated plan of $length characters"
+ } else {
+ s"... ${length - maxLength} more characters"
+ }
+ val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+ strings.foreach(result.append)
+ result.append(truncateMsg)
+ result.toString
+ } else {
+ super.toString
+ }
+ }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 193d311..20f4080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1691,6 +1691,17 @@ object SQLConf {
.intConf
.createWithDefault(25)
+ val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+ .doc("Maximum number of characters to output for a plan string. If the
plan is " +
+ "longer, further output will be truncated. The default setting always
generates a full " +
+ "plan. Set this to a lower value such as 8k if plan strings are taking
up too much " +
+ "memory or are causing OutOfMemory errors in the driver or UI
processes.")
+ .bytesConf(ByteUnit.BYTE)
+ .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
"Invalid " +
+ "value for 'spark.sql.maxPlanStringLength'. Length must be a valid
string length " +
+ "(nonnegative and shorter than the maximum size).")
+ .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
.internal()
@@ -2146,6 +2157,8 @@ class SQLConf extends Serializable with Logging {
def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
+ def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
def setCommandRejectsSparkCoreConfs: Boolean =
getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb911d7..e7ad04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions.DslString
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -81,7 +82,7 @@ case class SelfReferenceUDF(
def apply(key: String): Boolean = config.contains(key)
}
-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
test("top node changed") {
val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
assert(after === Literal(2))
@@ -595,4 +596,28 @@ class TreeNodeSuite extends SparkFunSuite {
val expected = Coalesce(Stream(Literal(1), Literal(3)))
assert(result === expected)
}
+
+ test("treeString limits plan length") {
+ withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+ val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) {
case (treeNode, x) =>
+ Add(Literal(x), treeNode)
+ }
+
+ val planString = ds.treeString
+ logWarning("Plan string: " + planString)
+ assert(planString.endsWith(" more characters"))
+ assert(planString.length <= SQLConf.get.maxPlanStringLength)
+ }
+ }
+
+ test("treeString limit at zero") {
+ withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+ val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case
(treeNode, x) =>
+ Add(Literal(x), treeNode)
+ }
+
+ val planString = ds.treeString
+ assert(planString.startsWith("Truncated plan of"))
+ }
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 616ec12..63d3831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {
test("string concatenation") {
def concat(seq: String*): String = {
- seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+ seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
}
assert(new StringConcat().toString == "")
- assert(concat("") == "")
- assert(concat(null) == "")
- assert(concat("a") == "a")
- assert(concat("1", "2") == "12")
- assert(concat("abc", "\n", "123") == "abc\n123")
+ assert(concat("") === "")
+ assert(concat(null) === "")
+ assert(concat("a") === "a")
+ assert(concat("1", "2") === "12")
+ assert(concat("abc", "\n", "123") === "abc\n123")
+ }
+
+ test("string concatenation with limit") {
+ def concat(seq: String*): String = {
+ seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+ }
+ assert(concat("under") === "under")
+ assert(concat("under", "over", "extra") === "underov")
+ assert(concat("underover") === "underov")
+ assert(concat("under", "ov") === "underov")
+ }
+
+ test("string concatenation return value") {
+ def checkLimit(s: String): Boolean = {
+ val sc = new StringConcat(7)
+ sc.append(s)
+ sc.atLimit
+ }
+ assert(!checkLimit("under"))
+ assert(checkLimit("1234567"))
+ assert(checkLimit("1234567890"))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 49d6acf..5d2710bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
@@ -114,7 +114,7 @@ class QueryExecution(
ReuseSubquery(sparkSession.sessionState.conf))
def simpleString: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
concat.append("== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
concat.append("\n")
@@ -142,13 +142,13 @@ class QueryExecution(
}
override def toString: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
writePlans(concat.append, SQLConf.get.maxToStringFields)
concat.toString
}
def stringWithStats: String = withRedaction {
- val concat = new StringConcat()
+ val concat = new PlanStringConcat()
val maxFields = SQLConf.get.maxToStringFields
// trigger to compute stats for logical plans
@@ -203,9 +203,11 @@ class QueryExecution(
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
+ val append = (s: String) => {
+ writer.write(s)
+ }
try {
- writePlans(writer.write, maxFields)
+ writePlans(append, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
} finally {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]