Updated Branches:
  refs/heads/branch-0.8 4a2e76a3f -> ff53f02a1

Merge pull request #20 from harveyfeng/hadoop-config-cache

Allow users to pass broadcasted Configurations and cache InputFormats across 
Hadoop file reads.

Note: originally from https://github.com/mesos/spark/pull/942

Currently motivated by Shark queries on Hive-partitioned tables, where a JobConf 
is broadcast for every Hive partition (i.e., every subdirectory read). The only 
thing that differs between those JobConfs is the input path - the Hadoop 
Configuration they are constructed from remains the same.
This PR only modifies the old Hadoop API RDDs; similar additions to the new API 
might reduce computation latencies a bit for high-frequency FileInputDStreams 
(which currently use only the new API).
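
As a rough illustration (not part of the patch), a caller such as Shark could use 
the new hadoopFile overload along the lines of the sketch below, broadcasting the 
shared Configuration once and reusing it for every partition directory. The 
partition paths, app name, and choice of TextInputFormat are illustrative 
assumptions:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.{SerializableWritable, SparkContext}

    object BroadcastConfSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "broadcast-conf-sketch")

        // Broadcast the shared Hadoop Configuration once (~10 KB) instead of
        // shipping a fresh JobConf for every partition directory that is read.
        val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

        // Hypothetical Hive partition subdirectories; only the input path differs.
        val partitionPaths = Seq("/warehouse/tbl/ds=2013-10-01", "/warehouse/tbl/ds=2013-10-02")

        val rdds = partitionPaths.map { path =>
          sc.hadoopFile(path, confBroadcast, classOf[TextInputFormat],
            classOf[LongWritable], classOf[Text], sc.defaultMinSplits)
        }
        println("total records: " + rdds.map(_.count()).sum)
      }
    }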

As a small bonus, this also adds InputFormat caching, to avoid a reflection call 
on every RDD#compute().
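
Concretely, the InputFormat caching boils down to the pattern sketched below. 
This is a simplified, standalone sketch: the real patch keys the instance by 
"rdd_%d_input_format" in the shared soft-reference map mentioned in the notes 
below (rather than the plain ConcurrentHashMap used here), and the helper name 
is made up for illustration:

    import java.util.concurrent.ConcurrentHashMap

    import org.apache.hadoop.conf.Configurable
    import org.apache.hadoop.mapred.{InputFormat, JobConf}
    import org.apache.hadoop.util.ReflectionUtils

    object InputFormatCacheSketch {
      // One InputFormat instance per RDD per process, so the reflective
      // instantiation happens once instead of on every compute() call.
      private val cache = new ConcurrentHashMap[String, InputFormat[_, _]]()

      def inputFormatFor[K, V](rddId: Int, clazz: Class[_ <: InputFormat[K, V]],
          conf: JobConf): InputFormat[K, V] = {
        val key = "rdd_%d_input_format".format(rddId)
        val cached = cache.get(key)
        if (cached != null) {
          return cached.asInstanceOf[InputFormat[K, V]]
        }
        // Reflection only happens on a cache miss.
        val created = ReflectionUtils.newInstance(clazz.asInstanceOf[Class[_]], conf)
          .asInstanceOf[InputFormat[K, V]]
        created match {
          case c: Configurable => c.setConf(conf)  // Configurable formats need the conf set.
          case _ =>
        }
        cache.putIfAbsent(key, created)
        created
      }
    }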

A few other notes:

- Added a general soft-reference hashmap in SparkHadoopUtil because I wanted to 
  avoid adding another class to SparkEnv.
- The SparkContext's default hadoopConfiguration isn't cached: Configuration has 
  no equals() method, so there isn't a good way to determine when configuration 
  properties have changed.
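
For reference, the soft-reference map is built with Guava's MapMaker exactly as 
in the diff below; the sketch here just illustrates the idea (the getOrPut 
helper is hypothetical and not part of the patch). Soft values let the JVM 
reclaim cached JobConfs and InputFormats under memory pressure instead of 
holding them for the life of the process:

    import com.google.common.collect.MapMaker

    object SoftValueCacheSketch {
      // Same construction as the map added to SparkHadoopUtil: keys are strong,
      // values are soft references, so entries can be dropped when memory is tight.
      private val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

      // Illustrative compute-if-absent helper on top of the soft map.
      def getOrPut[T](key: String)(create: => T): T = {
        val cached = hadoopJobMetadata.get(key)
        if (cached != null) {
          cached.asInstanceOf[T]
        } else {
          val value = create
          hadoopJobMetadata.put(key, value)
          value
        }
      }
    }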

(cherry picked from commit 4a25b116d4e451afdf10fc4f018c383ed2c7789a)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5383a5a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5383a5a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5383a5a5

Branch: refs/heads/branch-0.8
Commit: 5383a5a506c3f79d468afa3bfed04e787e7fe780
Parents: 64fae16
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sat Oct 5 19:28:55 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Sat Oct 12 21:26:11 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |   4 +-
 .../scala/org/apache/spark/SparkContext.scala   |  39 ++++--
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  12 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 140 ++++++++++++++++---
 4 files changed, 161 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5383a5a5/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3aeea99..4cf7eb9 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,7 +26,9 @@ import org.apache.spark.rdd.RDD
     sure a node doesn't load two copies of an RDD at once.
   */
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
-  private val loading = new HashSet[String]
+
+  /** Keys of RDD splits that are being computed/loaded. */
+  private val loading = new HashSet[String]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5383a5a5/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6ad708f..aacc017 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -83,9 +84,11 @@ class SparkContext(
     val sparkHome: String = null,
     val jars: Seq[String] = Nil,
     val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
-    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+    // of data-local splits on host
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map())
   extends Logging {
 
   // Ensure logging is initialized before we spawn any threads
@@ -238,7 +241,8 @@ class SparkContext(
     val env = SparkEnv.get
     val conf = env.hadoop.newConfiguration()
     // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
       conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
       conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -337,6 +341,8 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
+    // Add necessary security credentials to the JobConf before broadcasting it.
+    SparkEnv.get.hadoop.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -347,10 +353,27 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val conf = new JobConf(hadoopConfiguration)
-    FileInputFormat.setInputPaths(conf, path)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+      ): RDD[(K, V)] = {
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+  }
+
+  /**
+   * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
+   * that has already been broadcast, assuming that it's safe to use it to construct a
+   * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be reused).
+   */
+  def hadoopFile[K, V](
+      path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int
+      ): RDD[(K, V)] = {
+    new HadoopFileRDD(
+      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5383a5a5/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c3..993ba6b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
  */
 
 package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
  * Contains util methods to interact with Hadoop from spark.
  */
 class SparkHadoopUtil {
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Return an appropriate (subclass) of Configuration. Creating config can initialize some Hadoop
+  // subsystems
   def newConfiguration(): Configuration = new Configuration()
 
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+  // cluster
   def addCredentials(conf: JobConf) {}
 
   def isYarnMode(): Boolean = { false }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5383a5a5/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734..d3b3fff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
+import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+  TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+    sc: SparkContext,
+    path: String,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+
+  override def getJobConf(): JobConf = {
+    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a new JobConf, set the input file/directory paths to read from, and cache the
+      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+      // getJobConf() calls for this RDD in the local process.
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      FileInputFormat.setInputPaths(newJobConf, path)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 }
 
 /**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
-    @transient conf: JobConf,
+    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  def this(
+      sc: SparkContext,
+      conf: JobConf,
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minSplits: Int) = {
+    this(
+      sc,
+      sc.broadcast(new SerializableWritable(conf))
+        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
+  }
+
+  protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+  protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+  protected def getJobConf(): JobConf = {
+    val conf: Configuration = broadcastedConf.value.value
+    if (conf.isInstanceOf[JobConf]) {
+      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+      return conf.asInstanceOf[JobConf]
+    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+      // getJobConf() has been called previously, so there is already a local cache of the JobConf
+      // needed by this RDD.
+      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+    } else {
+      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+      val newJobConf = new JobConf(broadcastedConf.value.value)
+      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+      return newJobConf
+    }
+  }
+
+  protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+    if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+      return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+    }
+    // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+    // done in each local process.
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+    if (newInputFormat.isInstanceOf[Configurable]) {
+      newInputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+    return newInputFormat
+  }
 
   override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(conf)
-    val inputFormat = createInputFormat(conf)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
     if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(conf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
     array
   }
 
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
     logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
-    val conf = confBroadcast.value.value
-    val fmt = createInputFormat(conf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(conf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+    val jobConf = getJobConf()
+    val inputFormat = getInputFormat(jobConf)
+    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  def getConf: Configuration = confBroadcast.value.value
+  def getConf: Configuration = getJobConf()
+}
+
+private[spark] object HadoopRDD {
+  /**
+   * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+   * the local process.
+   */
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+  def putCachedMetadata(key: String, value: Any) =
+    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 }
