This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 88f5122 [SPARK-37576][K8S] Support built-in K8s executor rolling plugin
88f5122 is described below
commit 88f5122d37e3cea783616d3c3a8a6198464f1b4d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Dec 8 07:23:30 2021 -0800
[SPARK-37576][K8S] Support built-in K8s executor rolling plugin
### What changes were proposed in this pull request?
This PR aims to add a built-in plugin that rolls K8s executors via periodic decommissioning. It can be enabled as follows:
```
spark-3.3.0-SNAPSHOT-bin-3.3.1/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
-c spark.decommission.enabled=true \
-c spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin \
-c spark.kubernetes.executor.rollInterval=60 \
-c spark.executor.instances=2 \
-c spark.kubernetes.container.image=spark:latest \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar 200000
```
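For reference, the same settings can also be supplied programmatically instead of through `-c` flags. A minimal sketch, assuming the configuration above (the object name and values are illustrative and not part of this patch):
```
import org.apache.spark.SparkConf

// Sketch only: these keys are the ones read by ExecutorRollPlugin.
object RollPluginConfExample {
  def rollConf(): SparkConf = new SparkConf()
    .set("spark.decommission.enabled", "true")
    .set("spark.plugins", "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")
    // Time config in seconds: "60" and "60s" are equivalent; the default 0 disables rolling.
    .set("spark.kubernetes.executor.rollInterval", "60s")
}
```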
### Why are the changes needed?
This built-in plugin is helpful when we want to replace long-lived executors with new ones.
### Does this PR introduce _any_ user-facing change?
No. This is a new feature.
### How was this patch tested?
Pass the K8s ITs.
I verified that the newly added test case `Rolling decommissioning` passed (in 1 minute, 11 seconds).
```
[info] KubernetesSuite:
[info] - Run SparkPi with no resources (16 seconds, 49 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (15 seconds, 604 milliseconds)
[info] - Run SparkPi with a very long application name. (16 seconds, 439 milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (15 seconds, 433 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (15 seconds, 528 milliseconds)
[info] - Run SparkPi with an argument. (16 seconds, 396 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment variables. (15 seconds, 436 milliseconds)
[info] - All pods have the same service account by default (16 seconds, 451 milliseconds)
[info] - Run extraJVMOptions check on driver (8 seconds, 361 milliseconds)
... (omitted some irrelevant failures) ...
[info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties (16 seconds, 663 milliseconds)
[info] - Run SparkPi with env and mount secrets. (26 seconds, 276 milliseconds)
[info] - Run PySpark on simple pi.py example (18 seconds, 479 milliseconds)
[info] - Run PySpark to test a pyfiles example (21 seconds, 673 milliseconds)
[info] - Run PySpark with memory customization (16 seconds, 411 milliseconds)
[info] - Run in client mode. (12 seconds, 357 milliseconds)
[info] - Start pod creation from template (15 seconds, 619 milliseconds)
... (omitted some irrelevant failures) ...
[info] - Launcher client dependencies (31 seconds, 421 milliseconds)
[info] - SPARK-33615: Launcher client archives (1 minute, 17 seconds)
[info] - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON (37 seconds, 28 milliseconds)
[info] - SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python (36 seconds, 661 milliseconds)
[info] - Launcher python client dependencies using a zip file (36 seconds, 411 milliseconds)
[info] - Test basic decommissioning (50 seconds, 105 milliseconds)
[info] - Test basic decommissioning with shuffle cleanup (51 seconds, 285 milliseconds)
... (omitted some irrelevant failures) ...
[info] - Test decommissioning timeouts (51 seconds, 40 milliseconds)
[info] - Rolling decommissioning (1 minute, 11 seconds)
... (omitted some irrelevant failures) ...
```
Closes #34832 from dongjoon-hyun/SPARK-37576.
Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 8 ++
.../scheduler/cluster/k8s/ExecutorRollPlugin.scala | 99 ++++++++++++++++++++++
.../k8s/integrationtest/DecommissionSuite.scala | 29 +++++++
3 files changed, 136 insertions(+)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2458e2d..aff6473 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -137,6 +137,14 @@ private[spark] object Config extends Logging {
     .longConf
     .createWithDefault(1572864) // 1.5 MiB
 
+  val EXECUTOR_ROLL_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.rollInterval")
+      .doc("Interval between executor roll operations. To disable, set 0 (default)")
+      .version("3.3.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "Interval should be non-negative")
+      .createWithDefault(0)
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
new file mode 100644
index 0000000..9b76acd
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config.EXECUTOR_ROLL_INTERVAL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark plugin to roll executor pods periodically.
+ * This is independent from ExecutorPodsAllocator and aims to decommission executors
+ * one by one in both static and dynamic allocation.
+ *
+ * To use this plugin, we assume that a user has the required maximum number of executors + 1
+ * in both static and dynamic allocation configurations.
+ */
+class ExecutorRollPlugin extends SparkPlugin with Logging {
+  override def driverPlugin(): DriverPlugin = {
+    new DriverPlugin() {
+      private var sparkContext: SparkContext = _
+
+      private val periodicService: ScheduledExecutorService =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-roller")
+
+      override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+        val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
+        if (interval <= 0) {
+          logWarning(s"Disabled due to invalid interval value, '$interval'")
+        } else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
+          logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
+        } else {
+          // Scheduler is not created yet
+          sparkContext = sc
+
+          periodicService.scheduleAtFixedRate(() => {
+            try {
+              sparkContext.schedulerBackend match {
+                case scheduler: KubernetesClusterSchedulerBackend =>
+                  // Roughly assume that the smallest ID executor is the most long-lived one.
+                  val smallestID = scheduler
+                    .getExecutorIds()
+                    .filterNot(_.equals(SparkContext.DRIVER_IDENTIFIER))
+                    .map(_.toInt)
+                    .sorted
+                    .headOption
+                  smallestID match {
+                    case Some(id) =>
+                      // Use decommission to be safe.
+                      logInfo(s"Ask to decommission executor $id")
+                      val now = System.currentTimeMillis()
+                      scheduler.decommissionExecutor(
+                        id.toString,
+                        ExecutorDecommissionInfo(s"Rolling at $now"),
+                        adjustTargetNumExecutors = false)
+                    case _ =>
+                      logInfo("There is nothing to roll.")
+                  }
+                case _ =>
+                  logWarning("This plugin expects " +
+                    s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
+              }
+            } catch {
+              case e: Throwable => logError("Error in rolling thread", e)
+            }
+          }, interval, interval, TimeUnit.SECONDS)
+        }
+        Map.empty[String, String].asJava
+      }
+
+      override def shutdown(): Unit = periodicService.shutdown()
+    }
+  }
+
+  // No-op
+  override def executorPlugin(): ExecutorPlugin = null
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 1250126..ad6654d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.PLUGINS
 
 private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
@@ -176,6 +177,34 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       executorPatience = None,
       decommissioningTest = true)
   }
+
+  test("SPARK-37576: Rolling decommissioning", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", pyImage)
+      .set(config.DECOMMISSION_ENABLED.key, "true")
+      .set(PLUGINS.key, "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")
+      .set("spark.kubernetes.executor.rollInterval", "30s")
+
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PythonTestsSuite.PYSPARK_PI,
+      mainClass = "",
+      expectedDriverLogOnCompletion = Seq(
+        "Initialized driver component for plugin " +
+          "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin",
+        "Ask to decommission executor 1",
+        "Removed 1 successfully in removeExecutor",
+        "Going to request 1 executors",
+        "Ask to decommission executor 2",
+        "Removed 2 successfully in removeExecutor",
+        "Going to request 1 executors"),
+      appArgs = Array("10000"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      isJVM = false,
+      pyFiles = None,
+      executorPatience = None,
+      decommissioningTest = true)
+  }
 }
 
 private[spark] object DecommissionSuite {