Repository: spark Updated Branches: refs/heads/branch-1.0 e43e31ded -> f0abf5f08
Fixing a race condition in event listener unit test Author: Kan Zhang <[email protected]> Closes #401 from kanzhang/fix-1475 and squashes the following commits: c6058bd [Kan Zhang] Fixing a race condition in event listener unit test (cherry picked from commit 38877ccf394a50bfd37c8433d4aafaa91683d3b8) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0abf5f0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0abf5f0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0abf5f0 Branch: refs/heads/branch-1.0 Commit: f0abf5f08208f19c9ff966c43c68dbdebbd28a07 Parents: e43e31d Author: Kan Zhang <[email protected]> Authored: Wed Apr 16 17:39:11 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Apr 16 17:39:22 2014 -0700 ---------------------------------------------------------------------- .../spark/scheduler/LiveListenerBus.scala | 4 --- .../spark/scheduler/SparkListenerSuite.scala | 28 +++++++++++++------- 2 files changed, 19 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f0abf5f0/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 545fa45..cbac4c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { } } - // Exposed for testing - @volatile private[spark] var stopCalled = false - /** * Start sending events to attached listeners. * @@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { } def stop() { - stopCalled = true if (!started) { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") } http://git-wip-us.apache.org/repos/asf/spark/blob/f0abf5f0/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 4cdccdd..36511a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc test("bus.stop() waits for the event queue to completely drain") { @volatile var drained = false + // When Listener has started + val listenerStarted = new Semaphore(0) + // Tells the listener to stop blocking - val listenerWait = new Semaphore(1) + val listenerWait = new Semaphore(0) + + // When stopper has started + val stopperStarted = new Semaphore(0) - // When stop has returned - val stopReturned = new Semaphore(1) + // When stopper has returned + val stopperReturned = new Semaphore(0) class BlockingListener extends SparkListener { override def onJobEnd(jobEnd: SparkListenerJobEnd) = { + listenerStarted.release() listenerWait.acquire() drained = true } @@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc bus.start() bus.post(SparkListenerJobEnd(0, JobSucceeded)) - // the queue should not drain immediately + listenerStarted.acquire() + // Listener should be blocked after start assert(!drained) new Thread("ListenerBusStopper") { override def run() { + stopperStarted.release() // stop() will block until notify() is called below bus.stop() - stopReturned.release(1) + stopperReturned.release() } }.start() - while (!bus.stopCalled) { - Thread.sleep(10) - } + stopperStarted.acquire() + // Listener should remain blocked after stopper started + assert(!drained) + // unblock Listener to let queue drain listenerWait.release() - stopReturned.acquire() + stopperReturned.acquire() assert(drained) }
