Updated Branches: refs/heads/branch-0.8 17ca8a1c5 -> d77c3371b
Merge pull request #232 from markhamstra/FiniteWait jobWaiter.synchronized before jobWaiter.wait ...else ``IllegalMonitorStateException`` in ``SimpleFutureAction#ready``. (cherry picked from commit 078049877e123fe7e4c4553e36055de572cab7c4) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d77c3371 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d77c3371 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d77c3371 Branch: refs/heads/branch-0.8 Commit: d77c3371b6b189351a2c23cb533777bbd08684c8 Parents: 17ca8a1 Author: Reynold Xin <[email protected]> Authored: Thu Dec 5 23:29:42 2013 -0800 Committer: Reynold Xin <[email protected]> Committed: Thu Dec 5 23:30:11 2013 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/FutureAction.scala | 2 +- .../org/apache/spark/scheduler/JobWaiter.scala | 1 + .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 ++++++++++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/main/scala/org/apache/spark/FutureAction.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1ad9240..c6b4ac5 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { if (!atMost.isFinite()) { awaitResult() - } else { + } else jobWaiter.synchronized { val finishTime = System.currentTimeMillis() + atMost.toMillis while (!isCompleted) { val time = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 58f238d..b026f86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -31,6 +31,7 @@ private[spark] class JobWaiter[T]( private var finishedTasks = 0 // Is the job as a whole finished (succeeded or failed)? + @volatile private var _jobFinished = totalTasks == 0 def jobFinished = _jobFinished http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d77c3371/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index da032b1..0d4c10d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.rdd import java.util.concurrent.Semaphore +import scala.concurrent.{Await, TimeoutException} +import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global import org.scalatest.{BeforeAndAfterAll, FunSuite} @@ -173,4 +175,28 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll with Timeouts sem.acquire(2) } } + + /** + * Awaiting FutureAction results + */ + test("FutureAction result, infinite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration.Inf) === 100) + } + + test("FutureAction result, finite wait") { + val f = sc.parallelize(1 to 100, 4) + .countAsync() + assert(Await.result(f, Duration(30, "seconds")) === 100) + } + + test("FutureAction result, timeout") { + val f = sc.parallelize(1 to 100, 4) + .mapPartitions(itr => { Thread.sleep(20); itr }) + .countAsync() + intercept[TimeoutException] { + Await.result(f, Duration(20, "milliseconds")) + } + } }
