Repository: incubator-spark
Updated Branches:
  refs/heads/master d8d190efd -> 4d8803048


For outputformats that are Configurable, call setConf before sending data to 
them.

[SPARK-1108] This allows us to use, e.g. HBase's TableOutputFormat with 
PairRDDFunctions.saveAsNewAPIHadoopFile, which otherwise would throw 
NullPointerException because the output table name hasn't been configured.

Note this bug also affects branch-0.9

Author: Bryn Keller <bryn.kel...@intel.com>

Closes #638 from xoltar/SPARK-1108 and squashes the following commits:

7e94e7d [Bryn Keller] Import, comment, and format cleanup per code review
7cbcaa1 [Bryn Keller] For outputformats that are Configurable, call setConf 
before sending data to them. This allows us to use, e.g. HBase 
TableOutputFormat, which otherwise would throw NullPointerException because the 
output table name hasn't been configured


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4d880304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4d880304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4d880304

Branch: refs/heads/master
Commit: 4d880304867b55a4f2138617b30600b7fa013b14
Parents: d8d190e
Author: Bryn Keller <bryn.kel...@intel.com>
Authored: Mon Feb 24 17:35:22 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon Feb 24 17:35:22 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 75 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4d880304/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 39c3a49..d29a1a9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -618,6 +618,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: 
RDD[(K, V)])
         attemptNumber)
       val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
       val format = outputFormatClass.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
       val writer = 
format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4d880304/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index fa5c9b1..e3e2377 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -23,6 +23,8 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 import com.google.common.io.Files
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.{Partitioner, SharedSparkContext}
@@ -330,4 +332,77 @@ class PairRDDFunctionsSuite extends FunSuite with 
SharedSparkContext {
       (1, ArrayBuffer(1)),
       (2, ArrayBuffer(1))))
   }
+
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error, non-configurable formats still work
+    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+
+    /*
+      Check that configurable formats get configured:
+      ConfigTestFormat throws an exception if we try to write
+      to it when setConf hasn't been called first.
+      Assertion is in ConfigTestFormat.getRecordWriter.
+     */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
 }
+
+/*
+  These classes are fakes for testing
+    "saveNewAPIHadoopFile should call setConf if format is configurable".
+  Unfortunately, they have to be top level classes, and not defined in
+  the test method, because otherwise Scala won't generate no-args constructors
+  and the test will therefore throw InstantiationException when 
saveAsNewAPIHadoopFile
+  tries to instantiate them with Class.newInstance.
+ */
+class FakeWriter extends RecordWriter[Integer, Integer] {
+
+  def close(p1: TaskAttemptContext) = ()
+
+  def write(p1: Integer, p2: Integer) = ()
+
+}
+
+class FakeCommitter extends OutputCommitter {
+  def setupJob(p1: JobContext) = ()
+
+  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
+
+  def setupTask(p1: TaskAttemptContext) = ()
+
+  def commitTask(p1: TaskAttemptContext) = ()
+
+  def abortTask(p1: TaskAttemptContext) = ()
+}
+
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(p1: JobContext)  = ()
+
+  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] 
= {
+    new FakeWriter()
+  }
+
+  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+    new FakeCommitter()
+  }
+}
+
+class ConfigTestFormat() extends FakeFormat() with Configurable {
+
+  var setConfCalled = false
+  def setConf(p1: Configuration) = {
+    setConfCalled = true
+    ()
+  }
+
+  def getConf: Configuration = null
+
+  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, 
Integer] = {
+    assert(setConfCalled, "setConf was never called")
+    super.getRecordWriter(p1)
+  }
+}
+

Reply via email to