Repository: spark Updated Branches: refs/heads/master f957796c4 -> bb870e72f
[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead. Hostname strings in TaskMetrics are created anew during deserialization, but the number of distinct hostnames is usually on the order of the number of cluster nodes. Adding a cache layer to deduplicate these string objects therefore reduces memory usage and alleviates GC overhead, especially for long-running applications that generate jobs at a high rate, such as Spark Streaming. Author: jerryshao <[email protected]> Author: Saisai Shao <[email protected]> Closes #5064 from jerryshao/SPARK-5523 and squashes the following commits: 3e2412a [jerryshao] Address the comments b092a81 [Saisai Shao] Add a pool to cache the hostname Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb870e72 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb870e72 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb870e72 Branch: refs/heads/master Commit: bb870e72f42b6ce8d056df259f6fcf41808d7ed2 Parents: f957796 Author: jerryshao <[email protected]> Authored: Tue Jul 14 19:54:02 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Tue Jul 14 19:54:02 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/executor/TaskMetrics.scala | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bb870e72/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index a3b4561..e80feee 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,11 +17,15 @@ package 
org.apache.spark.executor +import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentHashMap + import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.DataReadMethod.DataReadMethod import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -210,10 +214,26 @@ class TaskMetrics extends Serializable { private[spark] def updateInputMetrics(): Unit = synchronized { inputMetrics.foreach(_.updateBytesRead()) } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + in.defaultReadObject() + // Get the hostname from cached data, since hostname is the order of number of nodes in + // cluster, so using cached hostname will decrease the object number and alleviate the GC + // overhead. + _hostname = TaskMetrics.getCachedHostName(_hostname) + } } private[spark] object TaskMetrics { + private val hostNameCache = new ConcurrentHashMap[String, String]() + def empty: TaskMetrics = new TaskMetrics + + def getCachedHostName(host: String): String = { + val canonicalHost = hostNameCache.putIfAbsent(host, host) + if (canonicalHost != null) canonicalHost else host + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
