Repository: spark
Updated Branches:
  refs/heads/master 864de3bf4 -> a16396df7


[SPARK-10772] [STREAMING] [SCALA] NullPointerException when transform function 
in DStream returns NULL

Currently, `TransformedDStream` returns 
`Some(transformFunc(parentRDDs, validTime))` from `compute`. When 
`transformFunc` returns null, the result is `Some(null)`, and downstream 
operators then fail with a NullPointerException.

The initial version of this fix used `Option()` instead of `Some()` to handle 
a possible null value. When `transformFunc` returns `null`, `Option()` converts 
it to `None`, which downstream operators already handle correctly.

NOTE (2015-09-25): The final fix instead checks the return value of the 
transform function. If it is `null`, a `SparkException` is thrown. The 
exception message advises returning `SparkContext.emptyRDD()` to represent an 
empty result.

Author: Jacker Hu <[email protected]>
Author: jhu-chang <[email protected]>

Closes #8881 from jhu-chang/Fix_Transform.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a16396df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a16396df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a16396df

Branch: refs/heads/master
Commit: a16396df76cc27099011bfb96b28cbdd7f964ca8
Parents: 864de3b
Author: Jacker Hu <[email protected]>
Authored: Sat Oct 10 11:36:18 2015 +0100
Committer: Sean Owen <[email protected]>
Committed: Sat Oct 10 11:36:18 2015 +0100

----------------------------------------------------------------------
 .../spark/streaming/dstream/TransformedDStream.scala   | 12 ++++++++++--
 .../apache/spark/streaming/BasicOperationsSuite.scala  | 13 +++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a16396df/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5d46ca0..ab01f47 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.streaming.dstream
 
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming.{Duration, Time}
-import scala.reflect.ClassTag
 
 private[streaming]
 class TransformedDStream[U: ClassTag] (
@@ -38,6 +40,12 @@ class TransformedDStream[U: ClassTag] (
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some(transformFunc(parentRDDs, validTime))
+    val transformedRDD = transformFunc(parentRDDs, validTime)
+    if (transformedRDD == null) {
+      throw new SparkException("Transform function must not return null. " +
+        "Return SparkContext.emptyRDD() instead to represent no element " +
+        "as the result of transformation.")
+    }
+    Some(transformedRDD)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a16396df/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 2553768..9988f41 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -211,6 +211,19 @@ class BasicOperationsSuite extends TestSuiteBase {
     )
   }
 
+  test("transform with NULL") {
+    val input = Seq(1 to 4)
+    intercept[SparkException] {
+      testOperation(
+        input,
+        (r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]),
+        Seq(Seq()),
+        1,
+        false
+      )
+    }
+  }
+
   test("transformWith") {
     val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
     val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("")   )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to