This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 82a02321b87 [SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout`
82a02321b87 is described below
commit 82a02321b873cc67a521e98e25b9b07fd84b5684
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Dec 10 00:44:33 2023 -0800
[SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout`
### What changes were proposed in this pull request?
This PR aims to support a new configuration, `spark.deploy.recoveryTimeout`.
### Why are the changes needed?
To allow users to control the recovery timeout independently of
`spark.worker.timeout`.
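For illustration, with this patch the two timeouts can diverge. The property names below come from this patch, but the values are hypothetical, and in a real standalone cluster the master daemon would usually receive them via `SPARK_DAEMON_JAVA_OPTS` (see the example after the docs diff below) rather than an application conf; e.g. in a spark-shell session:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: the recovery window no longer has to
// equal the worker timeout.
val conf = new SparkConf()
  .set("spark.worker.timeout", "60")           // existing knob, in seconds
  .set("spark.deploy.recoveryTimeout", "120s") // new knob added by this patch
```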
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual review.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44282 from dongjoon-hyun/SPARK-46348.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++-
.../src/main/scala/org/apache/spark/internal/config/Deploy.scala | 9 +++++++++
docs/spark-standalone.md | 9 +++++++++
3 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index be787dd29f8..7346c80aff4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -65,6 +65,8 @@ private[deploy] class Master(
private val retainedDrivers = conf.get(RETAINED_DRIVERS)
private val maxDrivers = conf.get(MAX_DRIVERS)
private val reaperIterations = conf.get(REAPER_ITERATIONS)
+ private val recoveryTimeoutMs =
+ conf.get(RECOVERY_TIMEOUT).map(_ * 1000).getOrElse(workerTimeoutMs)
private val recoveryMode = conf.get(RECOVERY_MODE)
private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)
@@ -246,7 +248,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
- }, workerTimeoutMs, TimeUnit.MILLISECONDS)
+ }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
}
case CompleteRecovery => completeRecovery()
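The fallback in the hunk above is plain Option composition: the optional recovery timeout in seconds, converted to milliseconds, takes precedence, and otherwise the existing worker timeout is reused. A minimal self-contained sketch of that pattern (hypothetical names, not the Master class itself):

```scala
// Standalone sketch (not Spark code) of the fallback used above: an optional
// recovery timeout in seconds wins; otherwise the worker timeout (already in
// milliseconds) is reused.
object RecoveryTimeoutSketch {
  def resolveTimeoutMs(recoverySec: Option[Long], workerMs: Long): Long =
    recoverySec.map(_ * 1000L).getOrElse(workerMs)

  def main(args: Array[String]): Unit = {
    assert(resolveTimeoutMs(Some(120L), 60000L) == 120000L) // explicit setting wins
    assert(resolveTimeoutMs(None, 60000L) == 60000L)        // falls back to worker timeout
  }
}
```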
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
index b52ea356789..6585d62b3b9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -18,6 +18,7 @@
package org.apache.spark.internal.config
import java.util.Locale
+import java.util.concurrent.TimeUnit
private[spark] object Deploy {
val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
@@ -56,6 +57,14 @@ private[spark] object Deploy {
.stringConf
.createWithDefault("")
+ val RECOVERY_TIMEOUT = ConfigBuilder("spark.deploy.recoveryTimeout")
+ .doc("Configures the timeout for recovery process. The default value is
the same " +
+ "with ${WORKER_TIMEOUT.key}.")
+ .version("4.0.0")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(_ > 0, "spark.deploy.recoveryTimeout must be positive.")
+ .createOptional
+
val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
.doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
"configuration is used to set the zookeeper URL to connect to.")
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 19935307221..0bc73978570 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -793,6 +793,15 @@ In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spa
<td>A compression codec for persistence engines. none (default), lz4, lzf,
snappy, and zstd. Currently, only FILESYSTEM mode supports this
configuration.</td>
<td>4.0.0</td>
</tr>
+ <tr>
+ <td><code>spark.deploy.recoveryTimeout</code></td>
+ <td>(none)</td>
+ <td>
+ The timeout for the recovery process. The default value is the same as
+ <code>spark.worker.timeout</code>.
+ </td>
+ <td>4.0.0</td>
+ </tr>
<tr>
<td><code>spark.deploy.recoveryMode.factory</code></td>
<td>""</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]