Merge pull request #52 from harveyfeng/hadoop-closure

Add an optional closure parameter to HadoopRDD instantiation to use when 
creating local JobConfs.

Having HadoopRDD accept this optional closure eliminates the need for the 
HadoopFileRDD added earlier. It makes the HadoopRDD more general, in that the 
caller can specify any JobConf initialization flow.

(cherry picked from commit 99796904ae9d00405584ac518b6144973a876e3c)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/00a7551b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/00a7551b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/00a7551b

Branch: refs/heads/branch-0.8
Commit: 00a7551b46dc5acbd3dffd0045fe4091410617af
Parents: 5383a5a
Author: Reynold Xin <r...@apache.org>
Authored: Sat Oct 12 21:23:26 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Sat Oct 12 21:26:22 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 29 ++++-------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 52 ++++++--------------
 2 files changed, 26 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/00a7551b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index aacc017..1d16ad3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,7 +330,7 @@ class SparkContext(
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its 
InputFormat and any
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its 
InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, 
table name for HyperTable,
    * etc).
    */
@@ -356,24 +356,15 @@ class SparkContext(
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so 
broadcast it.
     val confBroadcast = broadcast(new 
SerializableWritable(hadoopConfiguration))
-    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, 
minSplits)
-  }
-
-  /**
-   * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a 
Hadoop Configuration
-   * that has already been broadcast, assuming that it's safe to use it to 
construct a
-   * HadoopFileRDD (i.e., except for file 'path', all other configuration 
properties can be resued).
-   */
-  def hadoopFile[K, V](
-      path: String,
-      confBroadcast: Broadcast[SerializableWritable[Configuration]],
-      inputFormatClass: Class[_ <: InputFormat[K, V]],
-      keyClass: Class[K],
-      valueClass: Class[V],
-      minSplits: Int
-      ): RDD[(K, V)] = {
-    new HadoopFileRDD(
-      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, 
minSplits)
+    val setInputPathsFunc = (jobConf: JobConf) => 
FileInputFormat.setInputPaths(jobConf, path)
+    new HadoopRDD(
+      this,
+      confBroadcast,
+      Some(setInputPathsFunc),
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/00a7551b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d3b3fff..2d394ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -33,41 +33,6 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
-/**
- * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in 
HDFS, the local file
- * system, or S3).
- * This accepts a general, broadcasted Hadoop Configuration because those tend 
to remain the same
- * across multiple reads; the 'path' is the only variable that is different 
across new JobConfs
- * created from the Configuration.
- */
-class HadoopFileRDD[K, V](
-    sc: SparkContext,
-    path: String,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int)
-  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, 
valueClass, minSplits) {
-
-  override def getJobConf(): JobConf = {
-    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local 
cache of the JobConf
-      // needed by this RDD.
-      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a new JobConf, set the input file/directory paths to read 
from, and cache the
-      // JobConf (i.e., in a shared hash map in the slave's JVM process that's 
accessible through
-      // HadoopRDD.putCachedMetadata()), so that we only create one copy 
across multiple
-      // getJobConf() calls for this RDD in the local process.
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of 
temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      FileInputFormat.setInputPaths(newJobConf, path)
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      return newJobConf
-    }
-  }
-}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -83,11 +48,24 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, 
@transient s: InputSp
 }
 
 /**
- * A base class that provides core functionality for reading data partitions 
stored in Hadoop.
+ * An RDD that provides core functionality for reading data stored in Hadoop 
(e.g., files in HDFS,
+ * sources in HBase, or S3).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. 
If the enclosed
+ *     variable references an instance of JobConf, then that JobConf will be 
used for the Hadoop job.
+ *     Otherwise, a new JobConf will be created on each slave using the 
enclosed Configuration.
+ * @param initLocalJobConfFuncOpt Optional closure used to initialize any 
JobConf that HadoopRDD
+ *     creates.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to 
generate.
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
@@ -105,6 +83,7 @@ class HadoopRDD[K, V](
       sc,
       sc.broadcast(new SerializableWritable(conf))
         .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,
       valueClass,
@@ -130,6 +109,7 @@ class HadoopRDD[K, V](
       // local process. The local cache is accessed through 
HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of 
temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
+      initLocalJobConfFuncOpt.map(f => f(newJobConf))
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
     }

Reply via email to