Repository: spark
Updated Branches:
  refs/heads/master f957796c4 -> bb870e72f


[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to 
decrease the memory usage and GC overhead

Hostname in TaskMetrics will be created through deserialization, mostly the 
number of hostname is only the order of number of cluster node, so adding a 
cache layer to dedup the object could reduce the memory usage and alleviate GC 
overhead, especially for long-running and fast job generation applications like 
Spark Streaming.

Author: jerryshao <[email protected]>
Author: Saisai Shao <[email protected]>

Closes #5064 from jerryshao/SPARK-5523 and squashes the following commits:

3e2412a [jerryshao] Address the comments
b092a81 [Saisai Shao] Add a pool to cache the hostname


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb870e72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb870e72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb870e72

Branch: refs/heads/master
Commit: bb870e72f42b6ce8d056df259f6fcf41808d7ed2
Parents: f957796
Author: jerryshao <[email protected]>
Authored: Tue Jul 14 19:54:02 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Tue Jul 14 19:54:02 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/TaskMetrics.scala | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb870e72/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index a3b4561..e80feee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark.executor
 
+import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.storage.{BlockId, BlockStatus}
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
   private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
+    in.defaultReadObject()
+    // Get the hostname from cached data, since hostname is the order of 
number of nodes in
+    // cluster, so using cached hostname will decrease the object number and 
alleviate the GC
+    // overhead.
+    _hostname = TaskMetrics.getCachedHostName(_hostname)
+  }
 }
 
 private[spark] object TaskMetrics {
+  private val hostNameCache = new ConcurrentHashMap[String, String]()
+
   def empty: TaskMetrics = new TaskMetrics
+
+  def getCachedHostName(host: String): String = {
+    val canonicalHost = hostNameCache.putIfAbsent(host, host)
+    if (canonicalHost != null) canonicalHost else host
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to