This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88f5122  [SPARK-37576][K8S] Support built-in K8s executor rolling 
plugin
88f5122 is described below

commit 88f5122d37e3cea783616d3c3a8a6198464f1b4d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Dec 8 07:23:30 2021 -0800

    [SPARK-37576][K8S] Support built-in K8s executor rolling plugin
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add a built-in plugin for K8s executor rolling decommission 
via the following.
    
    ```
    spark-3.3.0-SNAPSHOT-bin-3.3.1/bin/spark-submit \
    --master k8s://https://kubernetes.docker.internal:6443 \
    --deploy-mode cluster \
    -c spark.decommission.enabled=true \
    -c spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin \
    -c spark.kubernetes.executor.rollInterval=60 \
    -c spark.executor.instances=2 \
    -c spark.kubernetes.container.image=spark:latest \
    --class org.apache.spark.examples.SparkPi \
    local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar 
200000
    ```
    
    ### Why are the changes needed?
    
    This built-in plugin is helpful when we want to replace long-lived 
executors with new ones.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new feature.
    
    ### How was this patch tested?
    
    Pass the K8s IT test.
    
    I verified that newly added test case `Rolling decommissioning (1 minute, 
11 seconds)` passed.
    ```
    [info] KubernetesSuite:
    [info] - Run SparkPi with no resources (16 seconds, 49 milliseconds)
    [info] - Run SparkPi with no resources & statefulset allocation (15 
seconds, 604 milliseconds)
    [info] - Run SparkPi with a very long application name. (16 seconds, 439 
milliseconds)
    [info] - Use SparkLauncher.NO_RESOURCE (15 seconds, 433 milliseconds)
    [info] - Run SparkPi with a master URL without a scheme. (15 seconds, 528 
milliseconds)
    [info] - Run SparkPi with an argument. (16 seconds, 396 milliseconds)
    [info] - Run SparkPi with custom labels, annotations, and environment 
variables. (15 seconds, 436 milliseconds)
    [info] - All pods have the same service account by default (16 seconds, 451 
milliseconds)
    [info] - Run extraJVMOptions check on driver (8 seconds, 361 milliseconds)
    ... (omitted some irrelevant failures) ...
    [info] - Verify logging configuration is picked from the provided 
SPARK_CONF_DIR/log4j.properties (16 seconds, 663 milliseconds)
    [info] - Run SparkPi with env and mount secrets. (26 seconds, 276 
milliseconds)
    [info] - Run PySpark on simple pi.py example (18 seconds, 479 milliseconds)
    [info] - Run PySpark to test a pyfiles example (21 seconds, 673 
milliseconds)
    [info] - Run PySpark with memory customization (16 seconds, 411 
milliseconds)
    [info] - Run in client mode. (12 seconds, 357 milliseconds)
    [info] - Start pod creation from template (15 seconds, 619 milliseconds)
    ... (omitted some irrelevant failures) ...
    [info] - Launcher client dependencies (31 seconds, 421 milliseconds)
    [info] - SPARK-33615: Launcher client archives (1 minute, 17 seconds)
    [info] - SPARK-33748: Launcher python client respecting PYSPARK_PYTHON (37 
seconds, 28 milliseconds)
    [info] - SPARK-33748: Launcher python client respecting 
spark.pyspark.python and spark.pyspark.driver.python (36 seconds, 661 
milliseconds)
    [info] - Launcher python client dependencies using a zip file (36 seconds, 
411 milliseconds)
    [info] - Test basic decommissioning (50 seconds, 105 milliseconds)
    [info] - Test basic decommissioning with shuffle cleanup (51 seconds, 285 
milliseconds)
    ... (omitted some irrelevant failures) ...
    [info] - Test decommissioning timeouts (51 seconds, 40 milliseconds)
    [info] - Rolling decommissioning (1 minute, 11 seconds)
    ... (omitted some irrelevant failures) ...
    ```
    
    Closes #34832 from dongjoon-hyun/SPARK-37576.
    
    Lead-authored-by: Dongjoon Hyun <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  8 ++
 .../scheduler/cluster/k8s/ExecutorRollPlugin.scala | 99 ++++++++++++++++++++++
 .../k8s/integrationtest/DecommissionSuite.scala    | 29 +++++++
 3 files changed, 136 insertions(+)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2458e2d..aff6473 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -137,6 +137,14 @@ private[spark] object Config extends Logging {
       .longConf
       .createWithDefault(1572864) // 1.5 MiB
 
+  val EXECUTOR_ROLL_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.rollInterval")
+      .doc("Interval between executor roll operations. To disable, set 0 
(default)")
+      .version("3.3.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "Interval should be non-negative")
+      .createWithDefault(0)
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = 
"spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = 
"spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = 
"spark.kubernetes.authenticate.driver.mounted"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
new file mode 100644
index 0000000..9b76acd
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config.EXECUTOR_ROLL_INTERVAL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark plugin to roll executor pods periodically.
+ * This is independent from ExecutorPodsAllocator and aims to decommission 
executors
+ * one by one in both static and dynamic allocation.
+ *
+ * To use this plugin, we assume that a user has the required maximum number 
of executors + 1
+ * in both static and dynamic allocation configurations.
+ */
+class ExecutorRollPlugin extends SparkPlugin with Logging {
+  override def driverPlugin(): DriverPlugin = {
+    new DriverPlugin() {
+      private var sparkContext: SparkContext = _
+
+      private val periodicService: ScheduledExecutorService =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-roller")
+
+      override def init(sc: SparkContext, ctx: PluginContext): JMap[String, 
String] = {
+        val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
+        if (interval <= 0) {
+          logWarning(s"Disabled due to invalid interval value, '$interval'")
+        } else if (!sc.conf.get(DECOMMISSION_ENABLED)) {
+          logWarning(s"Disabled because ${DECOMMISSION_ENABLED.key} is false.")
+        } else {
+          // Scheduler is not created yet
+          sparkContext = sc
+
+          periodicService.scheduleAtFixedRate(() => {
+            try {
+              sparkContext.schedulerBackend match {
+                case scheduler: KubernetesClusterSchedulerBackend =>
+                  // Roughly assume that the smallest ID executor is the most 
long-lived one.
+                  val smallestID = scheduler
+                    .getExecutorIds()
+                    .filterNot(_.equals(SparkContext.DRIVER_IDENTIFIER))
+                    .map(_.toInt)
+                    .sorted
+                    .headOption
+                  smallestID match {
+                    case Some(id) =>
+                      // Use decommission to be safe.
+                      logInfo(s"Ask to decommission executor $id")
+                      val now = System.currentTimeMillis()
+                      scheduler.decommissionExecutor(
+                        id.toString,
+                        ExecutorDecommissionInfo(s"Rolling at $now"),
+                        adjustTargetNumExecutors = false)
+                    case _ =>
+                      logInfo("There is nothing to roll.")
+                  }
+                case _ =>
+                  logWarning("This plugin expects " +
+                    
s"${classOf[KubernetesClusterSchedulerBackend].getSimpleName}.")
+              }
+            } catch {
+              case e: Throwable => logError("Error in rolling thread", e)
+            }
+          }, interval, interval, TimeUnit.SECONDS)
+        }
+        Map.empty[String, String].asJava
+      }
+
+      override def shutdown(): Unit = periodicService.shutdown()
+    }
+  }
+
+  // No-op
+  override def executorPlugin(): ExecutorPlugin = null
+}
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 1250126..ad6654d 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.PLUGINS
 
 private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
 
@@ -176,6 +177,34 @@ private[spark] trait DecommissionSuite { k8sSuite: 
KubernetesSuite =>
       executorPatience = None,
       decommissioningTest = true)
   }
+
+  test("SPARK-37576: Rolling decommissioning", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", pyImage)
+      .set(config.DECOMMISSION_ENABLED.key, "true")
+      .set(PLUGINS.key, 
"org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")
+      .set("spark.kubernetes.executor.rollInterval", "30s")
+
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PythonTestsSuite.PYSPARK_PI,
+      mainClass = "",
+      expectedDriverLogOnCompletion = Seq(
+        "Initialized driver component for plugin " +
+          "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin",
+        "Ask to decommission executor 1",
+        "Removed 1 successfully in removeExecutor",
+        "Going to request 1 executors",
+        "Ask to decommission executor 2",
+        "Removed 2 successfully in removeExecutor",
+        "Going to request 1 executors"),
+      appArgs = Array("10000"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      isJVM = false,
+      pyFiles = None,
+      executorPatience = None,
+      decommissioningTest = true)
+  }
 }
 
 private[spark] object DecommissionSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to