This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f05aefb86 [KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in process
f05aefb86 is described below

commit f05aefb86643e69c64237ebfb9db208ec509f3b2
Author: zwangsheng <[email protected]>
AuthorDate: Wed Aug 16 21:10:23 2023 +0800

    [KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in process
    
    ### _Why are the changes needed?_
    
    1. Print the generated pod name/prefix in the Kyuubi server log
    2. Make sure the driver pod name stays within the Kubernetes length limit (a short sketch of the generated confs follows below)
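    
    For illustration only, not part of the patch: assuming a batch application named
    "test-app" and a hypothetical engine ref id, the builder now fills in roughly the
    following Spark properties whenever the user has not set them already (the actual
    logic is in SparkProcessBuilder#appendPodNameConf in the diff below):
    
        // hypothetical values, shown only to sketch the generated confs
        val generated: Map[String, String] = Map(
          "spark.kubernetes.driver.pod.name" -> "kyuubi-test-app-<engineRefId>-driver",
          "spark.kubernetes.executor.podNamePrefix" -> "kyuubi-test-app-<engineRefId>")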
    
    close #5165
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5168 from zwangsheng/KYUUBI#5165.
    
    Closes #5165
    
    290a46860 [Cheng Pan] nit
    9c9dd2d38 [Cheng Pan] Revert "Add app Name"
    50187cddb [zwangsheng] Add app Name
    f704a9839 [Cheng Pan] app key
    463904bec [Cheng Pan] revise
    33d1ffd9e [Cheng Pan] Apply suggestions from code review
    0b7e64da5 [Cheng Pan] Apply suggestions from code review
    b3b1e08fd [zwangsheng] Fix Test
    742e78461 [zwangsheng] modify long app name length to 266
    90b165e95 [zwangsheng] fix test
    191447058 [zwangsheng] fix test fail
    05698fdb1 [zwangsheng] fix test fail
    39637ae68 [zwangsheng] fix comments
    23513810f [zwangsheng] fix comments
    a56630179 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
    8599a1430 [zwangsheng] fix style
    73aec9a03 [zwangsheng] [KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in process
    
    Lead-authored-by: zwangsheng <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/spark/SparkBatchProcessBuilder.scala    |  5 +-
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 24 ++++++++-
 .../org/apache/kyuubi/util/KubernetesUtils.scala   | 33 ++++++++++++
 .../engine/spark/SparkProcessBuilderSuite.scala    | 60 +++++++++++++++++++++-
 4 files changed, 119 insertions(+), 3 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 7f1be93b5..ef159bb93 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -51,7 +51,10 @@ class SparkBatchProcessBuilder(
     // tag batch application
     KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf)
 
-    (batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf()).foreach { case (k, v) =>
+    (batchKyuubiConf.getAll ++
+      sparkAppNameConf() ++
+      engineLogPathConf() ++
+      appendPodNameConf(batchConf)).foreach { case (k, v) =>
       buffer += CONF
       buffer += s"${convertConfigKey(k)}=$v"
     }
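
Side note (not from the patch): each key/value pair collected above is appended to the
spark-submit arguments as a separate --conf flag, so a generated entry would surface
roughly as sketched here, with a hypothetical pod name value:

    // hypothetical: how one generated pair lands in the spark-submit argument buffer
    buffer += "--conf"
    buffer += "spark.kubernetes.driver.pod.name=kyuubi-test-app-<engineRefId>-driver"
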
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 5998d5d4e..351eddb75 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
 import java.nio.file.Paths
 import java.util.Locale
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
@@ -34,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KubernetesUtils
 import org.apache.kyuubi.util.Validator
 
 class SparkProcessBuilder(
@@ -118,7 +120,7 @@ class SparkProcessBuilder(
       allConf = allConf ++ zkAuthKeytabFileConf(allConf)
     }
     // pass spark engine log path to spark conf
-    (allConf ++ engineLogPathConf).foreach { case (k, v) =>
+    (allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) =>
       buffer += CONF
       buffer += s"${convertConfigKey(k)}=$v"
     }
@@ -208,6 +210,24 @@ class SparkProcessBuilder(
       kubernetesNamespace())
   }
 
+  def appendPodNameConf(conf: Map[String, String]): Map[String, String] = {
+    val appName = conf.getOrElse(APP_KEY, "spark")
+    val map = mutable.Map.newBuilder[String, String]
+    if (clusterManager().exists(cm => cm.toLowerCase(Locale.ROOT).startsWith("k8s"))) {
+      if (!conf.contains(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)) {
+        val prefix = KubernetesUtils.generateExecutorPodNamePrefix(appName, engineRefId)
+        map += (KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> prefix)
+      }
+      if (deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")) {
+        if (!conf.contains(KUBERNETES_DRIVER_POD_NAME)) {
+          val name = KubernetesUtils.generateDriverPodName(appName, engineRefId)
+          map += (KUBERNETES_DRIVER_POD_NAME -> name)
+        }
+      }
+    }
+    map.result().toMap
+  }
+
   override def clusterManager(): Option[String] = {
     conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
   }
@@ -258,6 +278,8 @@ object SparkProcessBuilder {
   final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
   final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
   final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
+  final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
+  final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = "spark.kubernetes.executor.podNamePrefix"
   final val INTERNAL_RESOURCE = "spark-internal"
 
   /**
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
index f9780bb16..9da3408a3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.util
 
 import java.io.File
+import java.util.Locale
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Charsets
@@ -32,6 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 
 object KubernetesUtils extends Logging {
+  // Kubernetes pod name max length - '-exec-' - Int.MAX_VALUE.length
+  // 253 - 10 - 6
+  final val EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237
+  final val DRIVER_POD_NAME_MAX_LENGTH = 253
 
   def buildKubernetesClient(conf: KyuubiConf): Option[KubernetesClient] = {
     val master = conf.get(KUBERNETES_MASTER)
@@ -114,4 +119,32 @@ object KubernetesUtils extends Logging {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
     opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }
+
+  private def getResourceNamePrefix(appName: String, engineRefId: String): String = {
+    s"$appName-$engineRefId"
+      .trim
+      .toLowerCase(Locale.ROOT)
+      .replaceAll("[^a-z0-9\\-]", "-")
+      .replaceAll("-+", "-")
+      .replaceAll("^-", "")
+      .replaceAll("^[0-9]", "x")
+  }
+
+  def generateDriverPodName(appName: String, engineRefId: String): String = {
+    val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName, 
engineRefId)}-driver"
+    if (resolvedResourceName.length <= DRIVER_POD_NAME_MAX_LENGTH) {
+      resolvedResourceName
+    } else {
+      s"kyuubi-$engineRefId-driver"
+    }
+  }
+
+  def generateExecutorPodNamePrefix(appName: String, engineRefId: String): String = {
+    val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName, engineRefId)}"
+    if (resolvedResourceName.length <= EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH) {
+      resolvedResourceName
+    } else {
+      s"kyuubi-$engineRefId"
+    }
+  }
 }
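
A minimal usage sketch (not part of the patch) of the helpers added above; the appName
and engineRefId values are made up for illustration:

    import org.apache.kyuubi.util.KubernetesUtils

    // names are lower-cased and characters outside [a-z0-9-] become '-'
    KubernetesUtils.generateDriverPodName("Test App", "kyuubi-test-engine")
    // => "kyuubi-test-app-kyuubi-test-engine-driver"
    KubernetesUtils.generateExecutorPodNamePrefix("Test App", "kyuubi-test-engine")
    // => "kyuubi-test-app-kyuubi-test-engine"

    // names that would exceed the Kubernetes limits fall back to the engineRefId-only form
    KubernetesUtils.generateDriverPodName("a" * 300, "kyuubi-test-engine")
    // => "kyuubi-kyuubi-test-engine-driver"
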
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index cd549c0f3..a4227d26e 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT, ENGINE_SPARK_MAIN_RESOURCE}
 import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder.CONF
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
 import org.apache.kyuubi.service.ServiceUtils
@@ -305,6 +305,64 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
       b1.toString.contains(
         s"$CONF 
spark.$KYUUBI_ENGINE_LOG_PATH_KEY=${b1.engineLog.getAbsolutePath}"))
   }
+
+  test("[KYUUBI #5165] Test SparkProcessBuilder#appendDriverPodPrefix") {
+    val engineRefId = "kyuubi-test-engine"
+    val appName = "test-app"
+    val processBuilder = new SparkProcessBuilder(
+      "kyuubi",
+      conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"),
+      engineRefId)
+    val conf1 = Map(APP_KEY -> "test-app")
+    val driverPodName1 = processBuilder.appendPodNameConf(conf1).get(KUBERNETES_DRIVER_POD_NAME)
+    assert(driverPodName1 === Some(s"kyuubi-$appName-$engineRefId-driver"))
+    // respect user specified driver pod name
+    val conf2 = conf1 ++ Map(KUBERNETES_DRIVER_POD_NAME -> "kyuubi-test-1-driver")
+    val driverPodName2 = processBuilder.appendPodNameConf(conf2).get(KUBERNETES_DRIVER_POD_NAME)
+    assert(driverPodName2 === None)
+    val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
+    val conf3 = Map(APP_KEY -> longAppName)
+    val driverPodName3 = processBuilder.appendPodNameConf(conf3).get(KUBERNETES_DRIVER_POD_NAME)
+    assert(driverPodName3 === Some(s"kyuubi-$engineRefId-driver"))
+    // scalastyle:off
+    val chineseAppName = "你好_test_任务"
+    // scalastyle:on
+    val conf4 = Map(APP_KEY -> chineseAppName)
+    val driverPodName4 = processBuilder.appendPodNameConf(conf4).get(KUBERNETES_DRIVER_POD_NAME)
+    assert(driverPodName4 === Some(s"kyuubi-test-$engineRefId-driver"))
+  }
+
+  test("[KYUUBI #5165] Test SparkProcessBuilder#appendExecutorPodPrefix") {
+    val engineRefId = "kyuubi-test-engine"
+    val appName = "test-app"
+    val processBuilder = new SparkProcessBuilder(
+      "kyuubi",
+      conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"),
+      engineRefId)
+    val conf1 = Map(APP_KEY -> "test-app")
+    val execPodNamePrefix1 = processBuilder
+      .appendPodNameConf(conf1).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    assert(execPodNamePrefix1 === Some(s"kyuubi-$appName-$engineRefId"))
+    val conf2 = conf1 ++ Map(KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> "kyuubi-test")
+    val execPodNamePrefix2 = processBuilder
+      .appendPodNameConf(conf2).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    assert(execPodNamePrefix2 === None)
+    val longAppName = "thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+      "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
+    val conf3 = Map(APP_KEY -> longAppName)
+    val execPodNamePrefix3 = processBuilder
+      .appendPodNameConf(conf3).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+    assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId"))
+  }
 }
 
 class FakeSparkProcessBuilder(config: KyuubiConf)
