This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aaa8a80 [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes aaa8a80 is described below commit aaa8a80c9d3426107de5873b4391600701121385 Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com> AuthorDate: Tue Jun 15 22:02:19 2021 -0500 [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes ### What changes were proposed in this pull request? Cache commonly occurring duplicate Some objects in SQLMetrics by using a Guava cache, and reuse the existing Guava String Interner to avoid duplicate strings in JSONProtocol. Also, with AccumulatorV2 we have seen a lot of Some(-1L) and Some(0L) occurrences in a heap dump; these are naively interned by reusing an already constructed Some(-1L) and Some(0L). To give some context on the impact and the garbage that accumulated, below are the details of the complex Spark job which we troubleshot to figure out the bottlenecks. **tl;dr - In short, major issues were the accumulation of duplicate objects mainly from SQLMetrics.** More than 25% of the 40G driver heap was filled with (a very large number of) **duplicate**, immutable objects. 1. Very large number of **duplicate** immutable objects. - Type of metric is represented by `'scala.Some("sql")'` - which is created for each metric. - Fixing this reduced memory usage from 4GB to a few bytes. 2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically to indicate absence of metric) - Individually the values are all immutable, but Spark SQL was creating a new instance each time. - Interning these resulted in saving ~4.5GB for a 40G heap. 3. Using string interpolation for metric names. - Interpolation results in creation of a new string object. - We end up with a very large number of metric names - though the number of unique strings is minuscule. 
- ~7.5 GB in the 40 GB heap, which went down to a few KBs when fixed. ### Why are the changes needed? To reduce the overall driver memory footprint, which eventually reduces Full GC pauses. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Since these are memory-related optimizations, unit tests are not added. These changes were added to our internal platform, which, along with another set of optimizations, made it possible for a complex Spark job that was continuously failing to succeed. Closes #32754 from venkata91/SPARK-35613. Authored-by: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../scala/org/apache/spark/status/LiveEntity.scala | 16 +----------- .../org/apache/spark/util/AccumulatorV2.scala | 19 +++++++++++++- .../scala/org/apache/spark/util/JsonProtocol.scala | 9 ++++--- .../main/scala/org/apache/spark/util/Utils.scala | 8 ++++++ .../spark/sql/execution/metric/SQLMetrics.scala | 30 +++++++++++++++------- 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 5af76e9..fc5fc32 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -24,8 +24,6 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.{HashSet, TreeSet} import scala.collection.mutable.HashMap -import com.google.common.collect.Interners - import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest} @@ -34,6 +32,7 @@ import org.apache.spark.status.api.v1 import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.SparkUI import 
org.apache.spark.util.{AccumulatorContext, Utils} +import org.apache.spark.util.Utils.weakIntern import org.apache.spark.util.collection.OpenHashSet /** @@ -511,8 +510,6 @@ private class LiveStage(var info: StageInfo) extends LiveEntity { */ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { - import LiveEntityHelpers._ - // Pointers used by RDDPartitionSeq. @volatile var prev: LiveRDDPartition = null @volatile var next: LiveRDDPartition = null @@ -543,8 +540,6 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { private class LiveRDDDistribution(exec: LiveExecutor) { - import LiveEntityHelpers._ - val executorId = exec.executorId var memoryUsed = 0L var diskUsed = 0L @@ -582,8 +577,6 @@ private class LiveRDDDistribution(exec: LiveExecutor) { */ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { - import LiveEntityHelpers._ - var memoryUsed = 0L var diskUsed = 0L @@ -657,8 +650,6 @@ private class SchedulerPool(name: String) extends LiveEntity { private[spark] object LiveEntityHelpers { - private val stringInterner = Interners.newWeakInterner[String]() - private def accuValuetoString(value: Any): String = value match { case list: java.util.List[_] => // SPARK-30379: For collection accumulator, string representation might @@ -689,11 +680,6 @@ private[spark] object LiveEntityHelpers { .toSeq } - /** String interning to reduce the memory usage. 
*/ - def weakIntern(s: String): String = { - stringInterner.intern(s) - } - // scalastyle:off argcount def createMetrics( executorDeserializeTime: Long, diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 1453840..16e58ad 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo +import org.apache.spark.util.AccumulatorContext.internOption private[spark] case class AccumulatorMetadata( id: Long, @@ -107,7 +108,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { */ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) - new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) + AccumulableInfo(id, name, internOption(update), internOption(value), isInternal, + countFailedValues) } final private[spark] def isAtDriverSide: Boolean = atDriverSide @@ -226,6 +228,9 @@ private[spark] object AccumulatorContext extends Logging { private[this] val nextId = new AtomicLong(0L) + private[this] val someOfMinusOne = Some(-1L) + private[this] val someOfZero = Some(0L) + /** * Returns a globally unique ID for a new [[AccumulatorV2]]. * Note: Once you copy the [[AccumulatorV2]] the ID is no longer unique. @@ -281,6 +286,18 @@ private[spark] object AccumulatorContext extends Logging { originals.clear() } + /** Naive way to reduce the duplicate Some objects for values 0 and -1 + * TODO: Eventually if this spreads out to more values then using + * Guava's weak interner would be a better solution. 
+ */ + def internOption(value: Option[Any]): Option[Any] = { + value match { + case Some(0L) => someOfZero + case Some(-1L) => someOfMinusOne + case _ => value + } + } + // Identifier for distinguishing SQL metrics from other accumulators private[spark] val SQL_ACCUM_IDENTIFIER = "sql" } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c74cca9..4e68ee0 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -37,6 +37,7 @@ import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage._ +import org.apache.spark.util.Utils.weakIntern /** * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- @@ -916,8 +917,8 @@ private[spark] object JsonProtocol { val index = (json \ "Index").extract[Int] val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1) val launchTime = (json \ "Launch Time").extract[Long] - val executorId = (json \ "Executor ID").extract[String].intern() - val host = (json \ "Host").extract[String].intern() + val executorId = weakIntern((json \ "Executor ID").extract[String]) + val host = weakIntern((json \ "Host").extract[String]) val taskLocality = TaskLocality.withName((json \ "Locality").extract[String]) val speculative = jsonOption(json \ "Speculative").exists(_.extract[Boolean]) val gettingResultTime = (json \ "Getting Result Time").extract[Long] @@ -1137,8 +1138,8 @@ private[spark] object JsonProtocol { if (json == JNothing) { return null } - val executorId = (json \ "Executor ID").extract[String].intern() - val host = (json \ "Host").extract[String].intern() + val executorId = weakIntern((json \ "Executor ID").extract[String]) + val host = weakIntern((json \ "Host").extract[String]) 
val port = (json \ "Port").extract[Int] BlockManagerId(executorId, host, port) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 18a69cf..ff3d577 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -45,6 +45,7 @@ import scala.util.matching.Regex import _root_.io.netty.channel.unix.Errors.NativeIoException import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.collect.Interners import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses import org.apache.commons.codec.binary.Hex @@ -102,6 +103,8 @@ private[spark] object Utils extends Logging { /** Scheme used for files that are locally available on worker nodes in the cluster. */ val LOCAL_SCHEME = "local" + private val weakStringInterner = Interners.newWeakInterner[String]() + private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r /** Serialize an object using Java serialization */ @@ -174,6 +177,11 @@ private[spark] object Utils extends Logging { } } + /** String interning to reduce the memory usage. */ + def weakIntern(s: String): String = { + weakStringInterner.intern(s) + } + /** * Get the ClassLoader which loaded Spark. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 959144b..1ba5b20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -22,12 +22,14 @@ import java.util.{Arrays, Locale} import scala.concurrent.duration._ +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} + import org.apache.spark.SparkContext import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils} - +import org.apache.spark.util.AccumulatorContext.internOption /** * A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on @@ -77,8 +79,8 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - new AccumulableInfo( - id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) + AccumulableInfo(id, name, internOption(update), internOption(value), true, true, + SQLMetrics.cachedSQLAccumIdentifier) } } @@ -91,6 +93,16 @@ object SQLMetrics { private val baseForAvgMetric: Int = 10 + val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER) + + private val metricsCache: LoadingCache[String, Option[String]] = + CacheBuilder.newBuilder().maximumSize(10000) + .build(new CacheLoader[String, Option[String]] { + override def load(name: String): Option[String] = { + Option(name) + } + }) + /** * Converts a double value to long value by multiplying a base integer, so we can store it in 
* `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore @@ -104,7 +116,7 @@ object SQLMetrics { def createMetric(sc: SparkContext, name: String): SQLMetric = { val acc = new SQLMetric(SUM_METRIC) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = metricsCache.get(name), countFailedValues = false) acc } @@ -113,7 +125,7 @@ object SQLMetrics { */ def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = { val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric)) - acc.register(sc, name = Some(customMetric.description()), countFailedValues = false) + acc.register(sc, name = metricsCache.get(customMetric.description()), countFailedValues = false) acc } @@ -126,7 +138,7 @@ object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = metricsCache.get(name), countFailedValues = false) acc } @@ -135,14 +147,14 @@ object SQLMetrics { // duration total (min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = metricsCache.get(name), countFailedValues = false) acc } def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. 
val acc = new SQLMetric(NS_TIMING_METRIC, -1) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = metricsCache.get(name), countFailedValues = false) acc } @@ -157,7 +169,7 @@ object SQLMetrics { // probe avg (min, med, max): // (1.2, 2.2, 6.3) val acc = new SQLMetric(AVERAGE_METRIC) - acc.register(sc, name = Some(name), countFailedValues = false) + acc.register(sc, name = metricsCache.get(name), countFailedValues = false) acc } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org