This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b53c6f9d1225 [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue b53c6f9d1225 is described below commit b53c6f9d122562197321eb7f792ed168865132f7 Author: tatian <tat...@ebay.com> AuthorDate: Sat Apr 13 00:05:09 2024 -0500 [SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue ### What changes were proposed in this pull request? Add config spark.scheduler.listenerbus.exitTimeout (default 0, wait until the dispatch thread exits). Before this PR: The event queue will wait for the event to drain completely on stop. After this PR: Allow the user to control this behavior (wait for a complete drain or not) by Spark config. ### Why are the changes needed? #### Problem statement: The SparkContext.stop() hangs for a long time on LiveEventBus.stop() when listeners are slow. #### User scenarios: We have a centralized service with multiple instances to regularly execute user's scheduled tasks. For each user task within one service instance, the process is as follows: 1. Create a Spark session directly within the service process with an account defined in the task. 2. Instantiate listeners by class names and register them with the SparkContext. The JARs containing the listener classes are uploaded to the service by the user. 3. Prepare resources. 4. Run user logic (Spark SQL). 5. Stop the Spark session by invoking SparkSession.stop(). In step 5, it will wait for the LiveEventBus to stop, which requires the remaining events to be completely drained by each listener. Since the listener is implemented by users and we cannot prevent some heavy work within the listener on each event, there are cases where a single heavy job has over 30,000 tasks, and it could take 30 minutes for the listener to process all the remaining events, because within the listener, it requires a coarse-grained global lock and updates the internal status to the remote database. 
This kind of delay affects other user tasks in the queue. Therefore, from the server side perspective, we need the guarantee that the stop operation finishes quickly. ### Does this PR introduce _any_ user-facing change? Add config spark.scheduler.listenerbus.exitTimeout. Default is `0`; it will wait for the event to drain completely. If set to a non-negative integer, the LiveEventBus will wait for at least that duration (in ms) before it stops irrespective of whether the events are drained or not. ### How was this patch tested? By UT and verified the feature in our production environment ### Was this patch authored or co-authored using generative AI tooling? No Closes #45367 from TakawaAkirayo/SPARK-47253. Lead-authored-by: tatian <tat...@ebay.com> Co-authored-by: TakawaAkirayo <153728772+takawaakir...@users.noreply.github.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../org/apache/spark/internal/config/package.scala | 11 ++++++ .../apache/spark/scheduler/AsyncEventQueue.scala | 9 ++++- .../spark/scheduler/SparkListenerSuite.scala | 46 ++++++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f086038d0f6c..b2cbb6f6deb6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1041,6 +1041,17 @@ package object config { .timeConf(TimeUnit.NANOSECONDS) .createWithDefaultString("1s") + private[spark] val LISTENER_BUS_EXIT_TIMEOUT = + ConfigBuilder("spark.scheduler.listenerbus.exitTimeout") + .doc("The time that event queue waits until the dispatch thread exits " + + "when stop is invoked. 
" + + "This is set to 0 by default for graceful shutdown of the event queue, " + + "but allow the user to configure the waiting time.") + .version("4.0.0") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ >= 0, "Listener bus exit timeout must be non-negative duration") + .createWithDefault(0) + // This property sets the root namespace for metrics reporting private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace") .version("2.1.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 2709fe27f406..271fc9ac92ba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -143,10 +143,15 @@ private class AsyncEventQueue( eventCount.incrementAndGet() eventQueue.put(POISON_PILL) } - // this thread might be trying to stop itself as part of error handling -- we can't join + // This thread might be trying to stop itself as part of error handling -- we can't join // in that case. if (Thread.currentThread() != dispatchThread) { - dispatchThread.join() + // If users don't want to wait for the dispatch to end until all events are drained, + // they can control the waiting time by themselves. + // By default, the `waitingTimeMs` is set to 0, + // which means it will wait until all events are drained. 
+ val exitTimeoutMs = conf.get(LISTENER_BUS_EXIT_TIMEOUT) + dispatchThread.join(exitTimeoutMs) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 34b2a40d1e3b..6dc4d4da7bfc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -176,6 +176,52 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(drained) } + test("allow bus.stop() to not wait for the event queue to completely drain") { + @volatile var drained = false + + // When Listener has started + val listenerStarted = new Semaphore(0) + + // Tells the listener to stop blocking + val listenerWait = new Semaphore(0) + + // Make sure the event drained + val drainWait = new Semaphore(0) + + class BlockingListener extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + listenerStarted.release() + listenerWait.acquire() + drained = true + drainWait.release() + } + } + + val sparkConf = new SparkConf().set(LISTENER_BUS_EXIT_TIMEOUT, 100L) + val bus = new LiveListenerBus(sparkConf) + val blockingListener = new BlockingListener + + bus.addToSharedQueue(blockingListener) + bus.start(mockSparkContext, mockMetricsSystem) + bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + + listenerStarted.acquire() + // if reach here, the dispatch thread should be blocked at onJobEnd + + // stop the bus now, the queue will waiting for event drain with specified timeout + bus.stop() + // if reach here, the bus has exited without draining completely, + // otherwise it will hung here forever. + + // the event dispatch thread should remain blocked after the bus has stopped. 
+ // which means the bus exited upon reaching the timeout + // without all the events being completely drained + assert(!drained) + + // unblock the dispatch thread + listenerWait.release() + } + test("metrics for dropped listener events") { val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org