Repository: spark
Updated Branches:
  refs/heads/master 87cf35c2d -> 058797c17


[SPARK-1382] Fix NPE in DStream.slice (updated version of #365)

@zsxwing I cherry-picked your changes and merged in the latest master. #365 had
conflicts once again!

Author: zsxwing <[email protected]>
Author: Tathagata Das <[email protected]>

Closes #562 from tdas/SPARK-1382 and squashes the following commits:

e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382
20968d9 [zsxwing] Replace Exception with SparkException in DStream
e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382
35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice
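
For context, the failure mode being fixed: calling DStream.slice before the
StreamingContext has been started leaves the stream's zeroTime as null, so the
old code threw a bare NullPointerException as soon as it evaluated
(fromTime - zeroTime). A minimal repro sketch in Scala (the queueStream input
and conf settings are illustrative, not part of the patch):

    import scala.collection.mutable.Queue
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    val conf = new SparkConf().setMaster("local[2]").setAppName("slice-npe")
    val ssc = new StreamingContext(conf, Seconds(1))
    val stream = ssc.queueStream(new Queue[RDD[Int]])
    // ssc.start() has not been called, so the stream is uninitialized.
    // Before this patch: NullPointerException from (fromTime - zeroTime).
    // After this patch: SparkException("... has not been initialized").
    stream.slice(Time(0), Time(1000))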


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/058797c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/058797c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/058797c1

Branch: refs/heads/master
Commit: 058797c1722c9251f6bc6ad2672cb0e79146b04f
Parents: 87cf35c
Author: zsxwing <[email protected]>
Authored: Fri Apr 25 19:04:34 2014 -0700
Committer: Tathagata Das <[email protected]>
Committed: Fri Apr 25 19:04:34 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/dstream/DStream.scala       | 22 +++++++++++---------
 .../spark/streaming/BasicOperationsSuite.scala  | 12 ++++++++++-
 2 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/058797c1/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f69f69e..4709a62 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -18,20 +18,19 @@
 package org.apache.spark.streaming.dstream
 
 
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.Duration
+import org.apache.spark.util.MetadataCleaner
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -144,7 +143,7 @@ abstract class DStream[T: ClassTag] (
    */
   private[streaming] def initialize(time: Time) {
     if (zeroTime != null && zeroTime != time) {
-      throw new Exception("ZeroTime is already initialized to " + zeroTime
+      throw new SparkException("ZeroTime is already initialized to " + zeroTime
         + ", cannot initialize it again to " + time)
     }
     zeroTime = time
@@ -220,7 +219,7 @@ abstract class DStream[T: ClassTag] (
         "which requires " + this.getClass.getSimpleName + " to remember 
generated RDDs for more " +
         "than " + rememberDuration.milliseconds / 1000 + " seconds. But 
Spark's metadata cleanup" +
         "delay is set to " + metadataCleanerDelay + " seconds, which is not 
sufficient. Please " +
-        "set the Java property 'spark.cleaner.delay' to more than " +
+        "set the Java cleaner delay to more than " +
         math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
     )
 
@@ -235,7 +234,7 @@ abstract class DStream[T: ClassTag] (
 
   private[streaming] def setContext(s: StreamingContext) {
     if (ssc != null && ssc != s) {
-      throw new Exception("Context is already set in " + this + ", cannot set 
it again")
+      throw new SparkException("Context is already set in " + this + ", cannot 
set it again")
     }
     ssc = s
     logInfo("Set context for " + this)
@@ -244,7 +243,7 @@ abstract class DStream[T: ClassTag] (
 
   private[streaming] def setGraph(g: DStreamGraph) {
     if (graph != null && graph != g) {
-      throw new Exception("Graph is already set in " + this + ", cannot set it 
again")
+      throw new SparkException("Graph is already set in " + this + ", cannot 
set it again")
     }
     graph = g
     dependencies.foreach(_.setGraph(graph))
@@ -261,7 +260,7 @@ abstract class DStream[T: ClassTag] (
   /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
   private[streaming] def isTimeValid(time: Time): Boolean = {
     if (!isInitialized) {
-      throw new Exception (this + " has not been initialized")
+      throw new SparkException (this + " has not been initialized")
     } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
       logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
         " and slideDuration is " + slideDuration + " and difference is " + 
(time - zeroTime))
@@ -728,6 +727,9 @@ abstract class DStream[T: ClassTag] (
    * Return all the RDDs between 'fromTime' to 'toTime' (both included)
    */
   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+    if (!isInitialized) {
+      throw new SparkException(this + " has not been initialized")
+    }
     if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
       logWarning("fromTime (" + fromTime + ") is not a multiple of 
slideDuration ("
         + slideDuration + ")")

http://git-wip-us.apache.org/repos/asf/spark/blob/058797c1/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 4792ca1..0492588 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.SparkContext._
 
 import util.ManualClock
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import scala.reflect.ClassTag
@@ -398,6 +398,16 @@ class BasicOperationsSuite extends TestSuiteBase {
     Thread.sleep(1000)
   }
 
+  test("slice - has not been initialized") {
+    val ssc = new StreamingContext(conf, Seconds(1))
+    val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+    val stream = new TestInputStream[Int](ssc, input, 2)
+    val thrown = intercept[SparkException] {
+      stream.slice(new Time(0), new Time(1000))
+    }
+    assert(thrown.getMessage.contains("has not been initialized"))
+  }
+
   val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
 
   test("rdd cleanup - map and window") {
