This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82a02321b87 [SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout`
82a02321b87 is described below

commit 82a02321b873cc67a521e98e25b9b07fd84b5684
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Dec 10 00:44:33 2023 -0800

    [SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support a new configuration, `spark.deploy.recoveryTimeout`.
    
    ### Why are the changes needed?
    
    To allow the users to control this independently from `spark.worker.timeout`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44282 from dongjoon-hyun/SPARK-46348.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala  | 4 +++-
 .../src/main/scala/org/apache/spark/internal/config/Deploy.scala | 9 +++++++++
 docs/spark-standalone.md                                         | 9 +++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index be787dd29f8..7346c80aff4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -65,6 +65,8 @@ private[deploy] class Master(
   private val retainedDrivers = conf.get(RETAINED_DRIVERS)
   private val maxDrivers = conf.get(MAX_DRIVERS)
   private val reaperIterations = conf.get(REAPER_ITERATIONS)
+  private val recoveryTimeoutMs = // RECOVERY_TIMEOUT (seconds) * 1000 if set, else workerTimeoutMs
+    conf.get(RECOVERY_TIMEOUT).map(_ * 1000).getOrElse(workerTimeoutMs)
   private val recoveryMode = conf.get(RECOVERY_MODE)
   private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)
 
@@ -246,7 +248,7 @@ private[deploy] class Master(
           override def run(): Unit = Utils.tryLogNonFatalError {
             self.send(CompleteRecovery)
           }
-        }, workerTimeoutMs, TimeUnit.MILLISECONDS)
+        }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
       }
 
     case CompleteRecovery => completeRecovery()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
index b52ea356789..6585d62b3b9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.internal.config
 
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 private[spark] object Deploy {
   val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
@@ -56,6 +57,14 @@ private[spark] object Deploy {
     .stringConf
     .createWithDefault("")
 
+  val RECOVERY_TIMEOUT = ConfigBuilder("spark.deploy.recoveryTimeout")
+    .doc("Configures the timeout for recovery process. The default value is the same " +
+      "with `spark.worker.timeout`.") // literal key: the original lacked the `s` interpolator
+    .version("4.0.0")
+    .timeConf(TimeUnit.SECONDS) // stored in seconds; Master multiplies by 1000 to get ms
+    .checkValue(_ > 0, "spark.deploy.recoveryTimeout must be positive.")
+    .createOptional // no default here; when unset, Master falls back to workerTimeoutMs
+
   val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
     .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
       "configuration is used to set the zookeeper URL to connect to.")
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 19935307221..0bc73978570 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -793,6 +793,15 @@ In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spa
     <td>A compression codec for persistence engines. none (default), lz4, lzf, snappy, and zstd. Currently, only FILESYSTEM mode supports this configuration.</td>
     <td>4.0.0</td>
   </tr>
+  <tr>
+    <td><code>spark.deploy.recoveryTimeout</code></td>
+    <td>(none)</td>
+    <td>
+      The timeout for the recovery process. The default value is the same as
+      <code>spark.worker.timeout</code>.
+    </td>
+    <td>4.0.0</td>
+  </tr>
   <tr>
     <td><code>spark.deploy.recoveryMode.factory</code></td>
     <td>""</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to