This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f05aefb86 [KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod
Name(Prefix) in process
f05aefb86 is described below
commit f05aefb86643e69c64237ebfb9db208ec509f3b2
Author: zwangsheng <[email protected]>
AuthorDate: Wed Aug 16 21:10:23 2023 +0800
[KYUUBI #5165][K8S][SPARK] Build Spark Driver/Executor Pod Name(Prefix) in
process
### _Why are the changes needed?_
1. Print the Spark driver pod name and executor pod name prefix in the Kyuubi server log
2. Make sure the driver pod name stays within the Kubernetes name length limit
close #5165
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5168 from zwangsheng/KYUUBI#5165.
Closes #5165
290a46860 [Cheng Pan] nit
9c9dd2d38 [Cheng Pan] Revert "Add app Name"
50187cddb [zwangsheng] Add app Name
f704a9839 [Cheng Pan] app key
463904bec [Cheng Pan] revise
33d1ffd9e [Cheng Pan] Apply suggestions from code review
0b7e64da5 [Cheng Pan] Apply suggestions from code review
b3b1e08fd [zwangsheng] Fix Test
742e78461 [zwangsheng] modify long app name length to 266
90b165e95 [zwangsheng] fix test
191447058 [zwangsheng] fix test fail
05698fdb1 [zwangsheng] fix test fail
39637ae68 [zwangsheng] fix comments
23513810f [zwangsheng] fix comments
a56630179 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
8599a1430 [zwangsheng] fix style
73aec9a03 [zwangsheng] [KYUUBI #5165][K8S][SPARK] Build Spark
Driver/Executor Pod Name(Prefix) in process
Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/spark/SparkBatchProcessBuilder.scala | 5 +-
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 24 ++++++++-
.../org/apache/kyuubi/util/KubernetesUtils.scala | 33 ++++++++++++
.../engine/spark/SparkProcessBuilderSuite.scala | 60 +++++++++++++++++++++-
4 files changed, 119 insertions(+), 3 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 7f1be93b5..ef159bb93 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -51,7 +51,10 @@ class SparkBatchProcessBuilder(
// tag batch application
KyuubiApplicationManager.tagApplication(batchId, "spark",
clusterManager(), batchKyuubiConf)
- (batchKyuubiConf.getAll ++ sparkAppNameConf() ++
engineLogPathConf()).foreach { case (k, v) =>
+ (batchKyuubiConf.getAll ++
+ sparkAppNameConf() ++
+ engineLogPathConf() ++
+ appendPodNameConf(batchConf)).foreach { case (k, v) =>
buffer += CONF
buffer += s"${convertConfigKey(k)}=$v"
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 5998d5d4e..351eddb75 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
import java.nio.file.Paths
import java.util.Locale
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
@@ -34,6 +35,7 @@ import
org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.Validator
class SparkProcessBuilder(
@@ -118,7 +120,7 @@ class SparkProcessBuilder(
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
// pass spark engine log path to spark conf
- (allConf ++ engineLogPathConf).foreach { case (k, v) =>
+ (allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach {
case (k, v) =>
buffer += CONF
buffer += s"${convertConfigKey(k)}=$v"
}
@@ -208,6 +210,24 @@ class SparkProcessBuilder(
kubernetesNamespace())
}
+ def appendPodNameConf(conf: Map[String, String]): Map[String, String] = {
+ val appName = conf.getOrElse(APP_KEY, "spark")
+ val map = mutable.Map.newBuilder[String, String]
+ if (clusterManager().exists(cm =>
cm.toLowerCase(Locale.ROOT).startsWith("k8s"))) {
+ if (!conf.contains(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)) {
+ val prefix = KubernetesUtils.generateExecutorPodNamePrefix(appName,
engineRefId)
+ map += (KUBERNETES_EXECUTOR_POD_NAME_PREFIX -> prefix)
+ }
+ if (deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")) {
+ if (!conf.contains(KUBERNETES_DRIVER_POD_NAME)) {
+ val name = KubernetesUtils.generateDriverPodName(appName,
engineRefId)
+ map += (KUBERNETES_DRIVER_POD_NAME -> name)
+ }
+ }
+ }
+ map.result().toMap
+ }
+
override def clusterManager(): Option[String] = {
conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
}
@@ -258,6 +278,8 @@ object SparkProcessBuilder {
final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
+ final val KUBERNETES_DRIVER_POD_NAME = "spark.kubernetes.driver.pod.name"
+ final val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
"spark.kubernetes.executor.podNamePrefix"
final val INTERNAL_RESOURCE = "spark-internal"
/**
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
index f9780bb16..9da3408a3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/util/KubernetesUtils.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.util
import java.io.File
+import java.util.Locale
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
@@ -32,6 +33,10 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
object KubernetesUtils extends Logging {
+ // Kubernetes pod name max length - '-exec-'.length - Int.MaxValue.toString.length
+ // 253 - 6 - 10
+ final val EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237
+ final val DRIVER_POD_NAME_MAX_LENGTH = 253
def buildKubernetesClient(conf: KyuubiConf): Option[KubernetesClient] = {
val master = conf.get(KUBERNETES_MASTER)
@@ -114,4 +119,32 @@ object KubernetesUtils extends Logging {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}
+
+ private def getResourceNamePrefix(appName: String, engineRefId: String):
String = {
+ s"$appName-$engineRefId"
+ .trim
+ .toLowerCase(Locale.ROOT)
+ .replaceAll("[^a-z0-9\\-]", "-")
+ .replaceAll("-+", "-")
+ .replaceAll("^-", "")
+ .replaceAll("^[0-9]", "x")
+ }
+
+ def generateDriverPodName(appName: String, engineRefId: String): String = {
+ val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName,
engineRefId)}-driver"
+ if (resolvedResourceName.length <= DRIVER_POD_NAME_MAX_LENGTH) {
+ resolvedResourceName
+ } else {
+ s"kyuubi-$engineRefId-driver"
+ }
+ }
+
+ def generateExecutorPodNamePrefix(appName: String, engineRefId: String):
String = {
+ val resolvedResourceName = s"kyuubi-${getResourceNamePrefix(appName,
engineRefId)}"
+ if (resolvedResourceName.length <= EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH) {
+ resolvedResourceName
+ } else {
+ s"kyuubi-$engineRefId"
+ }
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index cd549c0f3..a4227d26e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.{KerberizedTestHelper,
KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_LOG_TIMEOUT,
ENGINE_SPARK_MAIN_RESOURCE}
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
-import org.apache.kyuubi.engine.spark.SparkProcessBuilder.CONF
+import org.apache.kyuubi.engine.spark.SparkProcessBuilder._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.service.ServiceUtils
@@ -305,6 +305,64 @@ class SparkProcessBuilderSuite extends
KerberizedTestHelper with MockitoSugar {
b1.toString.contains(
s"$CONF
spark.$KYUUBI_ENGINE_LOG_PATH_KEY=${b1.engineLog.getAbsolutePath}"))
}
+
+ test("[KYUUBI #5165] Test SparkProcessBuilder#appendDriverPodPrefix") {
+ val engineRefId = "kyuubi-test-engine"
+ val appName = "test-app"
+ val processBuilder = new SparkProcessBuilder(
+ "kyuubi",
+ conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"),
+ engineRefId)
+ val conf1 = Map(APP_KEY -> "test-app")
+ val driverPodName1 =
processBuilder.appendPodNameConf(conf1).get(KUBERNETES_DRIVER_POD_NAME)
+ assert(driverPodName1 === Some(s"kyuubi-$appName-$engineRefId-driver"))
+ // respect user specified driver pod name
+ val conf2 = conf1 ++ Map(KUBERNETES_DRIVER_POD_NAME ->
"kyuubi-test-1-driver")
+ val driverPodName2 =
processBuilder.appendPodNameConf(conf2).get(KUBERNETES_DRIVER_POD_NAME)
+ assert(driverPodName2 === None)
+ val longAppName =
"thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
+ val conf3 = Map(APP_KEY -> longAppName)
+ val driverPodName3 =
processBuilder.appendPodNameConf(conf3).get(KUBERNETES_DRIVER_POD_NAME)
+ assert(driverPodName3 === Some(s"kyuubi-$engineRefId-driver"))
+ // scalastyle:off
+ val chineseAppName = "你好_test_任务"
+ // scalastyle:on
+ val conf4 = Map(APP_KEY -> chineseAppName)
+ val driverPodName4 =
processBuilder.appendPodNameConf(conf4).get(KUBERNETES_DRIVER_POD_NAME)
+ assert(driverPodName4 === Some(s"kyuubi-test-$engineRefId-driver"))
+ }
+
+ test("[KYUUBI #5165] Test SparkProcessBuilder#appendExecutorPodPrefix") {
+ val engineRefId = "kyuubi-test-engine"
+ val appName = "test-app"
+ val processBuilder = new SparkProcessBuilder(
+ "kyuubi",
+ conf.set(MASTER_KEY, "k8s://internal").set(DEPLOY_MODE_KEY, "cluster"),
+ engineRefId)
+ val conf1 = Map(APP_KEY -> "test-app")
+ val execPodNamePrefix1 = processBuilder
+ .appendPodNameConf(conf1).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+ assert(execPodNamePrefix1 === Some(s"kyuubi-$appName-$engineRefId"))
+ val conf2 = conf1 ++ Map(KUBERNETES_EXECUTOR_POD_NAME_PREFIX ->
"kyuubi-test")
+ val execPodNamePrefix2 = processBuilder
+ .appendPodNameConf(conf2).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+ assert(execPodNamePrefix2 === None)
+ val longAppName =
"thisisalonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglonglong" +
+ "longlonglonglonglonglonglonglonglonglonglonglonglongappname"
+ val conf3 = Map(APP_KEY -> longAppName)
+ val execPodNamePrefix3 = processBuilder
+ .appendPodNameConf(conf3).get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+ assert(execPodNamePrefix3 === Some(s"kyuubi-$engineRefId"))
+ }
}
class FakeSparkProcessBuilder(config: KyuubiConf)