This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2153b31 [SPARK-26892][CORE] Fix saveAsTextFile throws NullPointerException when null row present 2153b31 is described below commit 2153b316bda119ede8c80ceda522027a6581031b Author: liupengcheng <liupengch...@xiaomi.com> AuthorDate: Wed Feb 20 16:42:55 2019 -0600 [SPARK-26892][CORE] Fix saveAsTextFile throws NullPointerException when null row present ## What changes were proposed in this pull request? Currently, RDD.saveAsTextFile may throw NullPointerException when a null row is present. ``` scala> sc.parallelize(Seq(1,null),1).saveAsTextFile("/tmp/foobar.dat") 19/02/15 21:39:17 ERROR Utils: Aborting task java.lang.NullPointerException at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$3(RDD.scala:1510) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:129) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1352) at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:127) at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1318) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` This PR makes saveAsTextFile fail with an explicit "text files do not allow null rows" error instead of an opaque NullPointerException. ## How was this patch tested? Added a unit test in FileSuite verifying that saving an RDD containing a null row fails with the explicit error message. Please review http://spark.apache.org/contributing.html before opening a pull request. 
Closes #23799 from liupc/Fix-saveAsTextFile-throws-NullPointerException-when-null-row-present. Lead-authored-by: liupengcheng <liupengch...@xiaomi.com> Co-authored-by: Liupengcheng <liupengch...@xiaomi.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 +++------------------- .../test/scala/org/apache/spark/FileSuite.scala | 8 ++++++ 3 files changed, 13 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 8b5f9bb..7f8064f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1012,7 +1012,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, - new JobConf(self.context.hadoopConfiguration), Some(codec)) + new JobConf(self.context.hadoopConfiguration), Option(codec)) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d39f418..1b67e99 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1492,45 +1492,21 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String): Unit = withScope { - // https://issues.apache.org/jira/browse/SPARK-2075 - // - // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit - // Ordering for it and will use the default `null`. 
However, it's a `Comparable[NullWritable]` - // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an - // Ordering for `NullWritable`. That's why the compiler will generate different anonymous - // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. - // - // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate - // same bytecodes for `saveAsTextFile`. - val nullWritableClassTag = implicitly[ClassTag[NullWritable]] - val textClassTag = implicitly[ClassTag[Text]] - val r = this.mapPartitions { iter => - val text = new Text() - iter.map { x => - text.set(x.toString) - (NullWritable.get(), text) - } - } - RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) - .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) + saveAsTextFile(path, null) } /** * Save this RDD as a compressed text file, using string representations of elements. */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope { - // https://issues.apache.org/jira/browse/SPARK-2075 - val nullWritableClassTag = implicitly[ClassTag[NullWritable]] - val textClassTag = implicitly[ClassTag[Text]] - val r = this.mapPartitions { iter => + this.mapPartitions { iter => val text = new Text() iter.map { x => + require(x != null, "text files do not allow null rows") text.set(x.toString) (NullWritable.get(), text) } - } - RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) - .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) + }.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) } /** diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 983a791..c540d7b 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -94,6 +94,14 @@ class FileSuite extends SparkFunSuite with LocalSparkContext 
{ assert(compressedFile.length < normalFile.length) } + test("text files do not allow null rows") { + sc = new SparkContext("local", "test") + val outputDir = new File(tempDir, "output").getAbsolutePath + val nums = sc.makeRDD((1 to 100) ++ Seq(null)) + val exception = intercept[SparkException](nums.saveAsTextFile(outputDir)) + assert(Utils.exceptionString(exception).contains("text files do not allow null rows")) + } + test("SequenceFiles") { sc = new SparkContext("local", "test") val outputDir = new File(tempDir, "output").getAbsolutePath --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org