Repository: spark
Updated Branches:
  refs/heads/branch-0.9 0f0d044c3 -> 6f0db0ada
For outputformats that are Configurable, call setConf before sending data to them.

[SPARK-1108] This allows us to use, e.g. HBase's TableOutputFormat with PairRDDFunctions.saveAsNewAPIHadoopFile, which otherwise would throw NullPointerException because the output table name hasn't been configured.

Note this bug also affects branch-0.9

Author: Bryn Keller <[email protected]>

Closes #638 from xoltar/SPARK-1108 and squashes the following commits:

7e94e7d [Bryn Keller] Import, comment, and format cleanup per code review
7cbcaa1 [Bryn Keller] For outputformats that are Configurable, call setConf before sending data to them. This allows us to use, e.g. HBase TableOutputFormat, which otherwise would throw NullPointerException because the output table name hasn't been configured

(cherry picked from commit 4d880304867b55a4f2138617b30600b7fa013b14)

Conflicts:
    core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
    core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f0db0ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f0db0ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f0db0ad

Branch: refs/heads/branch-0.9
Commit: 6f0db0ada859e9f5493acc087061f7c529b706a0
Parents: 0f0d044
Author: Bryn Keller <[email protected]>
Authored: Mon Feb 24 17:35:22 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Sun Mar 9 17:47:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala | 76 ++++++++++++++++++++
 2 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6f0db0ad/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 9bafe41..0b2917b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.reflect.{ClassTag, classTag} -import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.Path import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec @@ -620,6 +620,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outputFormatClass.newInstance + format match { + case c: Configurable => c.setConf(wrappedConf.value) + case _ => () + } val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] http://git-wip-us.apache.org/repos/asf/spark/blob/6f0db0ad/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 5da538a..9c78630 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -24,6 +24,9 @@ import scala.util.Random import org.scalatest.FunSuite import com.google.common.io.Files +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.conf.{Configuration, 
Configurable} + import org.apache.spark.SparkContext._ import org.apache.spark.{Partitioner, SharedSparkContext} @@ -331,4 +334,77 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { (1, ArrayBuffer(1)), (2, ArrayBuffer(1)))) } + + test("saveNewAPIHadoopFile should call setConf if format is configurable") { + val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) + + // No error, non-configurable formats still work + pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored") + + /* + Check that configurable formats get configured: + ConfigTestFormat throws an exception if we try to write + to it when setConf hasn't been called first. + Assertion is in ConfigTestFormat.getRecordWriter. + */ + pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored") + } +} + +/* + These classes are fakes for testing + "saveNewAPIHadoopFile should call setConf if format is configurable". + Unfortunately, they have to be top level classes, and not defined in + the test method, because otherwise Scala won't generate no-args constructors + and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile + tries to instantiate them with Class.newInstance. 
+ */ +class FakeWriter extends RecordWriter[Integer, Integer] { + + def close(p1: TaskAttemptContext) = () + + def write(p1: Integer, p2: Integer) = () + +} + +class FakeCommitter extends OutputCommitter { + def setupJob(p1: JobContext) = () + + def needsTaskCommit(p1: TaskAttemptContext): Boolean = false + + def setupTask(p1: TaskAttemptContext) = () + + def commitTask(p1: TaskAttemptContext) = () + + def abortTask(p1: TaskAttemptContext) = () +} + +class FakeFormat() extends OutputFormat[Integer, Integer]() { + + def checkOutputSpecs(p1: JobContext) = () + + def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { + new FakeWriter() + } + + def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = { + new FakeCommitter() + } } + +class ConfigTestFormat() extends FakeFormat() with Configurable { + + var setConfCalled = false + def setConf(p1: Configuration) = { + setConfCalled = true + () + } + + def getConf: Configuration = null + + override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { + assert(setConfCalled, "setConf was never called") + super.getRecordWriter(p1) + } +} +
