Repository: spark Updated Branches: refs/heads/master 934876741 -> 757b14b86
[SPARK-5943][Streaming] Update the test to use new API to reduce the warning Author: Saisai Shao <saisai.s...@intel.com> Closes #4722 from jerryshao/SPARK-5943 and squashes the following commits: 1b01233 [Saisai Shao] Update the test to use new API to reduce the warning Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/757b14b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/757b14b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/757b14b8 Branch: refs/heads/master Commit: 757b14b862a1d39c1bad7b321dae1a3ea8338fbb Parents: 9348767 Author: Saisai Shao <saisai.s...@intel.com> Authored: Mon Feb 23 11:27:27 2015 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Feb 23 11:27:27 2015 +0000 ---------------------------------------------------------------------- python/pyspark/streaming/context.py | 2 +- .../apache/spark/streaming/StreamingContextSuite.scala | 10 +++++----- .../scala/org/apache/spark/streaming/TestSuiteBase.scala | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/757b14b8/python/pyspark/streaming/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index b06ab65..2c73083 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -189,7 +189,7 @@ class StreamingContext(object): if timeout is None: self._jssc.awaitTermination() else: - self._jssc.awaitTermination(int(timeout * 1000)) + self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) def awaitTerminationOrTimeout(self, timeout): """ http://git-wip-us.apache.org/repos/asf/spark/blob/757b14b8/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 2aa5e08..6a7cd97 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -190,7 +190,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w logInfo("Count = " + count + ", Running count = " + runningCount) } ssc.start() - ssc.awaitTermination(500) + ssc.awaitTerminationOrTimeout(500) ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) logInfo("TestReceiver.counter = " + TestReceiver.counter.get()) @@ -223,7 +223,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w logInfo("Count = " + count + ", Running count = " + runningCount) } ssc.start() - ssc.awaitTermination(500) + ssc.awaitTerminationOrTimeout(500) ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) assert(runningCount > 0) @@ -243,7 +243,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w // test whether awaitTermination() exits after give amount of time failAfter(1000 millis) { - ssc.awaitTermination(500) + ssc.awaitTerminationOrTimeout(500) } // test whether awaitTermination() does not exit if not time is given @@ -288,7 +288,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val exception = intercept[Exception] { ssc.start() - ssc.awaitTermination(5000) + ssc.awaitTerminationOrTimeout(5000) } assert(exception.getMessage.contains("map task"), "Expected exception not thrown") } @@ -299,7 +299,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register() val exception = intercept[TestException] { ssc.start() - ssc.awaitTermination(5000) + ssc.awaitTerminationOrTimeout(5000) } assert(exception.getMessage.contains("transform"), "Expected exception not thrown") } http://git-wip-us.apache.org/repos/asf/spark/blob/757b14b8/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index c2375ff..3565d62 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -348,7 +348,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - ssc.awaitTermination(50) + ssc.awaitTerminationOrTimeout(50) } val timeTaken = System.currentTimeMillis() - startTime logInfo("Output generated in " + timeTaken + " milliseconds") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org