Repository: spark
Updated Branches:
  refs/heads/master ca7e460f7 -> 57264400a


[SPARK-8630] [STREAMING] Prevent from checkpointing QueueInputDStream

This PR throws an exception in `QueueInputDStream.writeObject` so that the 
application fails as soon as `StreamingContext.start` is called, rather than 
later, when QueueInputDStream is recovered from a checkpoint.

Author: zsxwing <[email protected]>

Closes #7016 from zsxwing/queueStream-checkpoint and squashes the following 
commits:

89a3d73 [zsxwing] Fix JavaAPISuite.testQueueStream
cc40fd7 [zsxwing] Prevent from checkpointing QueueInputDStream


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57264400
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57264400
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57264400

Branch: refs/heads/master
Commit: 57264400ac7d9f9c59c387c252a9ed8d93fed4fa
Parents: ca7e460
Author: zsxwing <[email protected]>
Authored: Tue Jun 30 11:14:38 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Tue Jun 30 11:14:38 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/StreamingContext.scala |  8 ++++++++
 .../streaming/api/java/JavaStreamingContext.scala | 18 +++++++++++++++---
 .../streaming/dstream/QueueInputDStream.scala     | 15 ++++++++++-----
 .../org/apache/spark/streaming/JavaAPISuite.java  |  8 ++++++++
 .../spark/streaming/StreamingContextSuite.scala   | 15 +++++++++++++++
 5 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57264400/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1708f30..ec49d0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -477,6 +477,10 @@ class StreamingContext private[streaming] (
   /**
    * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
+   *
+   * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to 
recover data of
+   * those RDDs, so `queueStream` doesn't support checkpointing.
+   *
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue 
in every interval
    * @tparam T         Type of objects in the RDD
@@ -491,6 +495,10 @@ class StreamingContext private[streaming] (
   /**
    * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
+   *
+   * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to 
recover data of
+   * those RDDs, so `queueStream` doesn't support checkpointing.
+   *
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue 
in every interval
    * @param defaultRDD Default RDD is returned by the DStream when the queue 
is empty.

http://git-wip-us.apache.org/repos/asf/spark/blob/57264400/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 989e3a7..40deb6d 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -419,7 +419,11 @@ class JavaStreamingContext(val ssc: StreamingContext) 
extends Closeable {
    * Create an input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
-   * NOTE: changes to the queue after the stream is created will not be 
recognized.
+   * NOTE:
+   * 1. Changes to the queue after the stream is created will not be 
recognized.
+   * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to 
recover data of
+   * those RDDs, so `queueStream` doesn't support checkpointing.
+   *
    * @param queue      Queue of RDDs
    * @tparam T         Type of objects in the RDD
    */
@@ -435,7 +439,11 @@ class JavaStreamingContext(val ssc: StreamingContext) 
extends Closeable {
    * Create an input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
-   * NOTE: changes to the queue after the stream is created will not be 
recognized.
+   * NOTE:
+   * 1. Changes to the queue after the stream is created will not be 
recognized.
+   * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to 
recover data of
+   * those RDDs, so `queueStream` doesn't support checkpointing.
+   *
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue 
in every interval
    * @tparam T         Type of objects in the RDD
@@ -455,7 +463,11 @@ class JavaStreamingContext(val ssc: StreamingContext) 
extends Closeable {
    * Create an input stream from an queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
-   * NOTE: changes to the queue after the stream is created will not be 
recognized.
+   * NOTE:
+   * 1. Changes to the queue after the stream is created will not be 
recognized.
+   * 2. Arbitrary RDDs can be added to `queueStream`, there is no way to 
recover data of
+   * those RDDs, so `queueStream` doesn't support checkpointing.
+   *
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue 
in every interval
    * @param defaultRDD Default RDD is returned by the DStream when the queue 
is empty

http://git-wip-us.apache.org/repos/asf/spark/blob/57264400/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index ed7da6d..a2f5d82 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import scala.collection.mutable.Queue
-import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.streaming.{Time, StreamingContext}
+import java.io.{NotSerializableException, ObjectOutputStream}
+
+import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
 
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming.{Time, StreamingContext}
+
 private[streaming]
 class QueueInputDStream[T: ClassTag](
     @transient ssc: StreamingContext,
@@ -36,6 +37,10 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  private def writeObject(oos: ObjectOutputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support 
checkpointing")
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val buffer = new ArrayBuffer[RDD[T]]()
     if (oneAtATime && queue.size > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/57264400/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 1077b1b..a34f234 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -364,6 +364,14 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
   @SuppressWarnings("unchecked")
   @Test
   public void testQueueStream() {
+    ssc.stop();
+    // Create a new JavaStreamingContext without checkpointing
+    SparkConf conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("test")
+        .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+    ssc = new JavaStreamingContext(conf, new Duration(1000));
+
     List<List<Integer>> expected = Arrays.asList(
         Arrays.asList(1,2,3),
         Arrays.asList(4,5,6),

http://git-wip-us.apache.org/repos/asf/spark/blob/57264400/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 819dd2c..56b4ce5 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming
 import java.io.{File, NotSerializableException}
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.Queue
+
 import org.apache.commons.io.FileUtils
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts
@@ -665,6 +667,19 @@ class StreamingContextSuite extends SparkFunSuite with 
BeforeAndAfter with Timeo
       transformed.foreachRDD { rdd => rdd.collect() } }
   }
 
+  test("queueStream doesn't support checkpointing") {
+    val checkpointDir = Utils.createTempDir()
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val rdd = ssc.sparkContext.parallelize(1 to 10)
+    ssc.queueStream[Int](Queue(rdd)).print()
+    ssc.checkpoint(checkpointDir.getAbsolutePath)
+    val e = intercept[NotSerializableException] {
+      ssc.start()
+    }
+    // StreamingContext.validate changes the message, so use "contains" here
+    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
     val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to