Repository: spark Updated Branches: refs/heads/branch-2.0 3119d8eef -> 00bbf7873
[SPARK-15794] Should truncate toString() of very wide plans ## What changes were proposed in this pull request? With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact. It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability. ## How was this patch tested? Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold. ``` numFields = 5 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 2336 / 2558 0.0 23364.4 0.1X numFields = 25 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 4237 / 4465 0.0 42367.9 0.1X numFields = 100 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 10458 / 11223 0.0 104582.0 0.0X numFields = Infinity wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ [info] java.lang.OutOfMemoryError: Java heap space ``` Author: Eric Liang <[email protected]> Author: Eric Liang <[email protected]> Closes #13537 from ericl/truncated-string. (cherry picked from commit b914e1930fd5c5f2808f92d4958ec6fbeddf2e30) Signed-off-by: Josh Rosen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00bbf787 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00bbf787 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00bbf787 Branch: refs/heads/branch-2.0 Commit: 00bbf787340e208cc76230ffd96026c1305f942c Parents: 3119d8e Author: Eric Liang <[email protected]> Authored: Thu Jun 9 18:05:16 2016 -0700 Committer: Josh Rosen <[email protected]> Committed: Thu Jun 9 18:05:30 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Utils.scala | 47 +++++++++++++++++++ .../org/apache/spark/util/UtilsSuite.scala | 8 ++++ .../sql/catalyst/expressions/Expression.scala | 4 +- .../spark/sql/catalyst/trees/TreeNode.scala | 6 +-- .../org/apache/spark/sql/types/StructType.scala | 7 +-- .../spark/sql/execution/ExistingRDD.scala | 10 ++-- .../spark/sql/execution/QueryExecution.scala | 5 +- .../execution/aggregate/HashAggregateExec.scala | 7 +-- .../execution/aggregate/SortAggregateExec.scala | 7 +-- .../execution/datasources/LogicalRelation.scala | 3 +- .../org/apache/spark/sql/execution/limit.scala | 5 +- .../execution/streaming/StreamExecution.scala | 3 +- .../spark/sql/execution/streaming/memory.scala | 3 +- .../benchmark/WideSchemaBenchmark.scala | 49 ++++++++++++++++++++ 14 files changed, 140 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a9dbca..f9d0540 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.Files import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.net.ssl.HttpsURLConnection import scala.annotation.tailrec @@ -78,6 +79,52 @@ private[spark] object Utils extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null + /** + * The performance overhead of creating and logging strings for wide schemas can be large. To + * limit the impact, we bound the number of fields to include by default. This can be overriden + * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv. + */ + val DEFAULT_MAX_TO_STRING_FIELDS = 25 + + private def maxNumToStringFields = { + if (SparkEnv.get != null) { + SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS) + } else { + DEFAULT_MAX_TO_STRING_FIELDS + } + } + + /** Whether we have warned about plan string truncation yet. */ + private val truncationWarningPrinted = new AtomicBoolean(false) + + /** + * Format a sequence with semantics similar to calling .mkString(). Any elements beyond + * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder. + * + * @return the trimmed and formatted string. + */ + def truncatedString[T]( + seq: Seq[T], + start: String, + sep: String, + end: String, + maxNumFields: Int = maxNumToStringFields): String = { + if (seq.length > maxNumFields) { + if (truncationWarningPrinted.compareAndSet(false, true)) { + logWarning( + "Truncated the string representation of a plan since it was too large. This " + + "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.") + } + val numFields = math.max(0, maxNumFields - 1) + seq.take(numFields).mkString( + start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end) + } else { + seq.mkString(start, sep, end) + } + } + + /** Shorthand for calling truncatedString() without start or end strings. */ + def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "") /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 6698749..a5363f0 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -40,6 +40,14 @@ import org.apache.spark.network.util.ByteUnit class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { + test("truncatedString") { + assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]") + assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]") + assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]") + assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]") + assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3") + } + test("timeConversion") { // Test -1 assert(Utils.timeStringAsSeconds("-1") === -1) http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index efe592d..10a1412 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the basic expression abstract classes in Catalyst. @@ -196,7 +197,8 @@ abstract class Expression extends TreeNode[Expression] { override def simpleString: String = toString - override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")") + override def toString: String = prettyName + Utils.truncatedString( + flatArguments.toSeq, "(", ", ", ")") /** * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]], http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index c67366d..f924efe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case tn: TreeNode[_] => tn.simpleString :: Nil case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil case iter: Iterable[_] if iter.isEmpty => Nil - case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil - case set: Set[_] => set.mkString("{", ", ", "}") :: Nil + case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil + case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil case array: Array[_] if array.isEmpty => Nil - case array: Array[_] => array.mkString("[", ", ", "]") :: Nil + case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil case null => Nil case None => Nil case Some(null) => Nil http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 9a92373..436512f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -22,11 +22,12 @@ import scala.util.Try import org.json4s.JsonDSL._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -293,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum override def simpleString: String = { - val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}") - s"struct<${fieldTypes.mkString(",")}>" + val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") + Utils.truncatedString(fieldTypes, "struct<", ",", ">") } override def sql: String = { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b8b3926..9ab98fd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.Utils object RDDConversions { def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -123,7 +124,7 @@ private[sql] case class RDDScanExec( } override def simpleString: String = { - s"Scan $nodeName${output.mkString("[", ",", "]")}" + s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}" } } @@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec( key + ": " + StringUtils.abbreviate(value, 100) } - s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + + s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec( val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { key + ": " + StringUtils.abbreviate(value, 100) } - val metadataStr = metadataEntries.mkString(" ", ", ", "") - s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr" + val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") + s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" } override def inputRDDs(): Seq[RDD[InternalRow]] = { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 560214a..a2d4502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} +import org.apache.spark.util.Utils /** * The primary workflow for executing relational queries using Spark. Designed to allow easy @@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { } override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + def output = Utils.truncatedString( + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") val analyzedPlan = Seq( stringOrError(output), stringOrError(analyzed.treeString(verbose = true)) http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index b617e26..caeeba1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} import org.apache.spark.unsafe.KVIterator +import org.apache.spark.util.Utils /** * Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size. @@ -773,9 +774,9 @@ case class HashAggregateExec( testFallbackStartsAt match { case None => - val keyString = groupingExpressions.mkString("[", ",", "]") - val functionString = allAggregateExpressions.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]") + val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") if (verbose) { s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 41ba9f5..1712651 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.util.Utils /** * Sort-based aggregate operator. @@ -110,9 +111,9 @@ case class SortAggregateExec( private def toString(verbose: Boolean): String = { val allAggregateExpressions = aggregateExpressions - val keyString = groupingExpressions.mkString("[", ",", "]") - val functionString = allAggregateExpressions.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]") + val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") if (verbose) { s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 0e0748f..a418d02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.util.Utils /** * Used to link a [[BaseRelation]] in to a logical query plan. @@ -82,5 +83,5 @@ case class LogicalRelation( expectedOutputAttributes, metastoreTableIdentifier).asInstanceOf[this.type] - override def simpleString: String = s"Relation[${output.mkString(",")}] $relation" + override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" } http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index b71f333..781c016 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.apache.spark.util.Utils /** @@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec( override def outputOrdering: Seq[SortOrder] = sortOrder override def simpleString: String = { - val orderByString = sortOrder.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" } http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d9800e4..954fc33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -336,7 +336,8 @@ class StreamExecution( newData.get(source).map { data => val newPlan = data.logicalPlan assert(output.size == newPlan.output.size, - s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") + s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + + s"${Utils.truncatedString(newPlan.output, ",")}") replacements ++= output.zip(newPlan.output) newPlan }.getOrElse { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 4496f41..77fd043 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils object MemoryStream { protected val currentBlockId = new AtomicInteger(0) @@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def toString: String = s"MemoryStream[${output.mkString(",")}]" + override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" override def getOffset: Option[Offset] = synchronized { if (batches.isEmpty) { http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala index b4811fe..06466e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala @@ -155,6 +155,55 @@ many column field r/w: Best/Avg Time(ms) Rate(M/s) Per Ro */ } + ignore("wide shallowly nested struct field read and write") { + val benchmark = new Benchmark( + "wide shallowly nested struct field r/w", scaleFactor) + for (width <- widthsToTest) { + val numRows = scaleFactor / width + var datum: String = "{" + for (i <- 1 to width) { + if (i == 1) { + datum += s""""value_$i": 1""" + } else { + datum += s""", "value_$i": 1""" + } + } + datum += "}" + datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}""" + val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache() + df.count() // force caching + addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1") + } + benchmark.run() + +/* +Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic +Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz +wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +1 wide x 100000 rows (read in-mem) 100 / 125 1.0 997.7 1.0X +1 wide x 100000 rows (write in-mem) 130 / 147 0.8 1302.9 0.8X +1 wide x 100000 rows (read parquet) 195 / 228 0.5 1951.4 0.5X +1 wide x 100000 rows (write parquet) 248 / 259 0.4 2479.7 0.4X +10 wide x 10000 rows (read in-mem) 76 / 89 1.3 757.2 1.3X +10 wide x 10000 rows (write in-mem) 90 / 116 1.1 900.0 1.1X +10 wide x 10000 rows (read parquet) 90 / 135 1.1 903.9 1.1X +10 wide x 10000 rows (write parquet) 222 / 240 0.4 2222.8 0.4X +100 wide x 1000 rows (read in-mem) 71 / 91 1.4 710.8 1.4X +100 wide x 1000 rows (write in-mem) 252 / 324 0.4 2522.4 0.4X +100 wide x 1000 rows (read parquet) 310 / 329 0.3 3095.9 0.3X +100 wide x 1000 rows (write parquet) 253 / 267 0.4 2525.7 0.4X +1000 wide x 100 rows (read in-mem) 144 / 160 0.7 1439.5 0.7X +1000 wide x 100 rows (write in-mem) 2055 / 2326 0.0 20553.9 0.0X +1000 wide x 100 rows (read parquet) 750 / 925 0.1 7496.8 0.1X +1000 wide x 100 rows (write parquet) 235 / 317 0.4 2347.5 0.4X +2500 wide x 40 rows (read in-mem) 219 / 227 0.5 2190.9 0.5X +2500 wide x 40 rows (write in-mem) 5177 / 5423 0.0 51773.2 0.0X +2500 wide x 40 rows (read parquet) 1642 / 1714 0.1 16417.7 0.1X +2500 wide x 40 rows (write parquet) 357 / 381 0.3 3568.2 0.3X +*/ + } + ignore("wide struct field read and write") { val benchmark = new Benchmark("wide struct field r/w", scaleFactor) for (width <- widthsToTest) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
