This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce93c9fd8671 [SPARK-47383][CORE] Support `spark.shutdown.timeout` config
ce93c9fd8671 is described below

commit ce93c9fd86715e2479552628398f6fc11e83b2af
Author: Rob Reeves <roree...@linkedin.com>
AuthorDate: Mon Mar 18 10:36:38 2024 -0700

    [SPARK-47383][CORE] Support `spark.shutdown.timeout` config
    
    ### What changes were proposed in this pull request?
    Make the shutdown hook timeout configurable. If this is not defined, it
    falls back to the existing behavior, which uses a default timeout of 30
    seconds, or whatever is defined in core-site.xml for the
    hadoop.service.shutdown.timeout property.
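
    A rough sketch of the resulting resolution order (illustrative only; the
    helper name is made up and not part of the patch):

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.network.util.JavaUtils

    // 1. spark.shutdown.timeout, if supplied as a JVM system property;
    // 2. otherwise hadoop.service.shutdown.timeout from core-site.xml;
    // 3. otherwise Hadoop's built-in default of 30 seconds.
    // A None here means Hadoop's ShutdownHookManager falls back to (2) or (3).
    def effectiveSparkShutdownTimeoutMs(): Option[Long] =
      new SparkConf().getOption("spark.shutdown.timeout")
        .map(s => JavaUtils.timeStringAsMs(s))
    ```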
    
    ### Why are the changes needed?
    Spark sometimes times out during the shutdown process. This can result in
    data left in the queues being dropped and causes metadata loss (e.g. event
    logs, anything written by custom listeners).
    
    This was not easily configurable before this change. The underlying
    `org.apache.hadoop.util.ShutdownHookManager` has a default timeout of 30
    seconds. It can be configured by setting hadoop.service.shutdown.timeout,
    but this must be done in core-site.xml/core-default.xml because a new
    Hadoop conf object is created and there is no opportunity to modify it.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a new config `spark.shutdown.timeout` is added.
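
    As a usage sketch (class and jar names are placeholders, not from the patch),
    the value has to reach the driver JVM as a system property, e.g. through
    `spark.driver.extraJavaOptions`:

    ```scala
    import org.apache.spark.launcher.SparkLauncher

    // Launch an application with the shutdown timeout baked into the driver's
    // JVM options, so it is visible before Spark's shutdown hook is installed.
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")    // placeholder
      .setMainClass("com.example.Main")      // placeholder
      .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Dspark.shutdown.timeout=60s")
      .startApplication()
    ```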
    
    ### How was this patch tested?
    Manual testing in spark-shell. This behavior is not practical to write a
    unit test for.
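
    The PR does not spell out the manual steps; one way such a check could look
    in spark-shell (illustrative only) is to register a deliberately slow
    listener so the shutdown hook that stops the SparkContext cannot finish in
    time:

    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

    // Draining this listener keeps SparkContext.stop() busy, and stop() runs
    // inside Spark's managed shutdown hook when the shell exits without an
    // explicit sc.stop().
    sc.addSparkListener(new SparkListener {
      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
        Thread.sleep(120 * 1000L)
    })

    // Exit the shell (:quit). With -Dspark.shutdown.timeout=5s in the driver's
    // Java options the JVM should give up after about 5 seconds; without it,
    // Hadoop's 30-second default (or hadoop.service.shutdown.timeout) applies.
    ```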
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45504 from robreeves/sc_shutdown_timeout.
    
    Authored-by: Rob Reeves <roree...@linkedin.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala    | 10 ++++++++++
 .../org/apache/spark/util/ShutdownHookManager.scala   | 19 +++++++++++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index aa240b5cc5b5..e72b9cb694eb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2683,4 +2683,14 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .internal()
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 4db268604a3e..c6cad9440168 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,12 +19,16 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+
 
 /**
  * Various utility methods used by Spark.
@@ -177,8 +181,19 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }
 
   def runAll(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
