This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch branch-1.10 in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push: new 1c7113b1aa [KYUUBI #7027] Support to initialize kubernetes clients on kyuubi server startup 1c7113b1aa is described below commit 1c7113b1aae7eeaab08a0724ed4071ccbc3cd007 Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Tue Apr 15 22:36:16 2025 -0700 [KYUUBI #7027] Support to initialize kubernetes clients on kyuubi server startup ### Why are the changes needed? This ensures the Kyuubi server is promptly informed of any Kubernetes resource changes after startup. It is highly recommended to set it for multiple Kyuubi instances mode. ### How was this patch tested? Existing GA and Integration testing. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7027 from turboFei/k8s_client_init. Closes #7027 393b9960a [Wang, Fei] server only a640278c4 [Wang, Fei] refresh Authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> (cherry picked from commit 4fc201e85dd37dfbc032f7b30d5e38d9237db282) Signed-off-by: Wang, Fei <fwan...@ebay.com> --- docs/configuration/settings.md | 1 + .../org/apache/kyuubi/config/KyuubiConf.scala | 12 +++++++++++ .../engine/KubernetesApplicationOperation.scala | 23 ++++++++++++++++++++++ .../kyuubi/engine/KyuubiApplicationManager.scala | 5 +++++ .../org/apache/kyuubi/server/KyuubiServer.scala | 2 ++ .../KubernetesApplicationOperationSuite.scala | 17 ++++++++++++++++ 6 files changed, 60 insertions(+) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 02902b97df..b143bae04d 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -356,6 +356,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. 
do not provide a scheme) [...] | kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. [...] | kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) [...] +| kyuubi.kubernetes.client.initialize.list || The kubernetes client initialize list to register kubernetes resource informers during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for any Kubernetes resource changes after startup. It is highly recommend to set it for multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`. [...] | kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. [...] | kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. [...] | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. [...] 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 6588b3fb87..38a6384976 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1236,6 +1236,18 @@ object KyuubiConf { .toSet() .createWithDefault(Set.empty) + val KUBERNETES_CLIENT_INITIALIZE_LIST: ConfigEntry[Seq[String]] = + buildConf("kyuubi.kubernetes.client.initialize.list") + .doc("The kubernetes client initialize list to register kubernetes resource informers" + + " during Kyuubi server startup. This ensure the Kyuubi server is promptly informed for" + + " any Kubernetes resource changes after startup. It is highly recommend to set it for" + + " multiple Kyuubi instances mode. The format is `context1:namespace1,context2:namespace2`.") + .version("1.11.0") + .serverOnly + .stringConf + .toSequence() + .createWithDefault(Nil) + val KUBERNETES_MASTER: OptionalConfigEntry[String] = buildConf("kyuubi.kubernetes.master.address") .doc("The internal Kubernetes master (API server) address to be used for kyuubi.") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 59faee4868..55f0fa6613 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -167,6 +167,29 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + initializeKubernetesClient(kyuubiConf) + } + + private[kyuubi] def getKubernetesClientInitializeInfo( + kyuubiConf: KyuubiConf): 
Seq[KubernetesInfo] = { + kyuubiConf.get(KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST).map { init => + val (context, namespace) = init.split(":") match { + case Array(ctx, ns) => (Some(ctx).filterNot(_.isEmpty), Some(ns).filterNot(_.isEmpty)) + case Array(ctx) => (Some(ctx).filterNot(_.isEmpty), None) + case _ => (None, None) + } + KubernetesInfo(context, namespace) + } + } + + private[kyuubi] def initializeKubernetesClient(kyuubiConf: KyuubiConf): Unit = { + getKubernetesClientInitializeInfo(kyuubiConf).foreach { kubernetesInfo => + try { + getOrCreateKubernetesClient(kubernetesInfo) + } catch { + case e: Throwable => error(s"Failed to initialize Kubernetes client for $kubernetesInfo", e) + } + } } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index a80f13a86c..b4095c424f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -96,6 +96,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager case None => None } } + + private[kyuubi] def getKubernetesApplicationOperation: Option[KubernetesApplicationOperation] = { + operations.find(_.isInstanceOf[KubernetesApplicationOperation]) + .map(_.asInstanceOf[KubernetesApplicationOperation]) + } } object KyuubiApplicationManager { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 338ac6b414..0996472653 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -124,6 +124,8 @@ object KyuubiServer extends Logging { val 
refreshedKubernetesConf = createKyuubiConf().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX)) refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf) + kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager + .getKubernetesApplicationOperation.foreach(_.initializeKubernetesClient(kyuubiServer.conf)) } private def refreshConfig( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index ab663a0074..f6bfc409db 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -96,4 +96,21 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { sparkUiPort) === s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort") } + + test("get kubernetes client initialization info") { + val kyuubiConf = KyuubiConf() + kyuubiConf.set( + KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST.key, + "c1:ns1,c1:ns2,c2:ns1,c2:ns2,c1:,:ns1") + + val operation = new KubernetesApplicationOperation() + assert(operation.getKubernetesClientInitializeInfo(kyuubiConf) === + Array( + KubernetesInfo(Some("c1"), Some("ns1")), + KubernetesInfo(Some("c1"), Some("ns2")), + KubernetesInfo(Some("c2"), Some("ns1")), + KubernetesInfo(Some("c2"), Some("ns2")), + KubernetesInfo(Some("c1"), None), + KubernetesInfo(None, Some("ns1")))) + } }