jobWaiter.synchronized before jobWaiter.wait
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/aebb123f Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/aebb123f Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/aebb123f Branch: refs/heads/scala-2.10 Commit: aebb123fd3b4bf0d57d867f33ca0325340ee42e4 Parents: 5d46025 Author: Mark Hamstra <[email protected]> Authored: Thu Dec 5 17:16:44 2013 -0800 Committer: Mark Hamstra <[email protected]> Committed: Thu Dec 5 17:16:44 2013 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aebb123f/core/src/main/scala/org/apache/spark/FutureAction.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1ad9240..c6b4ac5 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -99,7 +99,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { if (!atMost.isFinite()) { awaitResult() - } else { + } else jobWaiter.synchronized { val finishTime = System.currentTimeMillis() + atMost.toMillis while (!isCompleted) { val time = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/aebb123f/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 58f238d..b026f86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -31,6 +31,7 @@ private[spark] class JobWaiter[T]( private var finishedTasks = 0 // Is the job as a whole finished (succeeded or failed)? + @volatile private var _jobFinished = totalTasks == 0 def jobFinished = _jobFinished
