This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b53c6f9d1225 [SPARK-47253][CORE] Allow LiveEventBus to stop without completely draining the event queue
b53c6f9d1225 is described below

commit b53c6f9d122562197321eb7f792ed168865132f7
Author: tatian <tat...@ebay.com>
AuthorDate: Sat Apr 13 00:05:09 2024 -0500

    [SPARK-47253][CORE] Allow LiveEventBus to stop without completely draining the event queue
    
    ### What changes were proposed in this pull request?
    Add config spark.scheduler.listenerbus.exitTimeout (default 0: wait until the dispatch thread exits).
    Before this PR: on stop, the event queue waits for the events to drain completely.
    After this PR: users can control this behavior (wait for a complete drain or not) via a Spark config.
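
    For illustration, a minimal sketch of how a user might set the new config when building a session (the "100ms" value and app name are hypothetical, not part of this patch):

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Cap how long LiveListenerBus.stop() waits for each event queue's dispatch
    // thread to exit. "0" keeps the previous behavior: block until fully drained.
    val conf = new SparkConf()
      .setAppName("exit-timeout-demo") // hypothetical app name
      .set("spark.scheduler.listenerbus.exitTimeout", "100ms")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    // ... run jobs ...
    spark.stop() // waits at most ~100ms per event queue for remaining events
    ```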
    
    ### Why are the changes needed?
    #### Problem statement:
    SparkContext.stop() hangs for a long time in LiveEventBus.stop() when listeners are slow.
    
    #### User scenarios:
    We have a centralized service with multiple instances that regularly executes users' scheduled tasks.
    For each user task within one service instance, the process is as follows:
    
    1. Create a Spark session directly within the service process, with an account defined in the task.
    2. Instantiate listeners by class name and register them with the SparkContext. The JARs containing the listener classes are uploaded to the service by the user.
    3. Prepare resources.
    4. Run user logic (Spark SQL).
    5. Stop the Spark session by invoking SparkSession.stop().
    
    In step 5, the service waits for the LiveEventBus to stop, which requires the remaining events to be completely drained by each listener.
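
    A minimal sketch of that per-task flow (the listener class name, JAR path, and SQL below are hypothetical placeholders for user-supplied input):

    ```scala
    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.sql.SparkSession

    // Step 1: build the session inside the service process.
    val spark = SparkSession.builder()
      .appName("scheduled-user-task") // hypothetical
      .config("spark.jars", "/service/uploads/user-listener.jar") // hypothetical user JAR
      .getOrCreate()

    // Step 2: instantiate the user's listener by class name and register it.
    val listener = Class.forName("com.example.UserAuditListener") // hypothetical class
      .getDeclaredConstructor().newInstance().asInstanceOf[SparkListener]
    spark.sparkContext.addSparkListener(listener)

    // Steps 3-4: run the user's logic (Spark SQL).
    spark.sql("SELECT 1").collect() // hypothetical user query

    // Step 5: stop the session. Before this PR, this blocks until the listener
    // bus drains every queued event through the user's listener.
    spark.stop()
    ```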
    
    Since the listeners are implemented by users, we cannot prevent heavy work from being done on each event. There are cases where a single heavy job has over 30,000 tasks, and it can take 30 minutes for a listener to process all the remaining events, because for each event the listener acquires a coarse-grained global lock and updates its internal status in a remote database.
    
    This kind of delay affects other user tasks in the queue. Therefore, from the server-side perspective, we need a guarantee that the stop operation finishes quickly.
    
    ### Does this PR introduce _any_ user-facing change?
    Adds the config `spark.scheduler.listenerbus.exitTimeout`.
    The default is `0`, which waits for the events to drain completely. If set to a positive integer, the LiveEventBus will wait at most that duration (in ms) before it stops, whether or not the events have been drained.
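
    For clarity, a standalone sketch of the `Thread.join` semantics the new timeout relies on (the worker thread here just simulates a slow listener drain; it is not part of the patch):

    ```scala
    // join(0) blocks until the thread exits (full drain); join(n) blocks at most n ms.
    val worker = new Thread(() => Thread.sleep(10000)) // simulates a slow listener
    worker.setDaemon(true)
    worker.start()

    worker.join(100) // returns after at most ~100 ms even though the worker is still running
    assert(worker.isAlive)
    ```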
    
    ### How was this patch tested?
    Verified by UT and by exercising the feature in our production environment.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45367 from TakawaAkirayo/SPARK-47253.
    
    Lead-authored-by: tatian <tat...@ebay.com>
    Co-authored-by: TakawaAkirayo <153728772+takawaakir...@users.noreply.github.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/internal/config/package.scala | 11 ++++++
 .../apache/spark/scheduler/AsyncEventQueue.scala   |  9 ++++-
 .../spark/scheduler/SparkListenerSuite.scala       | 46 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f086038d0f6c..b2cbb6f6deb6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1041,6 +1041,17 @@ package object config {
       .timeConf(TimeUnit.NANOSECONDS)
       .createWithDefaultString("1s")
 
+  private[spark] val LISTENER_BUS_EXIT_TIMEOUT =
+    ConfigBuilder("spark.scheduler.listenerbus.exitTimeout")
+      .doc("The time that event queue waits until the dispatch thread exits " +
+        "when stop is invoked. " +
+        "This is set to 0 by default for graceful shutdown of the event queue, 
" +
+        "but allow the user to configure the waiting time.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ >= 0, "Listener bus exit timeout must be non-negative duration")
+      .createWithDefault(0)
+
   // This property sets the root namespace for metrics reporting
   private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
     .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 2709fe27f406..271fc9ac92ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -143,10 +143,15 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- we can't join
+    // This thread might be trying to stop itself as part of error handling -- we can't join
     // in that case.
     if (Thread.currentThread() != dispatchThread) {
-      dispatchThread.join()
+      // If users don't want to wait for the dispatch to end until all events are drained,
+      // they can control the waiting time by themselves.
+      // By default, `exitTimeoutMs` is set to 0,
+      // which means it will wait until all events are drained.
+      val exitTimeoutMs = conf.get(LISTENER_BUS_EXIT_TIMEOUT)
+      dispatchThread.join(exitTimeoutMs)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 34b2a40d1e3b..6dc4d4da7bfc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -176,6 +176,52 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }
 
+  test("allow bus.stop() to not wait for the event queue to completely drain") 
{
+    @volatile var drained = false
+
+    // Signals that the listener has started handling an event
+    val listenerStarted = new Semaphore(0)
+
+    // Tells the listener to stop blocking
+    val listenerWait = new Semaphore(0)
+
+    // Signals that the event has been drained
+    val drainWait = new Semaphore(0)
+
+    class BlockingListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+        drained = true
+        drainWait.release()
+      }
+    }
+
+    val sparkConf = new SparkConf().set(LISTENER_BUS_EXIT_TIMEOUT, 100L)
+    val bus = new LiveListenerBus(sparkConf)
+    val blockingListener = new BlockingListener
+
+    bus.addToSharedQueue(blockingListener)
+    bus.start(mockSparkContext, mockMetricsSystem)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+
+    listenerStarted.acquire()
+    // If we reach here, the dispatch thread should be blocked in onJobEnd
+
+    // Stop the bus now; the queue will wait for the events to drain up to the specified timeout
+    bus.stop()
+    // If we reach here, the bus has exited without draining completely;
+    // otherwise it would hang here forever.
+
+    // The event dispatch thread should remain blocked after the bus has stopped,
+    // which means the bus exited upon reaching the timeout
+    // without all the events being completely drained.
+    assert(!drained)
+
+    // unblock the dispatch thread
+    listenerWait.release()
+  }
+
   test("metrics for dropped listener events") {
     val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
