Repository: spark Updated Branches: refs/heads/branch-1.0 bde9cc11f -> 8ca3b2bc9
SPARK-1407 drain event queue before stopping event logger Author: Kan Zhang <kzh...@apache.org> Closes #366 from kanzhang/SPARK-1407 and squashes the following commits: cd0629f [Kan Zhang] code refactoring and adding test b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64 Branch: refs/heads/branch-1.0 Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20 Parents: bde9cc1 Author: Kan Zhang <kzh...@apache.org> Authored: Wed Apr 9 15:24:33 2014 -0700 Committer: Patrick Wendell <pwend...@gmail.com> Committed: Wed Apr 9 15:25:29 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../spark/scheduler/LiveListenerBus.scala | 32 ++++++++------ .../spark/scheduler/SparkListenerSuite.scala | 45 ++++++++++++++++++++ .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- 4 files changed, 67 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f775051..7630523 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { ui.stop() - eventLogger.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging { metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() - listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() + eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") } else { logInfo("SparkContext already stopped") http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 353a486..76f3e32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false + private val listenerThread = new Thread("SparkListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return + } + postToAll(event) + } + } + } + + // Exposed for testing + @volatile private[spark] var stopCalled = false /** * Start sending events to attached listeners. @@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (started) { throw new IllegalStateException("Listener bus already started!") } + listenerThread.start() started = true - new Thread("SparkListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { - // Get out of the while loop and shutdown the daemon thread - return - } - postToAll(event) - } - } - }.start() } def post(event: SparkListenerEvent) { @@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { } def stop() { + stopCalled = true if (!started) { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") } post(SparkListenerShutdown) + listenerThread.join() } } http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 7c84377..dc704e0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.Semaphore + import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} @@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } } + test("bus.stop() waits for the event queue to completely drain") { + @volatile var drained = false + + // Tells the listener to stop blocking + val listenerWait = new Semaphore(1) + + // When stop has returned + val stopReturned = new Semaphore(1) + + class BlockingListener extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = { + listenerWait.acquire() + drained = true + } + } + + val bus = new LiveListenerBus + val blockingListener = new BlockingListener + + bus.addListener(blockingListener) + bus.start() + bus.post(SparkListenerJobEnd(0, JobSucceeded)) + + // the queue should not drain immediately + assert(!drained) + + new Thread("ListenerBusStopper") { + override def run() { + // stop() will block until notify() is called below + bus.stop() + stopReturned.release(1) + } + }.start() + + while (!bus.stopCalled) { + Thread.sleep(10) + } + + listenerWait.release() + stopReturned.acquire() + assert(drained) + } + test("basic creation of StageInfo") { val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index e698b9b..038afbc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -73,6 +73,6 @@ object SparkHdfsLR { } println("Final w: " + w) - System.exit(0) + sc.stop() } }