Repository: spark
Updated Branches:
  refs/heads/master 80429f3e2 -> ee6f7e22a


[SPARK-1615] Synchronize accesses to the LiveListenerBus' event queue

The original poster is @zsxwing, who reported this bug in #516.

Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` 
method. As the name suggests, this waits until the event queue is empty. 
However, the following race condition could happen:

(1) The listener thread dequeues an event
(2) The queue is now empty, so `waitUntilEmpty()` returns true (even though the
dequeued event has not yet been processed)
(3) The test asserts something assuming that all listeners have finished
executing (and fails)
(4) The listeners receive and process the event

This PR makes (1) and (4) atomic by synchronizing around them. To do that,
however, we must avoid using `eventQueue.take`, which blocks and would
cause a deadlock if we synchronized around it. As a workaround, we use the
non-blocking `eventQueue.poll` together with a semaphore to provide the same
semantics.

This race condition has existed for a long time, but for some reason we had
never run into it.

Author: Andrew Or <andrewo...@gmail.com>

Closes #544 from andrewor14/stage-info-test-fix and squashes the following 
commits:

3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix
56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore
eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6f7e22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6f7e22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6f7e22

Branch: refs/heads/master
Commit: ee6f7e22a449837864072e3cd2b6696005f134f1
Parents: 80429f3
Author: Andrew Or <andrewo...@gmail.com>
Authored: Thu Apr 24 20:18:15 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Apr 24 20:18:15 2014 -0700

----------------------------------------------------------------------
 .../spark/scheduler/LiveListenerBus.scala       | 38 ++++++++++++++------
 .../spark/scheduler/SparkListenerSuite.scala    |  5 +--
 2 files changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6f7e22/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index cbac4c1..dec3316 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
 
@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   private val eventQueue = new 
LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+
+  // A counter that represents the number of events produced and consumed in 
the queue
+  private val eventLock = new Semaphore(0)
+
   private val listenerThread = new Thread("SparkListenerBus") {
     setDaemon(true)
     override def run() {
       while (true) {
-        val event = eventQueue.take
-        if (event == SparkListenerShutdown) {
-          // Get out of the while loop and shutdown the daemon thread
-          return
+        eventLock.acquire()
+        // Atomically remove and process this event
+        LiveListenerBus.this.synchronized {
+          val event = eventQueue.poll
+          if (event == SparkListenerShutdown) {
+            // Get out of the while loop and shutdown the daemon thread
+            return
+          }
+          Option(event).foreach(postToAll)
         }
-        postToAll(event)
       }
     }
   }
@@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus 
with Logging {
 
   def post(event: SparkListenerEvent) {
     val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
+    if (eventAdded) {
+      eventLock.release()
+    } else if (!queueFullErrorMessageLogged) {
       logError("Dropping SparkListenerEvent because no remaining room in event 
queue. " +
         "This likely means one of the SparkListeners is too slow and cannot 
keep up with the " +
         "rate at which tasks are being started by the scheduler.")
@@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   }
 
   /**
-   * Waits until there are no more events in the queue, or until the specified 
time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is 
the specified time
+   * For testing only. Wait until there are no more events in the queue, or 
until the specified
+   * time has elapsed. Return true if the queue has emptied and false is the 
specified time
    * elapsed before the queue emptied.
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
+    while (!queueIsEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
     true
   }
 
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once 
belonged to this queue
+   * have already been processed by all attached listeners, if this returns 
true.
+   */
+  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
+
   def stop() {
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that 
has not yet started!")

http://git-wip-us.apache.org/repos/asf/spark/blob/ee6f7e22/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ba048ce..4e9fd07 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with 
ShouldMatchers
-    with BeforeAndAfter with BeforeAndAfterAll {
+  with BeforeAndAfter with BeforeAndAfterAll {
+
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
@@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with ShouldMatc
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll {
+  override def afterAll() {
     System.clearProperty("spark.akka.frameSize")
   }
 

Reply via email to