Repository: spark
Updated Branches:
  refs/heads/master ef1047fca -> 1b144455b


[SPARK-13399][STREAMING] Fix checkpointsuite type erasure warnings

## What changes were proposed in this pull request?

Change the checkpointsuite getting the outputstreams to explicitly be unchecked 
on the generic type so as to avoid the warnings. This only impacts test code.

Alternatively we could encode the type tag in the 
TestOutputStreamWithPartitions and filter the type tag as well - but this is 
unnecessary since multiple testoutputstreams are not registered and the 
previous code was not actually checking this type.

## How was this patch tested?

unit tests (streaming/testOnly org.apache.spark.streaming.CheckpointSuite)

Author: Holden Karau <hol...@us.ibm.com>

Closes #11286 from holdenk/SPARK-13399-checkpointsuite-type-erasure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b144455
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b144455
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b144455

Branch: refs/heads/master
Commit: 1b144455b620861d8cc790d3fc69902717f14524
Parents: ef1047f
Author: Holden Karau <hol...@us.ibm.com>
Authored: Mon Feb 22 09:50:51 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 22 09:50:51 2016 +0000

----------------------------------------------------------------------
 .../apache/spark/streaming/CheckpointSuite.scala | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b144455/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index dada495..ca716cf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -133,6 +133,17 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
     new StreamingContext(SparkContext.getOrCreate(conf), batchDuration)
   }
 
+  /**
+   * Get the first TestOutputStreamWithPartitions, does not check the provided 
generic type.
+   */
+  protected def getTestOutputStream[V: ClassTag](streams: Array[DStream[_]]):
+    TestOutputStreamWithPartitions[V] = {
+    streams.collect {
+      case ds: TestOutputStreamWithPartitions[V @unchecked] => ds
+    }.head
+  }
+
+
   protected def generateOutput[V: ClassTag](
       ssc: StreamingContext,
       targetBatchTime: Time,
@@ -150,9 +161,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
       clock.setTime(targetBatchTime.milliseconds)
       logInfo("Manual clock after advancing = " + clock.getTimeMillis())
 
-      val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
-        dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
-      }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+      val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
 
       eventually(timeout(10 seconds)) {
         ssc.awaitTerminationOrTimeout(10)
@@ -908,9 +917,7 @@ class CheckpointSuite extends TestSuiteBase with 
DStreamCheckpointTester
     logInfo("Manual clock after advancing = " + clock.getTimeMillis())
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
-      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
-    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
     outputStream.output.asScala.map(_.flatten)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to