This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ce93c9fd8671 [SPARK-47383][CORE] Support `spark.shutdown.timeout` config
ce93c9fd8671 is described below

commit ce93c9fd86715e2479552628398f6fc11e83b2af
Author: Rob Reeves <roree...@linkedin.com>
AuthorDate: Mon Mar 18 10:36:38 2024 -0700

    [SPARK-47383][CORE] Support `spark.shutdown.timeout` config

    ### What changes were proposed in this pull request?
    Make the shutdown hook timeout configurable. If `spark.shutdown.timeout` is not defined, Spark falls back to the existing behavior: a default timeout of 30 seconds, or whatever core-site.xml defines for the hadoop.service.shutdown.timeout property.

    ### Why are the changes needed?
    Spark sometimes times out during the shutdown process. When that happens, data still sitting in the queues is dropped and metadata is lost (e.g. event logs, anything written by custom listeners). Before this change the timeout was not easily configurable: the underlying org.apache.hadoop.util.ShutdownHookManager defaults to 30 seconds, and although it honors hadoop.service.shutdown.timeout, that value must be set in core-site.xml/core-default.xml because a fresh Hadoop conf object is created for the lookup and there is no opportunity to modify it programmatically.

    ### Does this PR introduce _any_ user-facing change?
    Yes, a new config `spark.shutdown.timeout` is added.

    ### How was this patch tested?
    Manual testing in spark-shell. This behavior is not practical to cover with a unit test.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #45504 from robreeves/sc_shutdown_timeout.

    Authored-by: Rob Reeves <roree...@linkedin.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
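A note on the mechanism before the patch itself: the shutdown hook manager is
initialized before user-supplied Spark configs are promoted to JVM system
properties, so the new flag only takes effect when it is already a system
property at JVM start (hence the spark.driver.extraJavaOptions example in the
config's doc string below). The following is a minimal, self-contained Scala
sketch of that lookup, not Spark source; parseMs is a simplified, hypothetical
stand-in for Spark's internal time-string parsing (JavaUtils.timeStringAs).

// Hypothetical standalone sketch: shows how a value passed on the JVM command
// line as -Dspark.shutdown.timeout=60s is visible through System.getProperty
// long before any SparkContext or user-created SparkConf exists.
object ShutdownTimeoutLookup {
  // Simplified time-string parser ("60s", "500ms", "2min"); Spark itself
  // delegates this to JavaUtils.timeStringAs.
  private def parseMs(s: String): Long = {
    val v = s.trim.toLowerCase
    if (v.endsWith("ms")) v.stripSuffix("ms").trim.toLong
    else if (v.endsWith("min")) v.stripSuffix("min").trim.toLong * 60000L
    else if (v.endsWith("s")) v.stripSuffix("s").trim.toLong * 1000L
    else v.toLong // bare number: treat as milliseconds
  }

  // None when the property is absent, mirroring createOptional in the patch below.
  def timeoutMs: Option[Long] =
    Option(System.getProperty("spark.shutdown.timeout")).map(parseMs)
}

object LookupDemo extends App {
  System.setProperty("spark.shutdown.timeout", "60s") // simulate the -D flag
  assert(ShutdownTimeoutLookup.timeoutMs.contains(60000L))
  println(s"effective shutdown timeout (ms): ${ShutdownTimeoutLookup.timeoutMs}")
}

In the patch itself the same effect comes from new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS),
because SparkConf copies all spark.* system properties at construction time.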
---
 .../org/apache/spark/internal/config/package.scala   | 10 ++++++++++
 .../org/apache/spark/util/ShutdownHookManager.scala  | 19 +++++++++++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aa240b5cc5b5..e72b9cb694eb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2683,4 +2683,14 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .internal()
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 4db268604a3e..c6cad9440168 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,12 +19,16 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+
 /**
  * Various utility methods used by Spark.
@@ -177,8 +181,19 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }
 
   def runAll(): Unit = {
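For readers unfamiliar with the Option.fold dispatch in the second hunk above:
the empty case calls Hadoop's two-argument addShutdownHook overload, keeping
Hadoop's own default or core-site.xml timeout, while the Some case passes the
explicit value to the four-argument overload. Below is a self-contained sketch
of the same pattern; Registrar is a hypothetical stand-in for
org.apache.hadoop.util.ShutdownHookManager, which exposes the same two overloads.

import java.util.concurrent.TimeUnit

// Hypothetical stand-in offering the same two overloads as Hadoop's manager.
object Registrar {
  def addShutdownHook(task: Runnable, priority: Int): Unit =
    println(s"registered at priority $priority with the default timeout")
  def addShutdownHook(task: Runnable, priority: Int, timeout: Long, unit: TimeUnit): Unit =
    println(s"registered at priority $priority with a ${unit.toMillis(timeout)} ms timeout")
}

object FoldDispatchDemo extends App {
  val hookTask: Runnable = () => println("running all shutdown hooks")
  val priority = 40 // FileSystem.SHUTDOWN_HOOK_PRIORITY + 30 in the real code

  // fold: the first argument list handles None, the second handles Some(t).
  def register(timeout: Option[Long]): Unit =
    timeout.fold(Registrar.addShutdownHook(hookTask, priority)) { t =>
      Registrar.addShutdownHook(hookTask, priority, t, TimeUnit.MILLISECONDS)
    }

  register(None)         // keep Hadoop's default timeout
  register(Some(60000L)) // pin an explicit 60 s timeout
}

The fold keeps the no-timeout path byte-for-byte identical to the old behavior,
which is why the config can safely remain optional.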