This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 82a02321b87 [SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout` 82a02321b87 is described below commit 82a02321b873cc67a521e98e25b9b07fd84b5684 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Sun Dec 10 00:44:33 2023 -0800 [SPARK-46348][CORE] Support `spark.deploy.recoveryTimeout` ### What changes were proposed in this pull request? This PR aims to support a new configuration, `spark.deploy.recoveryTimeout`. ### Why are the changes needed? To allow the users to control this independently from `spark.worker.timeout`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44282 from dongjoon-hyun/SPARK-46348. Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4 +++- .../src/main/scala/org/apache/spark/internal/config/Deploy.scala | 9 +++++++++ docs/spark-standalone.md | 9 +++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index be787dd29f8..7346c80aff4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -65,6 +65,8 @@ private[deploy] class Master( private val retainedDrivers = conf.get(RETAINED_DRIVERS) private val maxDrivers = conf.get(MAX_DRIVERS) private val reaperIterations = conf.get(REAPER_ITERATIONS) + private val recoveryTimeoutMs = + conf.get(RECOVERY_TIMEOUT).map(_ * 1000).getOrElse(workerTimeoutMs) private val recoveryMode = conf.get(RECOVERY_MODE) private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES) @@ -246,7 +248,7 @@ private[deploy] class Master( override def run(): Unit = 
Utils.tryLogNonFatalError { self.send(CompleteRecovery) } - }, workerTimeoutMs, TimeUnit.MILLISECONDS) + }, recoveryTimeoutMs, TimeUnit.MILLISECONDS) } case CompleteRecovery => completeRecovery() diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala index b52ea356789..6585d62b3b9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala @@ -18,6 +18,7 @@ package org.apache.spark.internal.config import java.util.Locale +import java.util.concurrent.TimeUnit private[spark] object Deploy { val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode") @@ -56,6 +57,14 @@ private[spark] object Deploy { .stringConf .createWithDefault("") + val RECOVERY_TIMEOUT = ConfigBuilder("spark.deploy.recoveryTimeout") + .doc("Configures the timeout for recovery process. The default value is the same " + + "with ${WORKER_TIMEOUT.key}.") + .version("4.0.0") + .timeConf(TimeUnit.SECONDS) + .checkValue(_ > 0, "spark.deploy.recoveryTimeout must be positive.") + .createOptional + val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url") .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " + "configuration is used to set the zookeeper URL to connect to.") diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 19935307221..0bc73978570 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -793,6 +793,15 @@ In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spa <td>A compression codec for persistence engines. none (default), lz4, lzf, snappy, and zstd. Currently, only FILESYSTEM mode supports this configuration.</td> <td>4.0.0</td> </tr> + <tr> + <td><code>spark.deploy.recoveryTimeout</code></td> + <td>(none)</td> + <td> + The timeout for the recovery process. The default value is the same as + <code>spark.worker.timeout</code>. 
+ </td> + <td>4.0.0</td> + </tr> <tr> <td><code>spark.deploy.recoveryMode.factory</code></td> <td>""</td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org