This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aaa8a80  [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes
aaa8a80 is described below

commit aaa8a80c9d3426107de5873b4391600701121385
Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
AuthorDate: Tue Jun 15 22:02:19 2021 -0500

    [SPARK-35613][CORE][SQL] Cache commonly occurring strings in SQLMetrics, JSONProtocol and AccumulatorV2 classes
    
    ### What changes were proposed in this pull request?
    Cache commonly occurring duplicate `Some` objects in SQLMetrics by using a Guava cache, and reuse the existing Guava string interner to avoid duplicate strings in JSONProtocol.
    For AccumulatorV2, a heap dump showed a large number of `Some(-1L)` and `Some(0L)` instances; these are now naively interned by reusing a single pre-constructed `Some(-1L)` and `Some(0L)`.
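    
    A condensed sketch of the Option-interning idea (mirroring `AccumulatorContext.internOption` in the diff below; the wrapper object name here is only illustrative):
    
    ```scala
    // Reuse a single pre-built Some(0L) / Some(-1L) instead of allocating a new
    // wrapper object for every metric update carrying one of these sentinel values.
    object OptionInterner {
      private val someOfZero = Some(0L)
      private val someOfMinusOne = Some(-1L)
    
      def internOption(value: Option[Any]): Option[Any] = value match {
        case Some(0L) => someOfZero
        case Some(-1L) => someOfMinusOne
        case _ => value
      }
    }
    ```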
    
    To give some context on the impact and the garbage that accumulated, below are details from a complex Spark job that we troubleshooted to identify the bottlenecks.
    **tl;dr - In short, the major issue was the accumulation of duplicate objects, mainly from SQLMetrics.**
    
    More than 25% of the 40 GB driver heap was filled with a very large number of **duplicate**, immutable objects.
    
    1. Very large number of **duplicate** immutable objects.
    
    - The metric type is represented by `scala.Some("sql")`, which is created anew for each metric.
    - Fixing this reduced memory usage from 4 GB to a few bytes.
    
    2. `scala.Some(0)` and `scala.Some(-1)` are very common metric values (typically used to indicate the absence of a metric).
    
    - Individually the values are all immutable, but Spark SQL was creating a new instance each time.
    - Interning these saved ~4.5 GB on the 40 GB heap.
    
    3. Using string interpolation for metric names.
    
    - Interpolation results in the creation of a new string object.
    - We end up with a very large number of metric name strings, though the number of unique strings is minuscule.
    - These accounted for ~7.5 GB of the 40 GB heap, which went down to a few KB once fixed (see the sketch after this list).
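    
    A condensed sketch of the string-side caching (the object and member names here are illustrative; in the actual change they correspond to `Utils.weakIntern` and the metric-name cache added to `SQLMetrics`):
    
    ```scala
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    import com.google.common.collect.Interners
    
    object MetricStringCaches {
      // Weak interner deduplicates identical strings (e.g. executor IDs, host names)
      // while still allowing unreferenced entries to be garbage collected.
      private val weakStringInterner = Interners.newWeakInterner[String]()
      def weakIntern(s: String): String = weakStringInterner.intern(s)
    
      // Bounded cache handing back a shared Option[String] per metric name, so
      // repeated registrations of the same name reuse a single Some(...) instance.
      val metricNameCache: LoadingCache[String, Option[String]] =
        CacheBuilder.newBuilder().maximumSize(10000)
          .build(new CacheLoader[String, Option[String]] {
            override def load(name: String): Option[String] = Option(name)
          })
    }
    ```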
    
    ### Why are the changes needed?
    To reduce the overall driver memory footprint, which in turn reduces full GC pauses.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Since these are memory-related optimizations, unit tests are not added. These changes have been running on our internal platform where, together with other optimizations, they allowed a complex Spark job that had been failing continuously to succeed.
    
    Closes #32754 from venkata91/SPARK-35613.
    
    Authored-by: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/status/LiveEntity.scala | 16 +-----------
 .../org/apache/spark/util/AccumulatorV2.scala      | 19 +++++++++++++-
 .../scala/org/apache/spark/util/JsonProtocol.scala |  9 ++++---
 .../main/scala/org/apache/spark/util/Utils.scala   |  8 ++++++
 .../spark/sql/execution/metric/SQLMetrics.scala    | 30 +++++++++++++++-------
 5 files changed, 53 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 5af76e9..fc5fc32 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -24,8 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.{HashSet, TreeSet}
 import scala.collection.mutable.HashMap
 
-import com.google.common.collect.Interners
-
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -34,6 +32,7 @@ import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{AccumulatorContext, Utils}
+import org.apache.spark.util.Utils.weakIntern
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -511,8 +510,6 @@ private class LiveStage(var info: StageInfo) extends LiveEntity {
  */
 private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
 
-  import LiveEntityHelpers._
-
   // Pointers used by RDDPartitionSeq.
   @volatile var prev: LiveRDDPartition = null
   @volatile var next: LiveRDDPartition = null
@@ -543,8 +540,6 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
 
 private class LiveRDDDistribution(exec: LiveExecutor) {
 
-  import LiveEntityHelpers._
-
   val executorId = exec.executorId
   var memoryUsed = 0L
   var diskUsed = 0L
@@ -582,8 +577,6 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
  */
 private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {
 
-  import LiveEntityHelpers._
-
   var memoryUsed = 0L
   var diskUsed = 0L
 
@@ -657,8 +650,6 @@ private class SchedulerPool(name: String) extends LiveEntity {
 
 private[spark] object LiveEntityHelpers {
 
-  private val stringInterner = Interners.newWeakInterner[String]()
-
   private def accuValuetoString(value: Any): String = value match {
     case list: java.util.List[_] =>
       // SPARK-30379: For collection accumulator, string representation might
@@ -689,11 +680,6 @@ private[spark] object LiveEntityHelpers {
       .toSeq
   }
 
-  /** String interning to reduce the memory usage. */
-  def weakIntern(s: String): String = {
-    stringInterner.intern(s)
-  }
-
   // scalastyle:off argcount
   def createMetrics(
       executorDeserializeTime: Long,
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 1453840..16e58ad 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
 import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.util.AccumulatorContext.internOption
 
 private[spark] case class AccumulatorMetadata(
     id: Long,
@@ -107,7 +108,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
    */
   private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
     val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
-    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
+    AccumulableInfo(id, name, internOption(update), internOption(value), isInternal,
+      countFailedValues)
   }
 
   final private[spark] def isAtDriverSide: Boolean = atDriverSide
@@ -226,6 +228,9 @@ private[spark] object AccumulatorContext extends Logging {
 
   private[this] val nextId = new AtomicLong(0L)
 
+  private[this] val someOfMinusOne = Some(-1L)
+  private[this] val someOfZero = Some(0L)
+
   /**
    * Returns a globally unique ID for a new [[AccumulatorV2]].
    * Note: Once you copy the [[AccumulatorV2]] the ID is no longer unique.
@@ -281,6 +286,18 @@ private[spark] object AccumulatorContext extends Logging {
     originals.clear()
   }
 
+  /** Naive way to reduce the duplicate Some objects for values 0 and -1
+   *  TODO: Eventually if this spreads out to more values then using
+   *  Guava's weak interner would be a better solution.
+   */
+  def internOption(value: Option[Any]): Option[Any] = {
+    value match {
+      case Some(0L) => someOfZero
+      case Some(-1L) => someOfMinusOne
+      case _ => value
+    }
+  }
+
   // Identifier for distinguishing SQL metrics from other accumulators
   private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c74cca9..4e68ee0 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -37,6 +37,7 @@ import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation,
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils.weakIntern
 
 /**
  * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
@@ -916,8 +917,8 @@ private[spark] object JsonProtocol {
     val index = (json \ "Index").extract[Int]
     val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String].intern()
-    val host = (json \ "Host").extract[String].intern()
+    val executorId = weakIntern((json \ "Executor ID").extract[String])
+    val host = weakIntern((json \ "Host").extract[String])
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
     val speculative = jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
@@ -1137,8 +1138,8 @@ private[spark] object JsonProtocol {
     if (json == JNothing) {
       return null
     }
-    val executorId = (json \ "Executor ID").extract[String].intern()
-    val host = (json \ "Host").extract[String].intern()
+    val executorId = weakIntern((json \ "Executor ID").extract[String])
+    val host = weakIntern((json \ "Host").extract[String])
     val port = (json \ "Port").extract[Int]
     BlockManagerId(executorId, host, port)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 18a69cf..ff3d577 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import scala.util.matching.Regex
 
 import _root_.io.netty.channel.unix.Errors.NativeIoException
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.collect.Interners
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
@@ -102,6 +103,8 @@ private[spark] object Utils extends Logging {
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
   val LOCAL_SCHEME = "local"
 
+  private val weakStringInterner = Interners.newWeakInterner[String]()
+
   private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r
 
   /** Serialize an object using Java serialization */
@@ -174,6 +177,11 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** String interning to reduce the memory usage. */
+  def weakIntern(s: String): String = {
+    weakStringInterner.intern(s)
+  }
+
   /**
    * Get the ClassLoader which loaded Spark.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 959144b..1ba5b20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -22,12 +22,14 @@ import java.util.{Arrays, Locale}
 
 import scala.concurrent.duration._
 
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
-
+import org.apache.spark.util.AccumulatorContext.internOption
 
 /**
  * A metric used in a SQL query plan. This is implemented as an [[AccumulatorV2]]. Updates on
@@ -77,8 +79,8 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
-    new AccumulableInfo(
-      id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+    AccumulableInfo(id, name, internOption(update), internOption(value), true, true,
+      SQLMetrics.cachedSQLAccumIdentifier)
   }
 }
 
@@ -91,6 +93,16 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
+
+  private val metricsCache: LoadingCache[String, Option[String]] =
+    CacheBuilder.newBuilder().maximumSize(10000)
+    .build(new CacheLoader[String, Option[String]] {
+      override def load(name: String): Option[String] = {
+        Option(name)
+      }
+    })
+
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -104,7 +116,7 @@ object SQLMetrics {
 
   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
-    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
@@ -113,7 +125,7 @@ object SQLMetrics {
    */
   def createV2CustomMetric(sc: SparkContext, customMetric: CustomMetric): SQLMetric = {
     val acc = new SQLMetric(CustomMetrics.buildV2CustomMetricTypeName(customMetric))
-    acc.register(sc, name = Some(customMetric.description()), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(customMetric.description()), countFailedValues = false)
     acc
   }
 
@@ -126,7 +138,7 @@ object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
@@ -135,14 +147,14 @@ object SQLMetrics {
     // duration total (min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
@@ -157,7 +169,7 @@ object SQLMetrics {
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
