Repository: spark
Updated Branches:
  refs/heads/master bde9cc11f -> eb5f2b642


SPARK-1407 drain event queue before stopping event logger

Author: Kan Zhang <kzh...@apache.org>

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64

Branch: refs/heads/master
Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20
Parents: bde9cc1
Author: Kan Zhang <kzh...@apache.org>
Authored: Wed Apr 9 15:24:33 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Wed Apr 9 15:25:29 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/LiveListenerBus.scala       | 32 ++++++++------
 .../spark/scheduler/SparkListenerSuite.scala    | 45 ++++++++++++++++++++
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 4 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f775051..7630523 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
     ui.stop()
-    eventLogger.foreach(_.stop())
    // Do this only if not already stopped - best effort.
    // Prevents an NPE if stop() is called more than once.
     val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
       metadataCleaner.cancel()
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
-      listenerBus.stop()
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
       ShuffleMapTask.clearCache()
       ResultTask.clearCache()
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
     } else {
       logInfo("SparkContext already stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 353a486..76f3e32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+  private val listenerThread = new Thread("SparkListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        if (event == SparkListenerShutdown) {
+        // Get out of the while loop and shut down the daemon thread
+          return
+        }
+        postToAll(event)
+      }
+    }
+  }
+
+  // Exposed for testing
+  @volatile private[spark] var stopCalled = false
 
   /**
    * Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     if (started) {
       throw new IllegalStateException("Listener bus already started!")
     }
+    listenerThread.start()
     started = true
-    new Thread("SparkListenerBus") {
-      setDaemon(true)
-      override def run() {
-        while (true) {
-          val event = eventQueue.take
-          if (event == SparkListenerShutdown) {
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          }
-          postToAll(event)
-        }
-      }
-    }.start()
   }
 
   def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   def stop() {
+    stopCalled = true
     if (!started) {
      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
     }
     post(SparkListenerShutdown)
+    listenerThread.join()
   }
 }
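
The new stop() above is what makes draining work: SparkListenerShutdown is enqueued behind every pending event, and listenerThread.join() blocks the caller until the consumer thread has processed all of them and exited. A self-contained sketch of the same poison-pill pattern (not Spark code; all names here are made up):

    import java.util.concurrent.LinkedBlockingQueue

    object DrainSketch {
      private case object Shutdown                      // poison pill
      private val queue = new LinkedBlockingQueue[Any](10000)

      private val consumer = new Thread("consumer") {
        setDaemon(true)
        override def run(): Unit = {
          while (true) {
            queue.take() match {
              case Shutdown => return                   // exit once the pill arrives
              case event    => println(s"handled $event")
            }
          }
        }
      }

      def stop(): Unit = {
        queue.put(Shutdown)  // lands behind all previously posted events
        consumer.join()      // caller blocks until the queue has drained
      }

      def main(args: Array[String]): Unit = {
        consumer.start()
        (1 to 3).foreach(queue.put(_))
        stop()               // returns only after "handled 1/2/3" are printed
      }
    }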

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 7c84377..dc704e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.Semaphore
+
 import scala.collection.mutable
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
@@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     }
   }
 
+  test("bus.stop() waits for the event queue to completely drain") {
+    @volatile var drained = false
+
+    // Released to tell the listener to stop blocking
+    val listenerWait = new Semaphore(0)
+
+    // Released once stop() has returned
+    val stopReturned = new Semaphore(0)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerWait.acquire()
+        drained = true
+      }
+    }
+
+    val bus = new LiveListenerBus
+    val blockingListener = new BlockingListener
+
+    bus.addListener(blockingListener)
+    bus.start()
+    bus.post(SparkListenerJobEnd(0, JobSucceeded))
+
+    // the queue should not drain immediately
+    assert(!drained)
+
+    new Thread("ListenerBusStopper") {
+      override def run() {
+        // stop() will block until listenerWait is released below
+        bus.stop()
+        stopReturned.release(1)
+      }
+    }.start()
+
+    while (!bus.stopCalled) {
+      Thread.sleep(10)
+    }
+
+    listenerWait.release()
+    stopReturned.acquire()
+    assert(drained)
+  }
+
   test("basic creation of StageInfo") {
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
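
On the test's synchronization: a Semaphore with zero permits makes acquire() block until the matching release(), which is how the test holds the listener inside onJobEnd and then checks that stop() returns only after the queue drains. The same handshake in miniature (invented names, not from the commit):

    import java.util.concurrent.Semaphore

    object HandshakeSketch {
      def main(args: Array[String]): Unit = {
        val ready = new Semaphore(0)  // zero permits: acquire() blocks until release()
        val done  = new Semaphore(0)

        new Thread("worker") {
          override def run(): Unit = {
            ready.acquire()           // wait for the main thread's signal
            println("worker proceeding")
            done.release()
          }
        }.start()

        ready.release()               // unblock the worker
        done.acquire()                // wait for the worker to finish
        println("handshake complete")
      }
    }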

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index e698b9b..038afbc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -73,6 +73,6 @@ object SparkHdfsLR {
     }
 
     println("Final w: " + w)
-    System.exit(0)
+    sc.stop()
   }
 }
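
The example change above is more than cosmetic: System.exit(0) ends the JVM without running SparkContext.stop(), so after this patch the bus would never get a chance to drain and the event log could be cut short. A minimal driver skeleton showing the safer shape (a suggestion, not part of this commit; assumes spark-core on the classpath):

    import org.apache.spark.{SparkConf, SparkContext}

    object DriverSkeleton {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("DriverSkeleton"))
        try {
          // ... application logic ...
        } finally {
          sc.stop()  // drains the listener bus, then stops the event logger
        }
      }
    }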
