This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c2e27304f [KYUUBI #4843] Support multiple kubernetes contexts and
namespaces
c2e27304f is described below
commit c2e27304feda75aa26b42e8512f5de7483104fae
Author: fwang12 <[email protected]>
AuthorDate: Mon Jun 26 15:52:56 2023 +0800
[KYUUBI #4843] Support multiple kubernetes contexts and namespaces
### _Why are the changes needed?_
Close #4843
Support submitting Kyuubi engines/batches to multiple Kubernetes contexts and
namespaces.
In this PR, the user can configure the Kubernetes conf for a specified
Kubernetes context and namespace, as shown below.
```
kyuubi.kubernetes.<context>.master.address
kyuubi.kubernetes.<context>.<namespace>.authenticate.oauthTokenFile
```
For example:
```
kyuubi.kubernetes.28.master.address=k8s://master
kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1
kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2
```
For k8s context=28, namespace=ns1, the resolved Kubernetes config is:
```
kyuubi.kubernetes.master.address=k8s://master
kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns1
```
For k8s context=28, namespace=ns2, the resolved Kubernetes config is:
```
kyuubi.kubernetes.master.address=k8s://master
kyuubi.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/token.ns2
```
This way, the Kyuubi server can build a Kubernetes client for each context and
namespace.
### _How was this patch tested?_
Existing kubernetes integration testing.
Closes #4984 from turboFei/k8s_client_yaml.
Closes #4843
f8ffaeeb9 [fwang12] nit
d25774288 [fwang12] comments
5ae7c8433 [fwang12] save into request conf
fd6c363db [fwang12] save
ff004a529 [fwang12] procebuilder method
6b9520bfd [fwang12] save
58850387e [fwang12] save
98df67e5f [fwang12] ut
da811697c [fwang12] fix
aa568aaa4 [fwang12] save
89656f463 [fwang12] check init
a0ef6894b [fwang12] code style
00abb6568 [fwang12] default namespace
295512987 [fwang12] k8s context namespace
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
docs/deployment/settings.md | 2 +
.../test/spark/SparkOnKubernetesTestsSuite.scala | 37 ++++--
.../org/apache/kyuubi/config/KyuubiConf.scala | 44 +++++++
.../org/apache/kyuubi/config/KyuubiConfSuite.scala | 21 ++++
.../kyuubi/engine/ApplicationOperation.scala | 32 ++++-
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 8 +-
.../kyuubi/engine/JpsApplicationOperation.scala | 14 ++-
.../engine/KubernetesApplicationOperation.scala | 133 ++++++++++++++-------
.../kyuubi/engine/KyuubiApplicationManager.scala | 14 +--
.../org/apache/kyuubi/engine/ProcBuilder.scala | 1 +
.../kyuubi/engine/YarnApplicationOperation.scala | 14 ++-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 6 +-
.../engine/spark/SparkBatchProcessBuilder.scala | 8 ++
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 47 +++++---
.../kyuubi/operation/BatchJobSubmission.scala | 4 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 12 +-
.../kyuubi/server/metadata/api/Metadata.scala | 11 +-
.../apache/kyuubi/session/KyuubiBatchSession.scala | 8 +-
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 +-
.../engine/JpsApplicationOperationSuite.scala | 24 ++--
.../KyuubiOperationPerConnectionSuite.scala | 8 +-
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 30 +++--
.../server/api/v1/BatchesResourceSuite.scala | 7 +-
.../kyuubi/server/rest/client/BatchCliSuite.scala | 3 +-
24 files changed, 358 insertions(+), 140 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 97ca5973d..ca29592e2 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -307,8 +307,10 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.authenticate.oauthToken | <undefined> |
The OAuth token to use when authenticating against the Kubernetes API server.
Note that unlike, the other authentication options, this must be the exact
string value of the token to use for the authentication. | string | 1.7.0 |
| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> |
Path to the file containing the OAuth token to use when authenticating against
the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do
not provide a scheme) | string | 1.7.0 |
| kyuubi.kubernetes.context | <undefined> |
The desired context from your kubernetes config file used to configure the K8s
client for interacting with the cluster.
| string | 1.6.0 |
+| kyuubi.kubernetes.context.allow.list ||
The allowed kubernetes context list, if it is empty, there is no kubernetes
context limitation.
| seq | 1.8.0 |
| kyuubi.kubernetes.master.address | <undefined> |
The internal Kubernetes master (API server) address to be used for kyuubi.
| string | 1.7.0 |
| kyuubi.kubernetes.namespace | default |
The namespace that will be used for running the kyuubi pods and find engines.
| string | 1.7.0 |
+| kyuubi.kubernetes.namespace.allow.list ||
The allowed kubernetes namespace list, if it is empty, there is no kubernetes
namespace limitation.
| seq | 1.8.0 |
| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M |
The period for which the Kyuubi server retains application information after
the application terminates.
| duration | 1.7.1 |
| kyuubi.kubernetes.trust.certificates | false | If
set to true then client can submit to kubernetes cluster only with token
| boolean | 1.7.0 |
diff --git
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 74090bc40..6345a05f1 100644
---
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_HOST
-import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation,
KubernetesApplicationOperation}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
ApplicationOperation, KubernetesApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState.{FAILED, NOT_FOUND, RUNNING}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.kubernetes.test.MiniKube
@@ -44,6 +44,9 @@ abstract class SparkOnKubernetesSuiteBase
MiniKube.getKubernetesClient.getMasterUrl.toString
}
+ protected val appMgrInfo =
+ ApplicationManagerInfo(Some(s"k8s://$apiServerAddress"), Some("minikube"),
None)
+
protected def sparkOnK8sConf: KyuubiConf = {
// TODO Support more Spark version
// Spark official docker image: https://hub.docker.com/r/apache/spark/tags
@@ -150,20 +153,28 @@ class KyuubiOperationKubernetesClusterClientModeSuite
batchRequest)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val state =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ val state = k8sOperation.getApplicationInfoByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(state.id != null)
assert(state.name != null)
assert(state.state == RUNNING)
}
- val killResponse =
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ val killResponse = k8sOperation.killApplicationByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 startsWith "Succeeded to terminate:")
- val appInfo =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ val appInfo = k8sOperation.getApplicationInfoByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(appInfo == ApplicationInfo(null, null, NOT_FOUND))
- val failKillResponse =
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ val failKillResponse = k8sOperation.killApplicationByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(!failKillResponse._1)
assert(failKillResponse._2 === ApplicationOperation.NOT_FOUND)
}
@@ -212,24 +223,32 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
// wait for driver pod start
eventually(timeout(3.minutes), interval(5.second)) {
// trigger k8sOperation init here
- val appInfo =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ val appInfo = k8sOperation.getApplicationInfoByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(appInfo.state == RUNNING)
assert(appInfo.name.startsWith(driverPodNamePrefix))
}
- val killResponse =
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ val killResponse = k8sOperation.killApplicationByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 endsWith "is completed")
assert(killResponse._2 contains sessionHandle.identifier.toString)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ val appInfo = k8sOperation.getApplicationInfoByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
// We may kill engine start but not ready
// An EOF Error occurred when the driver was starting
assert(appInfo.state == FAILED || appInfo.state == NOT_FOUND)
}
- val failKillResponse =
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ val failKillResponse = k8sOperation.killApplicationByTag(
+ appMgrInfo,
+ sessionHandle.identifier.toString)
assert(!failKillResponse._1)
}
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index d51a5f5c3..669f72da0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -134,6 +134,31 @@ case class KyuubiConf(loadSysDefault: Boolean = true)
extends Logging {
getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.$normalizedBatchType", "")
}
+ /** Get the kubernetes conf for specified kubernetes context and namespace.
*/
+ def getKubernetesConf(context: Option[String], namespace: Option[String]):
KyuubiConf = {
+ val conf = this.clone
+ context.foreach { c =>
+ val contextConf =
+ getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c", "").map { case
(suffix, value) =>
+ s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value
+ }
+ val contextNamespaceConf = namespace.map { ns =>
+ getAllWithPrefix(s"$KYUUBI_KUBERNETES_CONF_PREFIX.$c.$ns", "").map {
+ case (suffix, value) =>
+ s"$KYUUBI_KUBERNETES_CONF_PREFIX.$suffix" -> value
+ }
+ }.getOrElse(Map.empty)
+
+ (contextConf ++ contextNamespaceConf).map { case (key, value) =>
+ conf.set(key, value)
+ }
+ conf.set(KUBERNETES_CONTEXT, c)
+ namespace.foreach(ns => conf.set(KUBERNETES_NAMESPACE, ns))
+ conf
+ }
+ conf
+ }
+
/**
* Retrieve key-value pairs from [[KyuubiConf]] starting with
`dropped.remainder`, and put them to
* the result map with the `dropped` of key being dropped.
@@ -207,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
+ final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
private[this] val kyuubiConfEntriesUpdateLock = new Object
@@ -1106,6 +1132,15 @@ object KyuubiConf {
.stringConf
.createOptional
+ val KUBERNETES_CONTEXT_ALLOW_LIST: ConfigEntry[Seq[String]] =
+ buildConf("kyuubi.kubernetes.context.allow.list")
+ .doc("The allowed kubernetes context list, if it is empty," +
+ " there is no kubernetes context limitation.")
+ .version("1.8.0")
+ .stringConf
+ .toSequence()
+ .createWithDefault(Nil)
+
val KUBERNETES_NAMESPACE: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.namespace")
.doc("The namespace that will be used for running the kyuubi pods and
find engines.")
@@ -1113,6 +1148,15 @@ object KyuubiConf {
.stringConf
.createWithDefault("default")
+ val KUBERNETES_NAMESPACE_ALLOW_LIST: ConfigEntry[Seq[String]] =
+ buildConf("kyuubi.kubernetes.namespace.allow.list")
+ .doc("The allowed kubernetes namespace list, if it is empty," +
+ " there is no kubernetes namespace limitation.")
+ .version("1.8.0")
+ .stringConf
+ .toSequence()
+ .createWithDefault(Nil)
+
val KUBERNETES_MASTER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.master.address")
.doc("The internal Kubernetes master (API server) address to be used for
kyuubi.")
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
index f05e15d8a..39e68f0ec 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
@@ -200,4 +200,25 @@ class KyuubiConfSuite extends KyuubiFunSuite {
assertResult(kSeq(1))("kyuubi.efg")
assertResult(kSeq(2))("kyuubi.xyz")
}
+
+ test("KYUUBI #4843 - Support multiple kubernetes contexts and namespaces") {
+ val kyuubiConf = KyuubiConf(false)
+ kyuubiConf.set("kyuubi.kubernetes.28.master.address", "k8s://master")
+ kyuubiConf.set(
+ "kyuubi.kubernetes.28.ns1.authenticate.oauthTokenFile",
+ "/var/run/secrets/kubernetes.io/token.ns1")
+ kyuubiConf.set(
+ "kyuubi.kubernetes.28.ns2.authenticate.oauthTokenFile",
+ "/var/run/secrets/kubernetes.io/token.ns2")
+
+ val kubernetesConf1 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns1"))
+ assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_MASTER) ==
Some("k8s://master"))
+
assert(kubernetesConf1.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE)
==
+ Some("/var/run/secrets/kubernetes.io/token.ns1"))
+
+ val kubernetesConf2 = kyuubiConf.getKubernetesConf(Some("28"), Some("ns2"))
+ assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_MASTER) ==
Some("k8s://master"))
+
assert(kubernetesConf2.get(KyuubiConf.KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE)
==
+ Some("/var/run/secrets/kubernetes.io/token.ns2"))
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index a2b3d0f76..2acce39cc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -35,13 +35,14 @@ trait ApplicationOperation {
/**
* Called before other method to do a quick skip
*
- * @param clusterManager the underlying cluster manager or just local
instance
+ * @param appMgrInfo the application manager information
*/
- def isSupported(clusterManager: Option[String]): Boolean
+ def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean
/**
* Kill the app/engine by the unique application tag
*
+ * @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* For example,
* if the Hadoop Yarn is used, for spark applications,
@@ -50,16 +51,20 @@ trait ApplicationOperation {
*
* @note For implementations, please suppress exceptions and always return
KillResponse
*/
- def killApplicationByTag(tag: String): KillResponse
+ def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String):
KillResponse
/**
* Get the engine/application status by the unique application tag
*
+ * @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* @param submitTime engine submit to resourceManager time
* @return [[ApplicationInfo]]
*/
- def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None):
ApplicationInfo
+ def getApplicationInfoByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ submitTime: Option[Long] = None): ApplicationInfo
}
object ApplicationState extends Enumeration {
@@ -108,3 +113,22 @@ object ApplicationInfo {
object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
+
+case class KubernetesInfo(context: Option[String] = None, namespace:
Option[String] = None)
+
+case class ApplicationManagerInfo(
+ resourceManager: Option[String],
+ kubernetesInfo: KubernetesInfo = KubernetesInfo())
+
+object ApplicationManagerInfo {
+ final val DEFAULT_KUBERNETES_NAMESPACE = "default"
+
+ def apply(
+ resourceManager: Option[String],
+ kubernetesContext: Option[String],
+ kubernetesNamespace: Option[String]): ApplicationManagerInfo = {
+ new ApplicationManagerInfo(
+ resourceManager,
+ KubernetesInfo(kubernetesContext, kubernetesNamespace))
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 227cdd6c8..387758714 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -223,7 +223,7 @@ private[kyuubi] class EngineRef(
}
if (started + timeout <= System.currentTimeMillis()) {
- val killMessage =
engineManager.killApplication(builder.clusterManager(), engineRefId)
+ val killMessage =
engineManager.killApplication(builder.appMgrInfo(), engineRefId)
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT,
appUser)))
throw KyuubiSQLException(
@@ -242,7 +242,7 @@ private[kyuubi] class EngineRef(
}
val applicationInfo = engineMgr.getApplicationInfo(
- builder.clusterManager(),
+ builder.appMgrInfo(),
engineRefId,
Some(started))
@@ -291,9 +291,9 @@ private[kyuubi] class EngineRef(
def close(): Unit = {
if (shareLevel == CONNECTION && builder != null) {
try {
- val clusterManager = builder.clusterManager()
+ val appMgrInfo = builder.appMgrInfo()
builder.close(true)
- engineManager.killApplication(clusterManager, engineRefId)
+ engineManager.killApplication(appMgrInfo, engineRefId)
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index ce2e05461..64dacbb64 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -41,8 +41,9 @@ class JpsApplicationOperation extends ApplicationOperation {
}
}
- override def isSupported(clusterManager: Option[String]): Boolean = {
- runner != null && (clusterManager.isEmpty || clusterManager.get == "local")
+ override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
+ runner != null &&
+ (appMgrInfo.resourceManager.isEmpty || appMgrInfo.resourceManager.get ==
"local")
}
private def getEngine(tag: String): Option[String] = {
@@ -80,11 +81,16 @@ class JpsApplicationOperation extends ApplicationOperation {
}
}
- override def killApplicationByTag(tag: String): KillResponse = {
+ override def killApplicationByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String): KillResponse = {
killJpsApplicationByTag(tag, true)
}
- override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]):
ApplicationInfo = {
+ override def getApplicationInfoByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ submitTime: Option[Long]): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 0bd3127cb..d6dfba2fe 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -20,12 +20,14 @@ package org.apache.kyuubi.engine
import java.util.Locale
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import scala.collection.JavaConverters._
+
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.{ResourceEventHandler,
SharedIndexInformer}
-import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{isTerminated,
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState,
LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
@@ -33,10 +35,16 @@ import org.apache.kyuubi.util.KubernetesUtils
class KubernetesApplicationOperation extends ApplicationOperation with Logging
{
- @volatile
- private var kubernetesClient: KubernetesClient = _
- private var enginePodInformer: SharedIndexInformer[Pod] = _
+ private val kubernetesClients: ConcurrentHashMap[KubernetesInfo,
KubernetesClient] =
+ new ConcurrentHashMap[KubernetesInfo, KubernetesClient]
+ private val enginePodInformers: ConcurrentHashMap[KubernetesInfo,
SharedIndexInformer[Pod]] =
+ new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]]
+
+ private var allowedContexts: Seq[String] = Seq.empty
+ private var allowedNamespaces: Seq[String] = Seq.empty
+
private var submitTimeout: Long = _
+ private var kyuubiConf: KyuubiConf = _
// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
@@ -44,45 +52,74 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
// key is kyuubi_unique_key
private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState]
= _
- override def initialize(conf: KyuubiConf): Unit = {
- info("Start initializing Kubernetes Client.")
- kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
+ private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo):
KubernetesClient = {
+ val context = kubernetesInfo.context
+ val namespace = kubernetesInfo.namespace
+
+ if (allowedContexts.nonEmpty && !allowedContexts.contains(context)) {
+ throw new KyuubiException(
+ s"Kubernetes context $context is not in the allowed
list[$allowedContexts]")
+ }
+
+ if (allowedNamespaces.nonEmpty && !allowedNamespaces.contains(namespace)) {
+ throw new KyuubiException(
+ s"Kubernetes namespace $namespace is not in the allowed
list[$allowedNamespaces]")
+ }
+
+ kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo =>
buildKubernetesClient(kInfo))
+ }
+
+ private def buildKubernetesClient(kubernetesInfo: KubernetesInfo):
KubernetesClient = {
+ val kubernetesConf =
+ kyuubiConf.getKubernetesConf(kubernetesInfo.context,
kubernetesInfo.namespace)
+ KubernetesUtils.buildKubernetesClient(kubernetesConf) match {
case Some(client) =>
- info(s"Initialized Kubernetes Client connect to:
${client.getMasterUrl}")
- submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
- // Disable resync, see
https://github.com/fabric8io/kubernetes-client/discussions/5015
- enginePodInformer = client.pods()
+ info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to:
${client.getMasterUrl}")
+ val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler)
- info("Start Kubernetes Client Informer.")
- // Defer cleaning terminated application information
- val retainPeriod =
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
- cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
- .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
- .removalListener((notification: RemovalNotification[String,
ApplicationState]) => {
- Option(appInfoStore.remove(notification.getKey)).foreach { removed
=>
- info(s"Remove terminated application ${removed.id} with " +
- s"tag ${notification.getKey} and state ${removed.state}")
- }
- })
- .build()
+ info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
+ enginePodInformers.put(kubernetesInfo, enginePodInformer)
client
- case None =>
- warn("Fail to init Kubernetes Client for Kubernetes Application
Operation")
- null
+
+ case None => throw new KyuubiException(s"Fail to build Kubernetes client
for $kubernetesInfo")
}
}
- override def isSupported(clusterManager: Option[String]): Boolean = {
+ override def initialize(conf: KyuubiConf): Unit = {
+ kyuubiConf = conf
+ info("Start initializing Kubernetes application operation.")
+ submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
+ allowedContexts = conf.get(KyuubiConf.KUBERNETES_CONTEXT_ALLOW_LIST)
+ allowedNamespaces = conf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
+ // Defer cleaning terminated application information
+ val retainPeriod =
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+ cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
+ .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
+ .removalListener((notification: RemovalNotification[String,
ApplicationState]) => {
+ Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
+ info(s"Remove terminated application ${removed.id} with " +
+ s"tag ${notification.getKey} and state ${removed.state}")
+ }
+ })
+ .build()
+ }
+
+ override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
// TODO add deploy mode to check whether is supported
- kubernetesClient != null &&
clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s"))
+ kyuubiConf != null &&
+
appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("k8s"))
}
- override def killApplicationByTag(tag: String): KillResponse = {
- if (kubernetesClient == null) {
+ override def killApplicationByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String): KillResponse = {
+ if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
}
- debug(s"Deleting application info from Kubernetes cluster by $tag tag")
+ val kubernetesInfo = appMgrInfo.kubernetesInfo
+ val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
+ debug(s"[$kubernetesInfo] Deleting application info from Kubernetes
cluster by $tag tag")
try {
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
debug(s"Application info[tag: $tag] is in ${info.state}")
@@ -90,24 +127,32 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
case NOT_FOUND | FAILED | UNKNOWN =>
(
false,
- s"Target application[tag: $tag] is in ${info.state} status")
+ s"[$kubernetesInfo] Target application[tag: $tag] is in
${info.state} status")
case _ =>
(
!kubernetesClient.pods.withName(info.name).delete().isEmpty,
- s"Operation of deleted application[appId: ${info.id} ,tag: $tag]
is completed")
+ s"[$kubernetesInfo] Operation of deleted" +
+ s" application[appId: ${info.id} ,tag: $tag] is completed")
}
} catch {
case e: Exception =>
- (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
+ (
+ false,
+ s"[$kubernetesInfo] Failed to terminate application with $tag, due
to ${e.getMessage}")
}
}
- override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]):
ApplicationInfo = {
- if (kubernetesClient == null) {
+ override def getApplicationInfoByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ submitTime: Option[Long]): ApplicationInfo = {
+ if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
+ // need to initialize the kubernetes client if not exists
+ getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
(appInfo.state, submitTime) match {
// Kyuubi should wait second if pod is not be created
@@ -136,19 +181,15 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
}
override def stop(): Unit = {
- Utils.tryLogNonFatalError {
- if (enginePodInformer != null) {
- enginePodInformer.stop()
- enginePodInformer = null
- }
+ enginePodInformers.asScala.foreach { case (_, informer) =>
+ Utils.tryLogNonFatalError(informer.stop())
}
+ enginePodInformers.clear()
- Utils.tryLogNonFatalError {
- if (kubernetesClient != null) {
- kubernetesClient.close()
- kubernetesClient = null
- }
+ kubernetesClients.asScala.foreach { case (_, client) =>
+ Utils.tryLogNonFatalError(client.close())
}
+ kubernetesClients.clear()
if (cleanupTerminatedAppInfoTrigger != null) {
cleanupTerminatedAppInfoTrigger.cleanUp()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index ac7225dd8..4e121d297 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -60,11 +60,11 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
super.stop()
}
- def killApplication(resourceManager: Option[String], tag: String):
KillResponse = {
+ def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String):
KillResponse = {
var (killed, lastMessage): KillResponse = (false, null)
for (operation <- operations if !killed) {
- if (operation.isSupported(resourceManager)) {
- val (k, m) = operation.killApplicationByTag(tag)
+ if (operation.isSupported(appMgrInfo)) {
+ val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
killed = k
lastMessage = m
}
@@ -73,7 +73,7 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
val finalMessage =
if (lastMessage == null) {
s"No ${classOf[ApplicationOperation]} Service found in ServiceLoader" +
- s" for $resourceManager"
+ s" for $appMgrInfo"
} else {
lastMessage
}
@@ -81,12 +81,12 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
}
def getApplicationInfo(
- clusterManager: Option[String],
+ appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
- val operation = operations.find(_.isSupported(clusterManager))
+ val operation = operations.find(_.isSupported(appMgrInfo))
operation match {
- case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime))
+ case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag,
submitTime))
case None => None
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 4c7330b4d..d30e72674 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -341,6 +341,7 @@ trait ProcBuilder {
def clusterManager(): Option[String] = None
+ def appMgrInfo(): ApplicationManagerInfo = ApplicationManagerInfo(None)
}
object ProcBuilder extends Logging {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 1f06484fc..d87fc406a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -47,11 +47,14 @@ class YarnApplicationOperation extends ApplicationOperation
with Logging {
info(s"Successfully initialized yarn client: ${c.getServiceState}")
}
- override def isSupported(clusterManager: Option[String]): Boolean = {
- yarnClient != null &&
clusterManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
+ override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
+ yarnClient != null && appMgrInfo.resourceManager.exists(
+ _.toLowerCase(Locale.ROOT).startsWith("yarn"))
}
- override def killApplicationByTag(tag: String): KillResponse = {
+ override def killApplicationByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String): KillResponse = {
if (yarnClient != null) {
try {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
@@ -79,7 +82,10 @@ class YarnApplicationOperation extends ApplicationOperation
with Logging {
}
}
- override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]):
ApplicationInfo = {
+ override def getApplicationInfoByTag(
+ appMgrInfo: ApplicationManagerInfo,
+ tag: String,
+ submitTime: Option[Long]): ApplicationInfo = {
if (yarnClient != null) {
debug(s"Getting application info from Yarn cluster by $tag tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index d8d46e427..3da8d1b1a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -29,7 +29,7 @@ import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
-import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo,
KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
@@ -73,6 +73,10 @@ class FlinkProcessBuilder(
}
}
+ override def appMgrInfo(): ApplicationManagerInfo = {
+ ApplicationManagerInfo(clusterManager())
+ }
+
override protected val commands: Array[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName,
clusterManager(), conf)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
index 4a613278d..0d20068de 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala
@@ -77,4 +77,12 @@ class SparkBatchProcessBuilder(
override def clusterManager(): Option[String] = {
batchConf.get(MASTER_KEY).orElse(super.clusterManager())
}
+
+ override def kubernetesContext(): Option[String] = {
+ batchConf.get(KUBERNETES_CONTEXT_KEY).orElse(super.kubernetesContext())
+ }
+
+ override def kubernetesNamespace(): Option[String] = {
+ batchConf.get(KUBERNETES_NAMESPACE_KEY).orElse(super.kubernetesNamespace())
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index b74eab77d..6110c0246 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo,
KyuubiApplicationManager, ProcBuilder}
import
org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST,
KUBERNETES_SERVICE_PORT}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.AuthTypes
@@ -183,26 +183,39 @@ class SparkProcessBuilder(
override def shortName: String = "spark"
- protected lazy val defaultMaster: Option[String] = {
+ protected lazy val defaultsConf: Map[String, String] = {
val confDir = env.getOrElse(SPARK_CONF_DIR,
s"$sparkHome${File.separator}conf")
- val defaults =
- try {
- val confFile = new
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
- if (confFile.exists()) {
- Utils.getPropertiesFromFile(Some(confFile))
- } else {
- Map.empty[String, String]
- }
- } catch {
- case _: Exception =>
- warn(s"Failed to load spark configurations from $confDir")
- Map.empty[String, String]
+ try {
+ val confFile = new
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
+ if (confFile.exists()) {
+ Utils.getPropertiesFromFile(Some(confFile))
+ } else {
+ Map.empty[String, String]
}
- defaults.get(MASTER_KEY)
+ } catch {
+ case _: Exception =>
+ warn(s"Failed to load spark configurations from $confDir")
+ Map.empty[String, String]
+ }
+ }
+
+ override def appMgrInfo(): ApplicationManagerInfo = {
+ ApplicationManagerInfo(
+ clusterManager(),
+ kubernetesContext(),
+ kubernetesNamespace())
}
override def clusterManager(): Option[String] = {
- conf.getOption(MASTER_KEY).orElse(defaultMaster)
+ conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+ }
+
+ def kubernetesContext(): Option[String] = {
+
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
+ }
+
+ def kubernetesNamespace(): Option[String] = {
+
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
override def validateConf: Unit = Validator.validateConf(conf)
@@ -224,6 +237,8 @@ object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
final val MASTER_KEY = "spark.master"
+ final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
+ final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
final val INTERNAL_RESOURCE = "spark-internal"
/**
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index ab2cfa302..82562541d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -107,7 +107,7 @@ class BatchJobSubmission(
}
val applicationInfo =
applicationManager.getApplicationInfo(
- builder.clusterManager(),
+ builder.appMgrInfo(),
batchId,
Some(_submitTime))
applicationId(applicationInfo).foreach { _ =>
@@ -123,7 +123,7 @@ class BatchJobSubmission(
}
private[kyuubi] def killBatchApplication(): KillResponse = {
- applicationManager.killApplication(builder.clusterManager(), batchId)
+ applicationManager.killApplication(builder.appMgrInfo(), batchId)
}
private val applicationCheckInterval =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index cb0c63be0..ba043f071 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.{ApplicationInfo, KillResponse,
KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation,
OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -289,7 +289,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
case e: KyuubiRestException =>
error(s"Error redirecting get batch[$batchId] to
${metadata.kyuubiInstance}", e)
val batchAppStatus =
sessionManager.applicationManager.getApplicationInfo(
- metadata.clusterManager,
+ metadata.appMgrInfo,
batchId,
// prevent that the batch be marked as terminated if
application state is NOT_FOUND
Some(metadata.engineOpenTime).filter(_ >
0).orElse(Some(System.currentTimeMillis)))
@@ -407,9 +407,9 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
}
}
- def forceKill(clusterManager: Option[String], batchId: String):
KillResponse = {
+ def forceKill(appMgrInfo: ApplicationManagerInfo, batchId: String):
KillResponse = {
val (killed, message) = sessionManager.applicationManager
- .killApplication(clusterManager, batchId)
+ .killApplication(appMgrInfo, batchId)
info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
sessionManager.updateMetadata(Metadata(identifier = batchId,
peerInstanceClosed = true))
(killed, message)
@@ -436,12 +436,12 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to
${metadata.kyuubiInstance}", e)
- val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+ val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
new CloseBatchResponse(killed, if (killed) msg else
Utils.stringifyException(e))
}
} else { // should not happen, but handle this for safe
warn(s"Something wrong on deleting batch[$batchId], try forcibly
killing application")
- val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+ val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
new CloseBatchResponse(killed, msg)
}
}.getOrElse {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 949e88abd..12759f8cc 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.server.metadata.api
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.session.SessionType.SessionType
/**
@@ -73,4 +75,11 @@ case class Metadata(
engineState: String = null,
engineError: Option[String] = None,
endTime: Long = 0L,
- peerInstanceClosed: Boolean = false)
+ peerInstanceClosed: Boolean = false) {
+ def appMgrInfo: ApplicationManagerInfo = {
+ ApplicationManagerInfo(
+ clusterManager,
+ requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
+ requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index ded4c8bf4..014bbced3 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -143,6 +143,12 @@ class KyuubiBatchSession(
traceMetricsOnOpen()
if (recoveryMetadata.isEmpty) {
+ val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
+ val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
+ Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
+ }.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map {
namespace =>
+ Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
+ }.getOrElse(Map.empty)
val metaData = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
@@ -154,7 +160,7 @@ class KyuubiBatchSession(
resource = resource,
className = className,
requestName = name.orNull,
- requestConf = optimizedConf,
+ requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes
info into request conf
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 2ed413dcb..68d95dc80 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
-import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState,
YarnApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState._
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper,
OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
@@ -134,11 +134,15 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
assert(metadata.map(_.engineId).get.startsWith("application_"))
}
- val killResponse =
yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ val appMgrInfo = ApplicationManagerInfo(Some("yarn"))
+
+ val killResponse =
+ yarnOperation.killApplicationByTag(appMgrInfo,
sessionHandle.identifier.toString)
assert(killResponse._1)
assert(killResponse._2 startsWith "Succeeded to terminate:")
- val appInfo =
yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ val appInfo =
+ yarnOperation.getApplicationInfoByTag(appMgrInfo,
sessionHandle.identifier.toString)
assert(appInfo.state === KILLED)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index a6e00bbaf..a0914afcf 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -38,10 +38,10 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
jps.initialize(null)
test("JpsApplicationOperation with jstat") {
- assert(jps.isSupported(None))
- assert(jps.isSupported(Some("local")))
- assert(!jps.killApplicationByTag(null)._1)
- assert(!jps.killApplicationByTag("have a space")._1)
+ assert(jps.isSupported(ApplicationManagerInfo(None)))
+ assert(jps.isSupported(ApplicationManagerInfo(Some("local"))))
+ assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), null)._1)
+ assert(!jps.killApplicationByTag(ApplicationManagerInfo(None), "have a
space")._1)
val currentProcess = ManagementFactory.getRuntimeMXBean.getName
val currentPid = currentProcess.splitAt(currentProcess.indexOf("@"))._1
@@ -52,16 +52,16 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
}.start()
eventually(Timeout(10.seconds)) {
- val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+ val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None),
"sun.tools.jstat.Jstat")
assert(desc1.id != null)
assert(desc1.name != null)
assert(desc1.state == ApplicationState.RUNNING)
}
- jps.killApplicationByTag("sun.tools.jstat.Jstat")
+ jps.killApplicationByTag(ApplicationManagerInfo(None),
"sun.tools.jstat.Jstat")
eventually(Timeout(10.seconds)) {
- val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
+ val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None),
"sun.tools.jstat.Jstat")
assert(desc2.id == null)
assert(desc2.name == null)
assert(desc2.state == ApplicationState.NOT_FOUND)
@@ -78,25 +78,25 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
val builder = new SparkProcessBuilder(user, conf)
builder.start
- assert(jps.isSupported(builder.clusterManager()))
+ assert(jps.isSupported(ApplicationManagerInfo(builder.clusterManager())))
eventually(Timeout(10.seconds)) {
- val desc1 = jps.getApplicationInfoByTag(id)
+ val desc1 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id)
assert(desc1.id != null)
assert(desc1.name != null)
assert(desc1.state == ApplicationState.RUNNING)
- val response = jps.killApplicationByTag(id)
+ val response = jps.killApplicationByTag(ApplicationManagerInfo(None), id)
assert(response._1, response._2)
assert(response._2 startsWith "Succeeded to terminate:")
}
eventually(Timeout(10.seconds)) {
- val desc2 = jps.getApplicationInfoByTag(id)
+ val desc2 = jps.getApplicationInfoByTag(ApplicationManagerInfo(None), id)
assert(desc2.id == null)
assert(desc2.name == null)
assert(desc2.state == ApplicationState.NOT_FOUND)
}
- val response2 = jps.killApplicationByTag(id)
+ val response2 = jps.killApplicationByTag(ApplicationManagerInfo(None), id)
assert(!response2._1)
assert(response2._2 === ApplicationOperation.NOT_FOUND)
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 9c69817a3..0c180db72 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KYUUBI_VERSION, WithKyuubiServer}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.SESSION_CONF_ADVISOR
-import org.apache.kyuubi.engine.ApplicationState
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
import org.apache.kyuubi.jdbc.KyuubiHiveDriver
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiSQLException}
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
@@ -233,9 +233,11 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
}
val engineId =
sessionManager.allSessions().head.handle.identifier.toString
// kill the engine application and wait the engine terminate
- sessionManager.applicationManager.killApplication(None, engineId)
+
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
engineId)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(sessionManager.applicationManager.getApplicationInfo(None,
engineId)
+ assert(sessionManager.applicationManager.getApplicationInfo(
+ ApplicationManagerInfo(None),
+ engineId)
.exists(_.state == ApplicationState.NOT_FOUND))
}
assert(!conn.isValid(3000))
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index da9b8ae44..f5bbe640e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite,
RestFrontendTestHelper
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData,
SessionData, SessionHandle, SessionOpenRequest}
import org.apache.kyuubi.config.KyuubiConf
import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
-import org.apache.kyuubi.engine.{ApplicationState, EngineRef,
KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState,
EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
@@ -293,9 +293,10 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
}
// kill the engine application
- engineMgr.killApplication(None, id)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None),
id).exists(
+ _.state == ApplicationState.NOT_FOUND))
}
}
}
@@ -339,9 +340,10 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
}
// kill the engine application
- engineMgr.killApplication(None, id)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None),
id).exists(
+ _.state == ApplicationState.NOT_FOUND))
}
}
}
@@ -417,9 +419,10 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(engines(0).getSubdomain == "default")
// kill the engine application
- engineMgr.killApplication(None, id)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None),
id).exists(
+ _.state == ApplicationState.NOT_FOUND))
}
}
}
@@ -463,9 +466,10 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(engines(0).getSubdomain == "default")
// kill the engine application
- engineMgr.killApplication(None, id)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None),
id).exists(
+ _.state == ApplicationState.NOT_FOUND))
}
}
}
@@ -528,12 +532,12 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(result1.size == 1)
// kill the engine application
- engineMgr.killApplication(None, id1)
- engineMgr.killApplication(None, id2)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id1)
+ engineMgr.killApplication(ApplicationManagerInfo(None), id2)
eventually(timeout(30.seconds), interval(100.milliseconds)) {
- assert(engineMgr.getApplicationInfo(None, id1)
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id1)
.exists(_.state == ApplicationState.NOT_FOUND))
- assert(engineMgr.getApplicationInfo(None, id2)
+ assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id2)
.exists(_.state == ApplicationState.NOT_FOUND))
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 8e0a80c4d..8a797f842 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -37,7 +37,7 @@ import org.apache.kyuubi.client.util.BatchUtils
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
KyuubiApplicationManager}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
@@ -64,7 +64,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0,
Int.MaxValue).foreach {
batch =>
- sessionManager.applicationManager.killApplication(None, batch.getId)
+
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}
@@ -481,7 +481,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
var applicationStatus: Option[ApplicationInfo] = None
eventually(timeout(5.seconds)) {
- applicationStatus =
sessionManager.applicationManager.getApplicationInfo(None, batchId2)
+ applicationStatus =
+
sessionManager.applicationManager.getApplicationInfo(ApplicationManagerInfo(None),
batchId2)
assert(applicationStatus.isDefined)
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index ff807ef02..4e18951b5 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.{BatchTestHelper,
RestClientTestHelper, Utils}
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
+import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.session.KyuubiSessionManager
@@ -80,7 +81,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0,
Int.MaxValue).foreach {
batch =>
- sessionManager.applicationManager.killApplication(None, batch.getId)
+
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}