This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a93e46eb062 [SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin`
2a93e46eb062 is described below
commit 2a93e46eb0627df9cd288156bffa0a0815906c3c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Feb 28 09:27:53 2024 -0800
[SPARK-47207][CORE] Support `spark.driver.timeout` and `DriverTimeoutPlugin`
### What changes were proposed in this pull request?
This PR aims to support `spark.driver.timeout` and `DriverTimeoutPlugin`.
### Why are the changes needed?
Sometimes, Spark applications fall into an abnormal situation and hang.
We should provide a standard way to guarantee termination after a
pre-defined timeout.
- spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin
- spark.driver.timeout=1min
```
$ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.10)
Type in expressions to have them evaluated.
Type :help for more information.
24/02/28 06:53:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1709132014477).
Spark session available as 'spark'.

scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute
$ echo $?
124
```
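For reference, the two `-c` options above can also be placed in `spark-defaults.conf` (a sketch of the equivalent configuration; the file location depends on your deployment):

```
spark.plugins         org.apache.spark.deploy.DriverTimeoutPlugin
spark.driver.timeout  1min
```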
### Does this PR introduce _any_ user-facing change?
No, this is a new feature and a built-in plugin.
### How was this patch tested?
Manually, because this feature invokes `System.exit`.
1. Timeout with 1 minute
```
$ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin -c spark.driver.timeout=1min
...
scala> 24/02/28 06:54:34 WARN DriverTimeoutDriverPlugin: Terminate Driver JVM because it runs after 1 minute
$ echo $?
124
```
2. `DriverTimeoutPlugin` will be ignored if the default value of
`spark.driver.timeout` is used.
```
$ bin/spark-shell -c spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin
...
24/02/28 01:02:57 WARN DriverTimeoutDriverPlugin: Disabled with the timeout value 0.
...
scala>
```
3. `spark.driver.timeout` will be ignored if `DriverTimeoutPlugin` is not
provided.
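Manual testing is needed because the timeout task calls `System.exit` directly. As a hypothetical sketch (not part of this PR), such logic could be made unit-testable by injecting the exit function, so a test can capture the exit code instead of killing the JVM:

```scala
// Hypothetical refactor sketch: inject the exit function instead of calling
// System.exit directly, so a unit test can observe the exit code.
class TimeoutAction(exit: Int => Unit = code => sys.exit(code)) {
  def onTimeout(): Unit = exit(124) // 124 = SparkExitCode.DRIVER_TIMEOUT
}

object TimeoutActionDemo {
  // Run the timeout action with a capturing exit function and return the code.
  def capturedExitCode(): Int = {
    var captured = -1
    new TimeoutAction(code => captured = code).onTimeout()
    captured
  }

  def main(args: Array[String]): Unit =
    println(capturedExitCode()) // prints 124
}
```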
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45313 from dongjoon-hyun/SPARK-47207.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/deploy/DriverTimeoutPlugin.scala | 62 ++++++++++++++++++++++
.../org/apache/spark/internal/config/package.scala | 9 ++++
.../org/apache/spark/util/SparkExitCode.scala | 3 ++
docs/configuration.md | 11 ++++
4 files changed, 85 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala
new file mode 100644
index 000000000000..9b141d607572
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverTimeoutPlugin.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DRIVER_TIMEOUT
+import org.apache.spark.util.{SparkExitCode, ThreadUtils}
+
+/**
+ * A built-in plugin to provide Driver timeout feature.
+ */
+class DriverTimeoutPlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new DriverTimeoutDriverPlugin()
+
+  // No-op
+  override def executorPlugin(): ExecutorPlugin = null
+}
+
+class DriverTimeoutDriverPlugin extends DriverPlugin with Logging {
+
+  private val timeoutService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-timeout")
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    val timeout = sc.conf.get(DRIVER_TIMEOUT)
+    if (timeout == 0) {
+      logWarning("Disabled with the timeout value 0.")
+    } else {
+      val task: Runnable = () => {
+        logWarning(s"Terminate Driver JVM because it runs after $timeout minute" +
+          (if (timeout == 1) "" else "s"))
+        // We cannot use 'SparkContext.stop' because SparkContext might be in abnormal situation.
+        System.exit(SparkExitCode.DRIVER_TIMEOUT)
+      }
+      timeoutService.schedule(task, timeout, TimeUnit.MINUTES)
+    }
+    Map.empty[String, String].asJava
+  }
+
+  override def shutdown(): Unit = timeoutService.shutdown()
+}
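The scheduling pattern in the plugin above can be sketched with plain `java.util.concurrent` and no Spark dependency (assuming, as the name suggests, that `ThreadUtils.newDaemonSingleThreadScheduledExecutor` wraps a single-thread daemon scheduler):

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Sketch of the plugin's mechanism: a single daemon thread runs a one-shot
// task after the configured delay. The real plugin calls
// System.exit(SparkExitCode.DRIVER_TIMEOUT) in the task; here we flip a latch.
object DriverTimeoutSketch {
  private def daemonScheduler(name: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // a daemon thread never keeps the JVM alive by itself
        t
      }
    })

  /** Returns true if the scheduled task fired within a generous wait window. */
  def fires(delayMs: Long): Boolean = {
    val fired = new CountDownLatch(1)
    val scheduler = daemonScheduler("driver-timeout")
    val task: Runnable = () => fired.countDown()
    scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
    val ok = fired.await(delayMs + 5000, TimeUnit.MILLISECONDS)
    scheduler.shutdown()
    ok
  }

  def main(args: Array[String]): Unit = println(fires(50))
}
```

Because the thread is a daemon, a scheduled-but-unfired task never blocks normal JVM shutdown, which is why the plugin's `shutdown()` only needs a plain `timeoutService.shutdown()`.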
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7caac5884c74..1fcf75b02503 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1099,6 +1099,15 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val DRIVER_TIMEOUT = ConfigBuilder("spark.driver.timeout")
+    .doc("A timeout for Spark driver in minutes. 0 means infinite. For the positive time value, " +
+      "terminate the driver with the exit code 124 if it runs after timeout duration. To use, " +
+      "it's required to set `spark.plugins=org.apache.spark.deploy.DriverTimeoutPlugin`.")
+    .version("4.0.0")
+    .timeConf(TimeUnit.MINUTES)
+    .checkValue(v => v >= 0, "The value should be a non-negative time value.")
+    .createWithDefaultString("0min")
+
   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
     .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index 75b3d134b94d..e8f8788243cd 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -45,6 +45,9 @@ private[spark] object SparkExitCode {
     OutOfMemoryError. */
   val OOM = 52
 
+  /** Exit because the driver is running over the given threshold. */
+  val DRIVER_TIMEOUT = 124
+
   /** Exception indicate command not found. */
   val ERROR_COMMAND_NOT_FOUND = 127
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index f6e1e449e2dc..f0d68c55e7b3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -434,6 +434,17 @@ of the most common options to set are:
   </td>
   <td>1.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.driver.timeout</code></td>
+  <td>0min</td>
+  <td>
+    A timeout for Spark driver in minutes. 0 means infinite. For the positive time value,
+    terminate the driver with the exit code 124 if it runs after timeout duration. To use,
+    it's required to set <code>spark.plugins</code> with
+    <code>org.apache.spark.deploy.DriverTimeoutPlugin</code>.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.driver.log.localDir</code></td>
   <td>(none)</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]