Some comments regarding JobConf and InputFormat caching for HadoopRDDs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a2bbec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a2bbec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a2bbec5
Branch: refs/heads/master
Commit: 6a2bbec5e3840cea5c128d521fe91050de8689db
Parents: 96929f2
Author: Harvey Feng <har...@databricks.com>
Authored: Sat Oct 5 17:39:17 2013 -0700
Committer: Harvey Feng <har...@databricks.com>
Committed: Sat Oct 5 17:53:58 2013 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 10 ++++++----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a2bbec5/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f416b95..993ba6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -27,14 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
-  // A general map for metadata needed during HadoopRDD split computation (e.g., HadoopFileRDD uses
-  // this to cache JobConfs).
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+  // subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a2bbec5/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 51e5bb8..d3b3fff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,8 +52,15 @@ class HadoopFileRDD[K, V](
 
   override def getJobConf(): JobConf = {
     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       FileInputFormat.setInputPaths(newJobConf, path)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
@@ -112,10 +119,16 @@ class HadoopRDD[K, V](
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
       return conf.asInstanceOf[JobConf]
     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
@@ -126,6 +139,8 @@ class HadoopRDD[K, V](
     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
       return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
     }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
     val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
       .asInstanceOf[InputFormat[K, V]]
     if (newInputFormat.isInstanceOf[Configurable]) {
@@ -197,6 +212,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
   def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
 
   def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
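
----------------------------------------------------------------------

For context (not part of the commit above): the cache-or-create logic in getJobConf() boils down to a per-JVM Guava map with softly-referenced values, keyed by a cache-key string. Below is a minimal, standalone sketch of that pattern in Scala; the object and method names (MetadataCacheSketch, getOrCreateJobConf) are illustrative only and do not appear in the patch.

import com.google.common.collect.MapMaker

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Illustrative sketch of the cache-or-create pattern used by getJobConf():
// a map shared within one JVM whose values are held via soft references, so
// cached JobConfs can be reclaimed by the GC under memory pressure instead
// of accumulating.
object MetadataCacheSketch {
  // Shared across all callers in this JVM, analogous to hadoopJobMetadata.
  private val cache = new MapMaker().softValues().makeMap[String, AnyRef]()

  def getOrCreateJobConf(cacheKey: String, base: Configuration): JobConf = {
    val cached = cache.get(cacheKey)
    if (cached != null) {
      // A JobConf was already built for this key in this process; reuse it.
      cached.asInstanceOf[JobConf]
    } else {
      // First request in this process: build the JobConf once and cache it,
      // so repeated calls don't keep creating ~10KB of temporary objects.
      val newJobConf = new JobConf(base)
      cache.put(cacheKey, newJobConf)
      newJobConf
    }
  }
}

In the patch itself, the equivalent shared map lives in SparkHadoopUtil.hadoopJobMetadata and is reached through the HadoopRDD.getCachedMetadata / containsCachedMetadata / putCachedMetadata helpers shown in the diff.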