Repository: spark
Updated Branches:
  refs/heads/master 718bbc939 -> c6ff59a23


[SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.

This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from more
performance-sensitive ones.

The public API has not changed: all listeners added through the existing
"addListener" method (which, after this change, mostly means user-defined
listeners) end up in a default, shared queue. Internally, a new API allows
listeners to be added to specific queues, and it is used to separate the
internal Spark listeners into three categories: application status
listeners (e.g. UI), executor management (e.g. dynamic allocation), and
the event log.
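
As a rough sketch of how the internal routing looks after this change
(the listener variable names below are placeholders; the real call sites
are in the diff further down):

  listenerBus.addToStatusQueue(jobProgressListener)     // UI / application status
  listenerBus.addToManagementQueue(allocationListener)  // dynamic allocation, heartbeat receiver
  listenerBus.addToEventLogQueue(eventLogger)           // event log writer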

The queueing logic is abstracted into a separate class and kept hidden
from consumers as much as possible. Aside from choosing a queue, no code
changes are needed to take advantage of the queues.
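
For example, a user-defined listener keeps using the same registration
call and is simply delivered from the shared queue's dispatch thread
(sketch only; the listener body is a placeholder):

  sc.addSparkListener(new SparkListener {
    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
      // user code; now runs on the shared queue's dispatch thread
    }
  })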

Test coverage relies on existing tests; a few tests had to be adjusted
because they relied on `LiveListenerBus.postToAll` being synchronous, and
event posting is now asynchronous. Other tests were simplified so that
they do not use the asynchronous LiveListenerBus.
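
For reference, the helper added to ExecutorAllocationManagerSuite in this
change restores the old synchronous behavior in tests by posting an event
and then draining the bus:

  private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
    bus.post(event)
    bus.waitUntilEmpty(1000)
  }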

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19211 from vanzin/SPARK-18838.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ff59a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ff59a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ff59a2

Branch: refs/heads/master
Commit: c6ff59a230758b409fa9cc548b7d283eeb7ebe5d
Parents: 718bbc9
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Sep 20 13:41:29 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Sep 20 13:41:29 2017 +0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |   2 +-
 .../org/apache/spark/HeartbeatReceiver.scala    |   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  13 +-
 .../spark/scheduler/AsyncEventQueue.scala       | 196 +++++++++++++
 .../spark/scheduler/LiveListenerBus.scala       | 277 ++++++++-----------
 .../scala/org/apache/spark/ui/SparkUI.scala     |  24 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 128 +++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 .../scheduler/EventLoggingListenerSuite.scala   |   6 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  95 ++++---
 .../spark/ui/storage/StorageTabSuite.scala      |   4 +-
 .../streaming/StreamingQueryListenerBus.scala   |   2 +-
 .../apache/spark/sql/internal/SharedState.scala |   3 +-
 .../spark/streaming/StreamingContext.scala      |   3 +-
 .../scheduler/StreamingListenerBus.scala        |   2 +-
 .../spark/streaming/StreamingContextSuite.scala |   4 +-
 16 files changed, 473 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 7a5fb9a..119b426 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
    * the scheduling task.
    */
   def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addToManagementQueue(listener)
 
     val scheduleTask = new Runnable() {
       override def run(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5242ab6..ff960b3 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
     this(sc, new SystemClock)
   }
 
-  sc.addSparkListener(this)
+  sc.listenerBus.addToManagementQueue(this)
 
   override val rpcEnv: RpcEnv = sc.env.rpcEnv
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 136f0af..1821bc8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging {
     // "_jobProgressListener" should be set up before creating SparkEnv 
because when creating
     // "SparkEnv", some messages will be posted to "listenerBus" and we should 
not miss them.
     _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
+    listenerBus.addToStatusQueue(jobProgressListener)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -442,7 +442,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
-        Some(SparkUI.createLiveUI(this, _conf, listenerBus, 
_jobProgressListener,
+        Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener,
           _env.securityManager, appName, startTime = startTime))
       } else {
         // For tests, do not enable the UI
@@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging {
           new EventLoggingListener(_applicationId, _applicationAttemptId, 
_eventLogDir.get,
             _conf, _hadoopConfiguration)
         logger.start()
-        listenerBus.addListener(logger)
+        listenerBus.addToEventLogQueue(logger)
         Some(logger)
       } else {
         None
@@ -1563,7 +1563,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   @DeveloperApi
   def addSparkListener(listener: SparkListenerInterface) {
-    listenerBus.addListener(listener)
+    listenerBus.addToSharedQueue(listener)
   }
 
   /**
@@ -1879,8 +1879,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
-      throw new SparkException(
-        s"Cannot stop SparkContext within listener thread of 
${LiveListenerBus.name}")
+      throw new SparkException(s"Cannot stop SparkContext within listener bus 
thread.")
     }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
@@ -2378,7 +2377,7 @@ class SparkContext(config: SparkConf) extends Logging {
                 " parameter from breaking Spark's ability to find a valid 
constructor.")
           }
         }
-        listenerBus.addListener(listener)
+        listenerBus.addToSharedQueue(listener)
         logInfo(s"Registered listener $className")
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala 
b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
new file mode 100644
index 0000000..8605e1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import com.codahale.metrics.{Gauge, Timer}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * An asynchronous queue for events. All events posted to this queue will be 
delivered to the child
+ * listeners in a separate thread.
+ *
+ * Delivery will only begin when the `start()` method is called. The `stop()` 
method should be
+ * called when no more events need to be delivered.
+ */
+private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: 
LiveListenerBusMetrics)
+  extends SparkListenerBus
+  with Logging {
+
+  import AsyncEventQueue._
+
+  // Cap the capacity of the queue so we get an explicit error (rather than an 
OOM exception) if
+  // it's perpetually being added to more quickly than it's being drained.
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
+    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+
+  // Keep the event count separately, so that waitUntilEmpty() can be 
implemented properly;
+  // this allows that method to return only when the events in the queue have 
been fully
+  // processed (instead of just dequeued).
+  private val eventCount = new AtomicLong()
+
+  /** A counter for dropped events. It will be reset every time we log it. */
+  private val droppedEventsCounter = new AtomicLong(0L)
+
+  /** When `droppedEventsCounter` was logged last time in milliseconds. */
+  @volatile private var lastReportTimestamp = 0L
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  private var sc: SparkContext = null
+
+  private val started = new AtomicBoolean(false)
+  private val stopped = new AtomicBoolean(false)
+
+  private val droppedEvents = 
metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
+  private val processingTime = 
metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
+
+  // Remove the queue size gauge first, in case it was created by a previous 
incarnation of
+  // this queue that was removed from the listener bus.
+  metrics.metricRegistry.remove(s"queue.$name.size")
+  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
+    override def getValue: Int = eventQueue.size()
+  })
+
+  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
+    setDaemon(true)
+    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+      dispatch()
+    }
+  }
+
+  private def dispatch(): Unit = 
LiveListenerBus.withinListenerThread.withValue(true) {
+    try {
+      var next: SparkListenerEvent = eventQueue.take()
+      while (next != POISON_PILL) {
+        val ctx = processingTime.time()
+        try {
+          super.postToAll(next)
+        } finally {
+          ctx.stop()
+        }
+        eventCount.decrementAndGet()
+        next = eventQueue.take()
+      }
+      eventCount.decrementAndGet()
+    } catch {
+      case ie: InterruptedException =>
+        logInfo(s"Stopping listener queue $name.", ie)
+    }
+  }
+
+  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
+    
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  }
+
+  /**
+   * Start an asynchronous thread to dispatch events to the underlying 
listeners.
+   *
+   * @param sc Used to stop the SparkContext in case the async dispatcher 
fails.
+   */
+  private[scheduler] def start(sc: SparkContext): Unit = {
+    if (started.compareAndSet(false, true)) {
+      this.sc = sc
+      dispatchThread.start()
+    } else {
+      throw new IllegalStateException(s"$name already started!")
+    }
+  }
+
+  /**
+   * Stop the listener bus. It will wait until the queued events have been 
processed, but new
+   * events will be dropped.
+   */
+  private[scheduler] def stop(): Unit = {
+    if (!started.get()) {
+      throw new IllegalStateException(s"Attempted to stop $name that has not 
yet started!")
+    }
+    if (stopped.compareAndSet(false, true)) {
+      eventQueue.put(POISON_PILL)
+      eventCount.incrementAndGet()
+    }
+    dispatchThread.join()
+  }
+
+  def post(event: SparkListenerEvent): Unit = {
+    if (stopped.get()) {
+      return
+    }
+
+    eventCount.incrementAndGet()
+    if (eventQueue.offer(event)) {
+      return
+    }
+
+    eventCount.decrementAndGet()
+    droppedEvents.inc()
+    droppedEventsCounter.incrementAndGet()
+    if (logDroppedEvent.compareAndSet(false, true)) {
+      // Only log the following message once to avoid duplicated annoying logs.
+      logError(s"Dropping event from queue $name. " +
+        "This likely means one of the listeners is too slow and cannot keep up 
with " +
+        "the rate at which tasks are being started by the scheduler.")
+    }
+    logTrace(s"Dropping event $event")
+
+    val droppedCount = droppedEventsCounter.get
+    if (droppedCount > 0) {
+      // Don't log too frequently
+      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
+        // There may be multiple threads trying to decrease 
droppedEventsCounter.
+        // Use "compareAndSet" to make sure only one thread can win.
+        // And if another thread is increasing droppedEventsCounter, 
"compareAndSet" will fail and
+        // then that thread will update it.
+        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
+          val prevLastReportTimestamp = lastReportTimestamp
+          lastReportTimestamp = System.currentTimeMillis()
+          val previous = new java.util.Date(prevLastReportTimestamp)
+          logWarning(s"Dropped $droppedEvents events from $name since 
$previous.")
+        }
+      }
+    }
+  }
+
+  /**
+   * For testing only. Wait until there are no more events in the queue.
+   *
+   * @return true if the queue is empty.
+   */
+  def waitUntilEmpty(deadline: Long): Boolean = {
+    while (eventCount.get() != 0) {
+      if (System.currentTimeMillis > deadline) {
+        return false
+      }
+      Thread.sleep(10)
+    }
+    true
+  }
+
+}
+
+private object AsyncEventQueue {
+
+  val POISON_PILL = new SparkListenerEvent() { }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 7d5e980..2f93c49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,20 +17,22 @@
 
 package org.apache.spark.scheduler
 
+import java.util.{List => JList}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.reflect.ClassTag
 import scala.util.DynamicVariable
 
-import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.Utils
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -39,20 +41,13 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. 
This listener bus
  * is stopped when `stop()` is called, and it will drop further events after 
stopping.
  */
-private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus 
{
-
-  self =>
+private[spark] class LiveListenerBus(conf: SparkConf) {
 
   import LiveListenerBus._
 
   private var sparkContext: SparkContext = _
 
-  // Cap the capacity of the event queue so we get an explicit error (rather 
than
-  // an OOM exception) if it's perpetually being added to more quickly than 
it's being drained.
-  private val eventQueue =
-    new 
LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
-
-  private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)
+  private[spark] val metrics = new LiveListenerBusMetrics(conf)
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
   /** When `droppedEventsCounter` was logged last time in milliseconds. */
   @volatile private var lastReportTimestamp = 0L
 
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  private val logDroppedEvent = new AtomicBoolean(false)
-
-  // A counter that represents the number of events produced and consumed in 
the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-    setDaemon(true)
-    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-      LiveListenerBus.withinListenerThread.withValue(true) {
-        val timer = metrics.eventProcessingTime
-        while (true) {
-          eventLock.acquire()
-          self.synchronized {
-            processingEvent = true
-          }
-          try {
-            val event = eventQueue.poll
-            if (event == null) {
-              // Get out of the while loop and shutdown the daemon thread
-              if (!stopped.get) {
-                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
-                  " the listener bus has been stopped. So `stopped` must be 
true")
-              }
-              return
-            }
-            val timerContext = timer.time()
-            try {
-              postToAll(event)
-            } finally {
-              timerContext.stop()
-            }
-          } finally {
-            self.synchronized {
-              processingEvent = false
-            }
-          }
+  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+  /** Add a listener to queue shared by all non-internal listeners. */
+  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+    addToQueue(listener, SHARED_QUEUE)
+  }
+
+  /** Add a listener to the executor management queue. */
+  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
+  }
+
+  /** Add a listener to the application status queue. */
+  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+    addToQueue(listener, APP_STATUS_QUEUE)
+  }
+
+  /** Add a listener to the event log queue. */
+  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+    addToQueue(listener, EVENT_LOG_QUEUE)
+  }
+
+  /**
+   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
+   * of each other (each one uses a separate thread for delivering events), 
allowing slower
+   * listeners to be somewhat isolated from others.
+   */
+  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
+    if (stopped.get()) {
+      throw new IllegalStateException("LiveListenerBus is stopped.")
+    }
+
+    queues.asScala.find(_.name == queue) match {
+      case Some(queue) =>
+        queue.addListener(listener)
+
+      case None =>
+        val newQueue = new AsyncEventQueue(queue, conf, metrics)
+        newQueue.addListener(listener)
+        if (started.get()) {
+          newQueue.start(sparkContext)
         }
-      }
+        queues.add(newQueue)
     }
   }
 
-  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
-    
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  def removeListener(listener: SparkListenerInterface): Unit = synchronized {
+    // Remove listener from all queues it was added to, and stop queues that 
have become empty.
+    queues.asScala
+      .filter { queue =>
+        queue.removeListener(listener)
+        queue.listeners.isEmpty()
+      }
+      .foreach { toRemove =>
+        if (started.get() && !stopped.get()) {
+          toRemove.stop()
+        }
+        queues.remove(toRemove)
+      }
+  }
+
+  /** Post an event to all queues. */
+  def post(event: SparkListenerEvent): Unit = {
+    if (!stopped.get()) {
+      metrics.numEventsPosted.inc()
+      val it = queues.iterator()
+      while (it.hasNext()) {
+        it.next().post(event)
+      }
+    }
   }
 
   /**
@@ -123,46 +141,14 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
    *
    * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
-    if (started.compareAndSet(false, true)) {
-      sparkContext = sc
-      metricsSystem.registerSource(metrics)
-      listenerThread.start()
-    } else {
-      throw new IllegalStateException(s"$name already started!")
-    }
-  }
-
-  def post(event: SparkListenerEvent): Unit = {
-    if (stopped.get) {
-      // Drop further events to make `listenerThread` exit ASAP
-      logDebug(s"$name has already stopped! Dropping event $event")
-      return
-    }
-    metrics.numEventsPosted.inc()
-    val eventAdded = eventQueue.offer(event)
-    if (eventAdded) {
-      eventLock.release()
-    } else {
-      onDropEvent(event)
+  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = 
synchronized {
+    if (!started.compareAndSet(false, true)) {
+      throw new IllegalStateException("LiveListenerBus already started.")
     }
 
-    val droppedEvents = droppedEventsCounter.get
-    if (droppedEvents > 0) {
-      // Don't log too frequently
-      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
-        // There may be multiple threads trying to decrease 
droppedEventsCounter.
-        // Use "compareAndSet" to make sure only one thread can win.
-        // And if another thread is increasing droppedEventsCounter, 
"compareAndSet" will fail and
-        // then that thread will update it.
-        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
-          val prevLastReportTimestamp = lastReportTimestamp
-          lastReportTimestamp = System.currentTimeMillis()
-          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
-            new java.util.Date(prevLastReportTimestamp))
-        }
-      }
-    }
+    this.sparkContext = sc
+    queues.asScala.foreach(_.start(sc))
+    metricsSystem.registerSource(metrics)
   }
 
   /**
@@ -173,80 +159,64 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
    */
   @throws(classOf[TimeoutException])
   def waitUntilEmpty(timeoutMillis: Long): Unit = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!queueIsEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        throw new TimeoutException(
-          s"The event queue is not empty after $timeoutMillis milliseconds")
+    val deadline = System.currentTimeMillis + timeoutMillis
+    queues.asScala.foreach { queue =>
+      if (!queue.waitUntilEmpty(deadline)) {
+        throw new TimeoutException(s"The event queue is not empty after 
$timeoutMillis ms.")
       }
-      /* Sleep rather than using wait/notify, because this is used only for 
testing and
-       * wait/notify add overhead in the general case. */
-      Thread.sleep(10)
     }
   }
 
   /**
-   * For testing only. Return whether the listener daemon thread is still 
alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once 
belonged to this queue
-   * have already been processed by all attached listeners, if this returns 
true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
-
-  /**
    * Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
    * new events after stopping.
    */
   def stop(): Unit = {
     if (!started.get()) {
-      throw new IllegalStateException(s"Attempted to stop $name that has not 
yet started!")
+      throw new IllegalStateException(s"Attempted to stop bus that has not yet 
started!")
     }
-    if (stopped.compareAndSet(false, true)) {
-      // Call eventLock.release() so that listenerThread will poll `null` from 
`eventQueue` and know
-      // `stop` is called.
-      eventLock.release()
-      listenerThread.join()
-    } else {
-      // Keep quiet
+
+    if (!stopped.compareAndSet(false, true)) {
+      return
     }
-  }
 
-  /**
-   * If the event queue exceeds its capacity, the new events will be dropped. 
The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: SparkListenerEvent): Unit = {
-    metrics.numDroppedEvents.inc()
-    droppedEventsCounter.incrementAndGet()
-    if (logDroppedEvent.compareAndSet(false, true)) {
-      // Only log the following message once to avoid duplicated annoying logs.
-      logError("Dropping SparkListenerEvent because no remaining room in event 
queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot 
keep up with " +
-        "the rate at which tasks are being started by the scheduler.")
+    synchronized {
+      queues.asScala.foreach(_.stop())
+      queues.clear()
     }
-    logTrace(s"Dropping event $event")
   }
+
+  // For testing only.
+  private[spark] def findListenersByClass[T <: SparkListenerInterface : 
ClassTag](): Seq[T] = {
+    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
+  }
+
+  // For testing only.
+  private[spark] def listeners: JList[SparkListenerInterface] = {
+    queues.asScala.flatMap(_.listeners.asScala).asJava
+  }
+
+  // For testing only.
+  private[scheduler] def activeQueues(): Set[String] = {
+    queues.asScala.map(_.name).toSet
+  }
+
 }
 
 private[spark] object LiveListenerBus {
   // Allows for Context to check whether stop() call is made within listener 
thread
   val withinListenerThread: DynamicVariable[Boolean] = new 
DynamicVariable[Boolean](false)
 
-  /** The thread name of Spark listener bus */
-  val name = "SparkListenerBus"
+  private[scheduler] val SHARED_QUEUE = "shared"
+
+  private[scheduler] val APP_STATUS_QUEUE = "appStatus"
+
+  private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"
+
+  private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
 }
 
-private[spark] class LiveListenerBusMetrics(
-    conf: SparkConf,
-    queue: LinkedBlockingQueue[_])
+private[spark] class LiveListenerBusMetrics(conf: SparkConf)
   extends Source with Logging {
 
   override val sourceName: String = "LiveListenerBus"
@@ -260,25 +230,6 @@ private[spark] class LiveListenerBusMetrics(
    */
   val numEventsPosted: Counter = 
metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
 
-  /**
-   * The total number of events that were dropped without being delivered to 
listeners.
-   */
-  val numDroppedEvents: Counter = 
metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
-
-  /**
-   * The amount of time taken to post a single event to all listeners.
-   */
-  val eventProcessingTime: Timer = 
metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
-
-  /**
-   * The number of messages waiting in the queue.
-   */
-  val queueSize: Gauge[Int] = {
-    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
-      override def getValue: Int = queue.size()
-    })
-  }
-
   // Guarded by synchronization.
   private val perListenerClassTimers = mutable.Map[String, Timer]()
 
@@ -303,5 +254,5 @@ private[spark] class LiveListenerBusMetrics(
       }
     }
   }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f3fcf27..6e94073 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -162,13 +162,14 @@ private[spark] object SparkUI {
   def createLiveUI(
       sc: SparkContext,
       conf: SparkConf,
-      listenerBus: SparkListenerBus,
       jobProgressListener: JobProgressListener,
       securityManager: SecurityManager,
       appName: String,
       startTime: Long): SparkUI = {
-    create(Some(sc), conf, listenerBus, securityManager, appName,
-      jobProgressListener = Some(jobProgressListener), startTime = startTime)
+    create(Some(sc), conf,
+      sc.listenerBus.addToStatusQueue,
+      securityManager, appName, jobProgressListener = 
Some(jobProgressListener),
+      startTime = startTime)
   }
 
   def createHistoryUI(
@@ -179,8 +180,7 @@ private[spark] object SparkUI {
       basePath: String,
       lastUpdateTime: Option[Long],
       startTime: Long): SparkUI = {
-    val sparkUI = create(
-      None, conf, listenerBus, securityManager, appName, basePath,
+    val sparkUI = create(None, conf, listenerBus.addListener, securityManager, 
appName, basePath,
       lastUpdateTime = lastUpdateTime, startTime = startTime)
 
     val listenerFactories = 
ServiceLoader.load(classOf[SparkHistoryListenerFactory],
@@ -202,7 +202,7 @@ private[spark] object SparkUI {
   private def create(
       sc: Option[SparkContext],
       conf: SparkConf,
-      listenerBus: SparkListenerBus,
+      addListenerFn: SparkListenerInterface => Unit,
       securityManager: SecurityManager,
       appName: String,
       basePath: String = "",
@@ -212,7 +212,7 @@ private[spark] object SparkUI {
 
     val _jobProgressListener: JobProgressListener = 
jobProgressListener.getOrElse {
       val listener = new JobProgressListener(conf)
-      listenerBus.addListener(listener)
+      addListenerFn(listener)
       listener
     }
 
@@ -222,11 +222,11 @@ private[spark] object SparkUI {
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
 
-    listenerBus.addListener(environmentListener)
-    listenerBus.addListener(storageStatusListener)
-    listenerBus.addListener(executorsListener)
-    listenerBus.addListener(storageListener)
-    listenerBus.addListener(operationGraphListener)
+    addListenerFn(environmentListener)
+    addListenerFn(storageStatusListener)
+    addListenerFn(executorsListener)
+    addListenerFn(storageListener)
+    addListenerFn(operationGraphListener)
 
     new SparkUI(sc, conf, securityManager, environmentListener, 
storageStatusListener,
       executorsListener, _jobProgressListener, storageListener, 
operationGraphListener,

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 7da4bae..a91e09b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -49,6 +49,11 @@ class ExecutorAllocationManagerSuite
     contexts.foreach(_.stop())
   }
 
+  private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
+    bus.post(event)
+    bus.waitUntilEmpty(1000)
+  }
+
   test("verify min/max executors") {
     val conf = new SparkConf()
       .setMaster("myDummyLocalExternalClusterManager")
@@ -95,7 +100,7 @@ class ExecutorAllocationManagerSuite
   test("add executors") {
     sc = createSparkContext(1, 10, 1)
     val manager = sc.executorAllocationManager.get
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
1000)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
     assert(numExecutorsTarget(manager) === 1)
@@ -140,7 +145,7 @@ class ExecutorAllocationManagerSuite
   test("add executors capped by num pending tasks") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
5)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
     // Verify that we're capped at number of tasks in the stage
     assert(numExecutorsTarget(manager) === 0)
@@ -156,10 +161,10 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task doesn't affect the target
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
3)))
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3)))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 6)
@@ -172,9 +177,9 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that re-running a task doesn't blow things up
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 
3)))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, 
"executor-1")))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3)))
+    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, 
"executor-1")))
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 9)
     assert(numExecutorsToAdd(manager) === 2)
@@ -183,7 +188,7 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task once we're at our limit doesn't blow things 
up
-    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, 
"executor-1")))
     assert(addExecutors(manager) === 0)
     assert(numExecutorsTarget(manager) === 10)
   }
@@ -193,13 +198,13 @@ class ExecutorAllocationManagerSuite
     val manager = sc.executorAllocationManager.get
 
     // Verify that we're capped at number of tasks including the speculative 
ones in the stage
-    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
     assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
-    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
2)))
+    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
+    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
@@ -210,13 +215,13 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task doesn't affect the target
-    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
-    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-2", true)))
+    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-2", true)))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
@@ -225,7 +230,7 @@ class ExecutorAllocationManagerSuite
   test("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 
5)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
     assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
@@ -236,15 +241,15 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 3)
 
     val task1Info = createTaskInfo(0, 0, "executor-1")
-    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
+    post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info))
 
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 2)
 
     val task2Info = createTaskInfo(1, 0, "executor-1")
-    sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, 
task1Info, null))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, 
task2Info, null))
+    post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info))
+    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, 
null))
+    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, 
null))
 
     assert(adjustRequestedExecutors(manager) === -1)
   }
@@ -352,21 +357,22 @@ class ExecutorAllocationManagerSuite
     sc = createSparkContext(5, 12, 5)
     val manager = sc.executorAllocationManager.get
 
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
8)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 8)))
 
     // Remove when numExecutorsTarget is the same as the current number of 
executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
     (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
-      info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+      info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
     assert(executorIds(manager).size === 8)
     assert(numExecutorsTarget(manager) === 8)
     assert(maxNumExecutorsNeeded(manager) == 8)
     assert(!removeExecutor(manager, "1")) // won't work since 
numExecutorsTarget == numExecutors
 
     // Remove executors when numExecutorsTarget is lower than current number 
of executors
-    (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
-      info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, 
Success, info, null)) }
+    (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info =>
+      post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, info, 
null))
+    }
     adjustRequestedExecutors(manager)
     assert(executorIds(manager).size === 8)
     assert(numExecutorsTarget(manager) === 5)
@@ -378,7 +384,7 @@ class ExecutorAllocationManagerSuite
     onExecutorRemoved(manager, "3")
 
     // numExecutorsTarget is lower than minNumExecutors
-    sc.listenerBus.postToAll(
+    post(sc.listenerBus,
       SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), 
null))
     assert(executorIds(manager).size === 5)
     assert(numExecutorsTarget(manager) === 5)
@@ -390,7 +396,7 @@ class ExecutorAllocationManagerSuite
   test ("interleaving add and remove") {
     sc = createSparkContext(5, 12, 5)
     val manager = sc.executorAllocationManager.get
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
1000)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Add a few executors
     assert(addExecutors(manager) === 1)
@@ -569,7 +575,7 @@ class ExecutorAllocationManagerSuite
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
1000)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Scheduler queue backlogged
     onSchedulerBacklogged(manager)
@@ -682,26 +688,26 @@ class ExecutorAllocationManagerSuite
 
     // Starting a stage should start the add timer
     val numTasks = 10
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
numTasks)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 
numTasks)))
     assert(addTime(manager) !== NOT_SET)
 
     // Starting a subset of the tasks should not cancel the add timer
     val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, 
"executor-1") }
-    taskInfos.tail.foreach { info => 
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+    taskInfos.tail.foreach { info => post(sc.listenerBus, 
SparkListenerTaskStart(0, 0, info)) }
     assert(addTime(manager) !== NOT_SET)
 
     // Starting all remaining tasks should cancel the add timer
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head))
     assert(addTime(manager) === NOT_SET)
 
     // Start two different stages
     // The add timer should be canceled only if all tasks in both stages start 
running
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
numTasks)))
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 
numTasks)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 
numTasks)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 
numTasks)))
     assert(addTime(manager) !== NOT_SET)
-    taskInfos.foreach { info => 
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) }
+    taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(1, 
0, info)) }
     assert(addTime(manager) !== NOT_SET)
-    taskInfos.foreach { info => 
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) }
+    taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 
0, info)) }
     assert(addTime(manager) === NOT_SET)
   }
 
@@ -715,22 +721,22 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).size === 5)
 
     // Starting a task cancel the remove timer for that executor
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, 
"executor-1")))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, 
"executor-2")))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, 
"executor-2")))
     assert(removeTimes(manager).size === 3)
     assert(!removeTimes(manager).contains("executor-1"))
     assert(!removeTimes(manager).contains("executor-2"))
 
     // Finishing all tasks running on an executor should start the remove 
timer for that executor
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+    post(sc.listenerBus, SparkListenerTaskEnd(
       0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new 
TaskMetrics))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+    post(sc.listenerBus, SparkListenerTaskEnd(
       0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new 
TaskMetrics))
     assert(removeTimes(manager).size === 4)
     assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not 
finished yet
     assert(removeTimes(manager).contains("executor-2"))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(
+    post(sc.listenerBus, SparkListenerTaskEnd(
       0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new 
TaskMetrics))
     assert(removeTimes(manager).size === 5)
     assert(removeTimes(manager).contains("executor-1")) // executor-1 has now 
finished
@@ -743,13 +749,13 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).isEmpty)
 
     // New executors have registered
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(removeTimes(manager).contains("executor-1"))
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
@@ -757,14 +763,14 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).contains("executor-2"))
 
     // Existing executors have disconnected
-    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", 
""))
+    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", ""))
     assert(executorIds(manager).size === 1)
     assert(!executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(!removeTimes(manager).contains("executor-1"))
 
     // Unknown executor has disconnected
-    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", 
""))
+    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", ""))
     assert(executorIds(manager).size === 1)
     assert(removeTimes(manager).size === 1)
   }
@@ -775,8 +781,8 @@ class ExecutorAllocationManagerSuite
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
 
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
@@ -788,15 +794,15 @@ class ExecutorAllocationManagerSuite
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
 
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
 
-    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+    post(sc.listenerBus, SparkListenerExecutorAdded(
       0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
@@ -809,7 +815,7 @@ class ExecutorAllocationManagerSuite
     sc = createSparkContext(0, 100000, 0)
     val manager = sc.executorAllocationManager.get
     val stage1 = createStageInfo(0, 1000)
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+    post(sc.listenerBus, SparkListenerStageSubmitted(stage1))
 
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
@@ -820,12 +826,12 @@ class ExecutorAllocationManagerSuite
       onExecutorAdded(manager, s"executor-$i")
     }
     assert(executorIds(manager).size === 15)
-    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+    post(sc.listenerBus, SparkListenerStageCompleted(stage1))
 
     adjustRequestedExecutors(manager)
     assert(numExecutorsTarget(manager) === 0)
 
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
1000)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000)))
     addExecutors(manager)
     assert(numExecutorsTarget(manager) === 16)
   }
@@ -842,7 +848,7 @@ class ExecutorAllocationManagerSuite
     // Verify whether the initial number of executors is kept with no pending 
tasks
     assert(numExecutorsTarget(manager) === 3)
 
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 
2)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
     clock.advance(100L)
 
     assert(maxNumExecutorsNeeded(manager) === 2)
@@ -892,7 +898,7 @@ class ExecutorAllocationManagerSuite
       Seq.empty
     )
     val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+    post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1))
 
     assert(localityAwareTasks(manager) === 3)
     assert(hostToLocalTaskCount(manager) ===
@@ -904,13 +910,13 @@ class ExecutorAllocationManagerSuite
       Seq.empty
     )
     val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2))
+    post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2))
 
     assert(localityAwareTasks(manager) === 5)
     assert(hostToLocalTaskCount(manager) ===
       Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 
2))
 
-    sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1))
+    post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1))
     assert(localityAwareTasks(manager) === 2)
     assert(hostToLocalTaskCount(manager) ===
       Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
@@ -921,16 +927,16 @@ class ExecutorAllocationManagerSuite
     val manager = sc.executorAllocationManager.get
     assert(maxNumExecutorsNeeded(manager) === 0)
 
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
1)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1)))
     assert(maxNumExecutorsNeeded(manager) === 1)
 
     val taskInfo = createTaskInfo(1, 1, "executor-1")
-    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo))
     assert(maxNumExecutorsNeeded(manager) === 1)
 
     // If the task is failed, we expect it to be resubmitted later.
     val taskEndReason = ExceptionFailure(null, null, null, null, None)
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, 
taskInfo, null))
+    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, 
taskInfo, null))
     assert(maxNumExecutorsNeeded(manager) === 1)
   }
 
@@ -942,7 +948,7 @@ class ExecutorAllocationManagerSuite
 
     // Allocation manager is reset when adding executor requests are sent 
without reporting back
     // executor added.
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
10)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 2)
@@ -957,7 +963,7 @@ class ExecutorAllocationManagerSuite
     assert(executorIds(manager) === Set.empty)
 
     // Allocation manager is reset when executors are added.
-    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 
10)))
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
     addExecutors(manager)
     addExecutors(manager)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 703fc1b..6222e57 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -751,7 +751,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 
   // Helper functions to extract commonly used code in Fetch Failure test cases
   private def setupStageAbortTest(sc: SparkContext) {
-    sc.listenerBus.addListener(new EndListener())
+    sc.listenerBus.addToSharedQueue(new EndListener())
     ended = false
     jobResult = null
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 0afd07b..6b42775 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -164,9 +164,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
     // A comprehensive test on JSON de/serialization of all events is in 
JsonProtocolSuite
     eventLogger.start()
     listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
-    listenerBus.addListener(eventLogger)
-    listenerBus.postToAll(applicationStart)
-    listenerBus.postToAll(applicationEnd)
+    listenerBus.addToEventLogQueue(eventLogger)
+    listenerBus.post(applicationStart)
+    listenerBus.post(applicationEnd)
     listenerBus.stop()
     eventLogger.stop()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 995df1d..d061c78 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   with ResetSystemProperties {
 
+  import LiveListenerBus._
+
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
@@ -42,18 +44,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
   private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])
 
+  private def numDroppedEvents(bus: LiveListenerBus): Long = {
+    bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
+  }
+
+  private def queueSize(bus: LiveListenerBus): Int = {
+    bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
+      .asInstanceOf[Int]
+  }
+
+  private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
+    bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
+  }
+
   test("don't call sc.stop in listener") {
     sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus(sc.conf)
-    bus.addListener(listener)
 
-    // Starting listener bus should flush all buffered events
-    bus.start(sc, sc.env.metricsSystem)
-    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.addToSharedQueue(listener)
+    sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.stop()
 
-    bus.stop()
     assert(listener.sparkExSeen)
   }
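
The helpers added above read metrics under a per-queue naming scheme ("queue.<queue name>.<metric>"), with the queue names exposed as constants on the LiveListenerBus companion object (hence the import at the top of the suite). A minimal sketch of the same lookup against another queue, purely for illustration:

    // Assumes `import LiveListenerBus._` as in this suite; the APP_STATUS_QUEUE lookup is illustrative.
    private def droppedFromStatusQueue(bus: LiveListenerBus): Long = {
      bus.metrics.metricRegistry.counter(s"queue.$APP_STATUS_QUEUE.numDroppedEvents").getCount
    }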
 
@@ -61,13 +73,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val conf = new SparkConf()
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus(conf)
-    bus.addListener(counter)
+    bus.addToSharedQueue(counter)
 
     // Metrics are initially empty.
     assert(bus.metrics.numEventsPosted.getCount === 0)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.eventProcessingTime.getCount === 0)
+    assert(numDroppedEvents(bus) === 0)
+    assert(queueSize(bus) === 0)
+    assert(eventProcessingTimeCount(bus) === 0)
 
     // Post five events:
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -75,7 +87,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Five messages should be marked as received and queued, but no messages should be posted to
     // listeners yet because the the listener bus hasn't been started.
     assert(bus.metrics.numEventsPosted.getCount === 5)
-    assert(bus.metrics.queueSize.getValue === 5)
+    assert(queueSize(bus) === 5)
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
@@ -83,18 +95,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.eventProcessingTime.getCount === 5)
+    assert(queueSize(bus) === 0)
+    assert(eventProcessingTimeCount(bus) === 5)
 
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
-    assert(bus.metrics.numEventsPosted.getCount === 5)
-
-    // Make sure per-listener-class timers were created:
-    assert(bus.metrics.getTimerForListenerClass(
-      classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)
+    assert(eventProcessingTimeCount(bus) === 5)
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
@@ -135,7 +143,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val bus = new LiveListenerBus(new SparkConf())
     val blockingListener = new BlockingListener
 
-    bus.addListener(blockingListener)
+    bus.addToSharedQueue(blockingListener)
     bus.start(mockSparkContext, mockMetricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
@@ -168,7 +176,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val listenerStarted = new Semaphore(0)
     val listenerWait = new Semaphore(0)
 
-    bus.addListener(new SparkListener {
+    bus.addToSharedQueue(new SparkListener {
       override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
         listenerStarted.release()
         listenerWait.acquire()
@@ -180,20 +188,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Post a message to the listener bus and wait for processing to begin:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     listenerStarted.acquire()
-    assert(bus.metrics.queueSize.getValue === 0)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(queueSize(bus) === 0)
+    assert(numDroppedEvents(bus) === 0)
 
     // If we post an additional message then it should remain in the queue because the listener is
     // busy processing the first event:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(bus.metrics.queueSize.getValue === 1)
-    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(queueSize(bus) === 1)
+    assert(numDroppedEvents(bus) === 0)
 
     // The queue is now full, so any additional events posted to the listener will be dropped:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(bus.metrics.queueSize.getValue === 1)
-    assert(bus.metrics.numDroppedEvents.getCount === 1)
-
+    assert(queueSize(bus) === 1)
+    assert(numDroppedEvents(bus) === 1)
 
     // Allow the the remaining events to be processed so we can stop the listener bus:
     listenerWait.release(2)
@@ -419,9 +426,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val bus = new LiveListenerBus(new SparkConf())
 
     // Propagate events to bad listener first
-    bus.addListener(badListener)
-    bus.addListener(jobCounter1)
-    bus.addListener(jobCounter2)
+    bus.addToSharedQueue(badListener)
+    bus.addToSharedQueue(jobCounter1)
+    bus.addToSharedQueue(jobCounter2)
     bus.start(mockSparkContext, mockMetricsSystem)
 
     // Post events to all listeners, and wait until the queue is drained
@@ -429,7 +436,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // The exception should be caught, and the event should be propagated to other listeners
-    assert(bus.listenerThreadIsAlive)
     assert(jobCounter1.count === 5)
     assert(jobCounter2.count === 5)
   }
@@ -449,6 +455,31 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
   }
 
+  test("add and remove listeners to/from LiveListenerBus queues") {
+    val bus = new LiveListenerBus(new SparkConf(false))
+    val counter1 = new BasicJobCounter()
+    val counter2 = new BasicJobCounter()
+    val counter3 = new BasicJobCounter()
+
+    bus.addToSharedQueue(counter1)
+    bus.addToStatusQueue(counter2)
+    bus.addToStatusQueue(counter3)
+    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 3)
+
+    bus.removeListener(counter1)
+    assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+
+    bus.removeListener(counter2)
+    assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
+    assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
+
+    bus.removeListener(counter3)
+    assert(bus.activeQueues().isEmpty)
+    assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
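
The new test above exercises registration and removal; the payoff of the per-queue design is isolation, since each queue drains on its own thread. A minimal sketch of that idea (assumed behavior, not a test from this patch; BasicJobCounter is the counter class already used in this suite):

    import java.util.concurrent.CountDownLatch

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerJobEnd}

    val latch = new CountDownLatch(1)
    val slowListener = new SparkListener {
      // Blocks only the shared queue's dispatch thread.
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = latch.await()
    }
    val bus = new LiveListenerBus(new SparkConf(false))
    bus.addToSharedQueue(slowListener)
    bus.addToStatusQueue(new BasicJobCounter())  // keeps counting while the shared queue is stuck
    // ... start the bus, post job-end events, then latch.countDown() before stopping it.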

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 1cb5259..79f02f2 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.storage._
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
 class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
-  private var bus: LiveListenerBus = _
+  private var bus: SparkListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
   private val memAndDisk = StorageLevel.MEMORY_AND_DISK
@@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
 
   before {
     val conf = new SparkConf()
-    bus = new LiveListenerBus(conf)
+    bus = new ReplayListenerBus()
     storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
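
Switching this suite from LiveListenerBus to ReplayListenerBus keeps the test synchronous: postToAll invokes the listeners inline, so assertions can follow each post directly. A minimal sketch of that pattern (the event used here is just illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{ReplayListenerBus, SparkListenerUnpersistRDD}
    import org.apache.spark.storage.StorageStatusListener

    val bus = new ReplayListenerBus()
    bus.addListener(new StorageStatusListener(new SparkConf()))
    bus.postToAll(SparkListenerUnpersistRDD(0))  // listener has already been invoked when this returns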

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 4207013..07e3902 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addListener(this)
+  sparkListenerBus.addToSharedQueue(this)
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 7202f12..ad9db30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
@@ -148,7 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     if (SparkSession.sqlListener.get() == null) {
       val listener = new SQLListener(sc.conf)
       if (SparkSession.sqlListener.compareAndSet(null, listener)) {
-        sc.addSparkListener(listener)
+        sc.listenerBus.addToStatusQueue(listener)
         sc.ui.foreach(new SQLTab(listener, _))
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index f3b4ff2..8c7418e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -659,8 +659,7 @@ class StreamingContext private[streaming] (
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
     var shutdownHookRefToRemove: AnyRef = null
     if (LiveListenerBus.withinListenerThread.value) {
-      throw new SparkException(
-        s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}")
+      throw new SparkException(s"Cannot stop StreamingContext within listener bus thread.")
     }
     synchronized {
       // The state should always be Stopped after calling `stop()`, even if we 
haven't started yet

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 5fb0bd0..6a70bf7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -76,7 +76,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
    * forward them to StreamingListeners.
    */
   def start(): Unit = {
-    sparkListenerBus.addListener(this) // for getting callbacks on spark events
+    sparkListenerBus.addToStatusQueue(this)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 96ab5a2..5810e73 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
   test("getActive and getActiveOrCreate") {
     require(StreamingContext.getActive().isEmpty, "context exists from before")
-    sc = new SparkContext(conf)
-
     var newContextCreated = false
 
     def creatingFunc(): StreamingContext = {
@@ -603,6 +601,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     // getActiveOrCreate should create new context and getActive should return it only
     // after starting the context
     testGetActiveOrCreate {
+      sc = new SparkContext(conf)
       ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
       assert(ssc != null, "no context created")
       assert(newContextCreated === true, "new context not created")
@@ -622,6 +621,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
     // getActiveOrCreate and getActive should return independently created context after activating
     testGetActiveOrCreate {
+      sc = new SparkContext(conf)
       ssc = creatingFunc()  // Create
       assert(StreamingContext.getActive().isEmpty,
         "new initialized context returned before starting")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
