Some comments regarding JobConf and InputFormat caching for HadoopRDDs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a2bbec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a2bbec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a2bbec5

Branch: refs/heads/master
Commit: 6a2bbec5e3840cea5c128d521fe91050de8689db
Parents: 96929f2
Author: Harvey Feng <har...@databricks.com>
Authored: Sat Oct 5 17:39:17 2013 -0700
Committer: Harvey Feng <har...@databricks.com>
Committed: Sat Oct 5 17:53:58 2013 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala    | 10 ++++++----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala   | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a2bbec5/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f416b95..993ba6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -27,14 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
-  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
-  // this to cache JobConfs).
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+  // subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }
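
For reference, a minimal standalone sketch of the cache created above, assuming Guava's MapMaker (already imported by SparkHadoopUtil) in a version that still exposes softValues(); the object and key names here are illustrative, not from the commit:

import java.util.concurrent.ConcurrentMap

import com.google.common.collect.MapMaker

object MetadataCacheSketch {
  // Same construction as hadoopJobMetadata above: a concurrent map whose values are held
  // through soft references, so the JVM may reclaim cached JobConfs/InputFormats under
  // memory pressure instead of pinning them for the life of the process.
  val cache: ConcurrentMap[String, Any] = new MapMaker().softValues().makeMap[String, Any]()

  def main(args: Array[String]): Unit = {
    cache.put("hypothetical_rdd_0_job_conf", new Object())    // illustrative cache key
    println(cache.containsKey("hypothetical_rdd_0_job_conf")) // true until the value is collected
  }
}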

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a2bbec5/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 51e5bb8..d3b3fff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,8 +52,15 @@ class HadoopFileRDD[K, V](
 
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
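
The hunk above (and HadoopRDD.getJobConf() in the next hunk, which adds a fast path for a user-broadcasted JobConf) follows a plain check-then-create pattern against that shared map. A minimal sketch of the pattern in isolation; JobConfCacheSketch, getOrCreateJobConf, and the ConcurrentHashMap standing in for SparkHadoopUtil's soft-value map are all hypothetical names, not part of the commit:

import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object JobConfCacheSketch {
  // Stand-in for the per-JVM map held by SparkHadoopUtil.hadoopJobMetadata.
  private val metadata = new ConcurrentHashMap[String, Any]()

  // Mirrors the check-then-create logic above: reuse the JobConf cached under this key if one
  // exists in this process, otherwise build it once, point it at the input path, and cache it.
  // Rebuilding it on every call would churn roughly 10KB of temporary objects each time.
  def getOrCreateJobConf(cacheKey: String, broadcasted: Configuration, path: String): JobConf = {
    val cached = metadata.get(cacheKey)
    if (cached != null) {
      cached.asInstanceOf[JobConf]
    } else {
      val newJobConf = new JobConf(broadcasted)
      FileInputFormat.setInputPaths(newJobConf, path)
      metadata.put(cacheKey, newJobConf)
      newJobConf
    }
  }
}
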
@@ -112,10 +119,16 @@ class HadoopRDD[K, V](
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
       return conf.asInstanceOf[JobConf]
     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -126,6 +139,8 @@ class HadoopRDD[K, V](
     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
       return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
     }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
     val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
       .asInstanceOf[InputFormat[K, V]]
     if (newInputFormat.isInstanceOf[Configurable]) {
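
A hedged sketch of what ends up in that cache: the InputFormat is built once per process via Hadoop's ReflectionUtils and, as in the hunk above, handed the JobConf when it also implements Configurable. The object name is illustrative and TextInputFormat is only a convenient concrete class for the demo:

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.mapred.{InputFormat, JobConf, TextInputFormat}
import org.apache.hadoop.util.ReflectionUtils

object InputFormatSketch {
  // Reflectively instantiate the requested InputFormat, then push the JobConf into formats
  // that carry their own Configuration.
  def newInputFormat[K, V](inputFormatClass: Class[_], conf: JobConf): InputFormat[K, V] = {
    val format = ReflectionUtils.newInstance(inputFormatClass, conf).asInstanceOf[InputFormat[K, V]]
    format match {
      case c: Configurable => c.setConf(conf)
      case _ =>
    }
    format
  }

  def main(args: Array[String]): Unit = {
    val format = newInputFormat[Any, Any](classOf[TextInputFormat], new JobConf())
    println(format.getClass.getName)   // org.apache.hadoop.mapred.TextInputFormat
  }
}
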
@@ -197,6 +212,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
   def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
 
   def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
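
The hunk above ends mid-object; the third helper, putCachedMetadata(), is not visible in this mail. A sketch of how the three helpers fit together around a single per-JVM map (the SparkEnv plumbing is reduced to a plain object here, and the put helper's body is an assumption inferred from its call sites, not quoted from the commit):

import java.util.concurrent.ConcurrentHashMap

// Stand-in for SparkEnv.get.hadoop.hadoopJobMetadata: one shared map per local JVM.
object LocalMetadataSketch {
  private val hadoopJobMetadata = new ConcurrentHashMap[String, Any]()

  def getCachedMetadata(key: String) = hadoopJobMetadata.get(key)

  def containsCachedMetadata(key: String) = hadoopJobMetadata.containsKey(key)

  // Assumed counterpart of the elided putCachedMetadata helper, inferred from the
  // HadoopRDD.putCachedMetadata(key, value) call sites in the hunks above.
  def putCachedMetadata(key: String, value: Any) = hadoopJobMetadata.put(key, value)
}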
