This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 3ab8733ba [KYUUBI #4623][K8S] KubernetesApplicationOperation uses
Informer instead of list
3ab8733ba is described below
commit 3ab8733ba72c4956d0f0d36c6a4d2a7401d95cf4
Author: zwangsheng <[email protected]>
AuthorDate: Fri Mar 31 15:21:59 2023 +0800
[KYUUBI #4623][K8S] KubernetesApplicationOperation uses Informer instead of
list
Close #4623
To reduce the pressure on the API Server (the Kubernetes client currently
polls with a label selector to find the `spark driver pod`, which is costly
when multiple REST applications are running at the same time), use an
Informer, the Kubernetes-recommended mechanism for watching resources and
maintaining the application state locally.
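For context, below is a minimal, self-contained sketch of the informer plus
deferred-cleanup pattern this change adopts (fabric8 Kubernetes client and
Guava cache, mirroring the diff further down; the label name, the retain
period, and client construction via fabric8 6.x's KubernetesClientBuilder are
illustrative assumptions, not Kyuubi's actual wiring):

    import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

    import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.KubernetesClientBuilder
    import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer}

    object PodInformerSketch {
      // Illustrative label key; Kyuubi uses LABEL_KYUUBI_UNIQUE_KEY (see diff)
      private val TAG_LABEL = "kyuubi-unique-tag"

      // Local view of pod phases keyed by tag, kept current by informer events
      private val store = new ConcurrentHashMap[String, String]()

      // Writing a tag here schedules its removal from `store` after the retain
      // period; note Guava evicts lazily, on cache activity or cleanUp()
      private val cleanupTrigger: Cache[String, String] = CacheBuilder.newBuilder()
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .removalListener((n: RemovalNotification[String, String]) => {
          store.remove(n.getKey)
          ()
        })
        .build()

      private def tag(pod: Pod): String = pod.getMetadata.getLabels.get(TAG_LABEL)
      private def update(pod: Pod): Unit = store.put(tag(pod), pod.getStatus.getPhase)

      def main(args: Array[String]): Unit = {
        val client = new KubernetesClientBuilder().build()
        // One initial LIST against the API server, then a WATCH keeps the local
        // cache in sync -- no more per-lookup label polling
        val informer: SharedIndexInformer[Pod] = client.pods()
          .withLabel(TAG_LABEL)
          .inform(new ResourceEventHandler[Pod] {
            override def onAdd(pod: Pod): Unit = update(pod)
            override def onUpdate(oldPod: Pod, newPod: Pod): Unit = update(newPod)
            override def onDelete(pod: Pod, stateUnknown: Boolean): Unit = {
              update(pod)
              cleanupTrigger.put(tag(pod), pod.getStatus.getPhase) // defer removal
            }
          })
        sys.addShutdownHook { informer.stop(); client.close() }
      }
    }

Per-tag lookups then read `store` locally; the API server only sees the
initial list and the watch stream.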
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
- [x] Run CI
Closes #4625 from zwangsheng/KYUUBI_4623.
Closes #4623
a415bef7f [Cheng Pan] nit
136d0db4d [Cheng Pan] 171
b5d3c237a [Cheng Pan] re-generate conf
bf14ad870 [Cheng Pan] nit
9ee7e04f9 [Cheng Pan] nit
301162ea0 [Cheng Pan] nit
1d426922b [Cheng Pan] nit
b95d7a650 [Cheng Pan] improve
cc8d2c7f4 [zwangsheng] fix comments
d017bafdf [zwangsheng] Set resycn 0
28f9a70d9 [zwangsheng] Reorder func & slow get app info
22d9c1662 [zwangsheng] fix setting
8e0940334 [zwangsheng] fix comments
10965d3df [zwangsheng] Rename fileter function => isSparkEnginePod
b02677154 [zwangsheng] rename
78c9fdb17 [zwangsheng] fix comments
6d31f70d1 [zwangsheng] Fix IT Test
f43bba2b9 [zwangsheng] fix
17e4f55eb [zwangsheng] debug
be8da790e [zwangsheng] debug
0db45a513 [zwangsheng] retest
a93786abc [zwangsheng] Fix style
652ee837e [zwangsheng] Add Setting & Debug
4add7e4e2 [zwangsheng] improve
1f4341237 [zwangsheng] remove unused import
35acd6106 [zwangsheng] fix compile
05dfc598e [zwangsheng] [KYUUBI #4623][Improvement][K8S] Remove cached app
info when out of time
4ab530e99 [zwangsheng] [KYUUBI #4623][Improvement][K8S]
kubernetesApplicationOperation Using Informer instead of list
Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 91 ++++++++--
.../test/spark/SparkOnKubernetesTestsSuite.scala | 17 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 11 +-
.../kyuubi/engine/ApplicationOperation.scala | 5 +
.../engine/KubernetesApplicationOperation.scala | 194 +++++++++++++--------
5 files changed, 229 insertions(+), 89 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 840d7d518..5fc1c58b6 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -118,6 +118,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
### Engine
+
+| Key | Default
|
[...]
+|----------------------------------------------------------|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| kyuubi.engine.chat.extra.classpath | <undefined>
| The extra classpath for the Chat engine, for configuring the location
of the SDK and etc.
[...]
+| kyuubi.engine.chat.gpt.apiKey | <undefined>
| The key to access OpenAI open API, which could be got at
https://platform.openai.com/account/api-keys
[...]
+| kyuubi.engine.chat.gpt.http.connect.timeout | PT2M
| The timeout[ms] for establishing the connection with the Chat GPT
server. A timeout value of zero is interpreted as an infinite timeout.
[...]
+| kyuubi.engine.chat.gpt.http.proxy | <undefined>
| HTTP proxy url for API calling in Chat GPT engine. e.g.
http://127.0.0.1:1087
[...]
+| kyuubi.engine.chat.gpt.http.socket.timeout | PT2M
| The timeout[ms] for waiting for data packets after Chat GPT server
connection is established. A timeout value of zero is interpreted as an
infinite timeout.
[...]
+| kyuubi.engine.chat.gpt.model | gpt-3.5-turbo
| ID of the model used in ChatGPT. Available models refer to OpenAI's
[Model overview](https://platform.openai.com/docs/models/overview).
[...]
+| kyuubi.engine.chat.java.options | <undefined>
| The extra Java options for the Chat engine
[...]
+| kyuubi.engine.chat.memory | 1g
| The heap memory for the Chat engine
[...]
+| kyuubi.engine.chat.provider | ECHO
| The provider for the Chat engine. Candidates: <ul> <li>ECHO: simply
replies a welcome message.</li> <li>GPT: a.k.a ChatGPT, powered by
OpenAI.</li></ul>
[...]
+| kyuubi.engine.connection.url.use.hostname | true
| (deprecated) When true, the engine registers with hostname to
zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure
that server can connect to engine
[...]
+| kyuubi.engine.deregister.exception.classes
|| A comma-separated list of exception classes. If there is any
exception thrown, whose class matches the specified classes, the engine would
deregister itself.
[...]
+| kyuubi.engine.deregister.exception.messages
|| A comma-separated list of exception messages. If there is any
exception thrown, whose message or stacktrace matches the specified message
list, the engine would deregister itself.
[...]
+| kyuubi.engine.deregister.exception.ttl | PT30M
| Time to live(TTL) for exceptions pattern specified in
kyuubi.engine.deregister.exception.classes and
kyuubi.engine.deregister.exception.messages to deregister engines. Once the
total error count hits the kyuubi.engine.deregister.job.max.failures within the
TTL, an engine will deregister itself and wait for self-terminated. Otherwise,
we suppose that the engine has recovered from temporary failures. [...]
+| kyuubi.engine.deregister.job.max.failures | 4
| Number of failures of job before deregistering the engine.
[...]
+| kyuubi.engine.event.json.log.path |
file:///tmp/kyuubi/events | The location where all the engine events go for the
built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS
Path: start with 'hdfs://'</li></ul>
[...]
+| kyuubi.engine.event.loggers | SPARK
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>SPARK: the events will be
written to the Spark listener bus.</li> <li>JSON: the events will be written to
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be
done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi
supports custom event handlers with the Jav [...]
+| kyuubi.engine.flink.extra.classpath | <undefined>
| The extra classpath for the Flink SQL engine, for configuring the
location of hadoop client jars, etc
[...]
+| kyuubi.engine.flink.java.options | <undefined>
| The extra Java options for the Flink SQL engine
[...]
+| kyuubi.engine.flink.memory | 1g
| The heap memory for the Flink SQL engine
[...]
+| kyuubi.engine.hive.event.loggers | JSON
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>JSON: the events will be
written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to
be done</li> <li>CUSTOM: to be done.</li></ul>
[...]
+| kyuubi.engine.hive.extra.classpath | <undefined>
| The extra classpath for the Hive query engine, for configuring
location of the hadoop client jars and etc.
[...]
+| kyuubi.engine.hive.java.options | <undefined>
| The extra Java options for the Hive query engine
[...]
+| kyuubi.engine.hive.memory | 1g
| The heap memory for the Hive query engine
[...]
+| kyuubi.engine.initialize.sql | SHOW DATABASES
| SemiColon-separated list of SQL statements to be initialized in the
newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly
active HiveClient. This configuration can not be used in JDBC url due to the
limitation of Beeline/JDBC driver.
[...]
+| kyuubi.engine.jdbc.connection.password | <undefined>
| The password is used for connecting to server
[...]
+| kyuubi.engine.jdbc.connection.properties
|| The additional properties are used for connecting to server
[...]
+| kyuubi.engine.jdbc.connection.provider | <undefined>
| The connection provider is used for getting a connection from the
server
[...]
+| kyuubi.engine.jdbc.connection.url | <undefined>
| The server url that engine will connect to
[...]
+| kyuubi.engine.jdbc.connection.user | <undefined>
| The user is used for connecting to server
[...]
+| kyuubi.engine.jdbc.driver.class | <undefined>
| The driver class for JDBC engine connection
[...]
+| kyuubi.engine.jdbc.extra.classpath | <undefined>
| The extra classpath for the JDBC query engine, for configuring the
location of the JDBC driver and etc.
[...]
+| kyuubi.engine.jdbc.java.options | <undefined>
| The extra Java options for the JDBC query engine
[...]
+| kyuubi.engine.jdbc.memory | 1g
| The heap memory for the JDBC query engine
[...]
+| kyuubi.engine.jdbc.type | <undefined>
| The short name of JDBC type
[...]
+| kyuubi.engine.operation.convert.catalog.database.enabled | true
| When set to true, The engine converts the JDBC methods of set/get
Catalog and set/get Schema to the implementation of different engines
[...]
+| kyuubi.engine.operation.log.dir.root |
engine_operation_logs | Root directory for query operation log at
engine-side.
[...]
+| kyuubi.engine.pool.name | engine-pool
| The name of the engine pool.
[...]
+| kyuubi.engine.pool.selectPolicy | RANDOM
| The select policy of an engine from the corresponding engine pool
engine for a session. <ul><li>RANDOM - Randomly use the engine in the
pool</li><li>POLLING - Polling use the engine in the pool</li></ul>
[...]
+| kyuubi.engine.pool.size | -1
| The size of the engine pool. Note that, if the size is less than 1,
the engine pool will not be enabled; otherwise, the size of the engine pool
will be min(this, kyuubi.engine.pool.size.threshold).
[...]
+| kyuubi.engine.pool.size.threshold | 9
| This parameter is introduced as a server-side parameter controlling
the upper limit of the engine pool.
[...]
+| kyuubi.engine.session.initialize.sql
|| SemiColon-separated list of SQL statements to be initialized in the
newly created engine session before queries. This configuration can not be used
in JDBC url due to the limitation of Beeline/JDBC driver.
[...]
+| kyuubi.engine.share.level | USER
| Engines will be shared in different levels, available configs are:
<ul> <li>CONNECTION: engine will not be shared but only used by the current
client connection</li> <li>USER: engine will be shared by all sessions created
by a unique username, see also kyuubi.engine.share.level.subdomain</li>
<li>GROUP: the engine will be shared by all sessions created by all users
belong to the same primary group na [...]
+| kyuubi.engine.share.level.sub.domain | <undefined>
| (deprecated) - Using kyuubi.engine.share.level.subdomain instead
[...]
+| kyuubi.engine.share.level.subdomain | <undefined>
| Allow end-users to create a subdomain for the share level of an
engine. A subdomain is a case-insensitive string values that must be a valid
zookeeper subpath. For example, for the `USER` share level, an end-user can
share a certain engine within a subdomain, not for all of its clients.
End-users are free to create multiple engines in the `USER` share level. When
disable engine pool, use 'default' if [...]
+| kyuubi.engine.single.spark.session | false
| When set to true, this engine is running in a single session mode.
All the JDBC/ODBC connections share the temporary views, function registries,
SQL configuration and the current database.
[...]
+| kyuubi.engine.spark.event.loggers | SPARK
| A comma-separated list of engine loggers, where
engine/session/operation etc events go.<ul> <li>SPARK: the events will be
written to the Spark listener bus.</li> <li>JSON: the events will be written to
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be
done</li> <li>CUSTOM: to be done.</li></ul>
[...]
+| kyuubi.engine.spark.python.env.archive | <undefined>
| Portable Python env archive used for Spark engine Python language
mode.
[...]
+| kyuubi.engine.spark.python.env.archive.exec.path | bin/python
| The Python exec path under the Python env archive.
[...]
+| kyuubi.engine.spark.python.home.archive | <undefined>
| Spark archive containing $SPARK_HOME/python directory, which is used
to init session Python worker for Python language mode.
[...]
+| kyuubi.engine.submit.timeout | PT30S
| Period to tolerant Driver Pod ephemerally invisible after submitting.
In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately
after `spark-submit` is returned.
[...]
+| kyuubi.engine.trino.event.loggers | JSON
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>JSON: the events will be
written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to
be done</li> <li>CUSTOM: to be done.</li></ul>
[...]
+| kyuubi.engine.trino.extra.classpath | <undefined>
| The extra classpath for the Trino query engine, for configuring other
libs which may need by the Trino engine
[...]
+| kyuubi.engine.trino.java.options | <undefined>
| The extra Java options for the Trino query engine
[...]
+| kyuubi.engine.trino.memory | 1g
| The heap memory for the Trino query engine
[...]
+| kyuubi.engine.type | SPARK_SQL
| Specify the detailed engine supported by Kyuubi. The engine type
bindings to SESSION scope. This configuration is experimental. Currently,
available configs are: <ul> <li>SPARK_SQL: specify this engine type will launch
a Spark engine which can provide all the capacity of the Apache Spark. Note,
it's a default engine type.</li> <li>FLINK_SQL: specify this engine type will
launch a Flink engine which c [...]
+| kyuubi.engine.ui.retainedSessions | 200
| The number of SQL client sessions kept in the Kyuubi Query Engine web
UI.
[...]
+| kyuubi.engine.ui.retainedStatements | 200
| The number of statements kept in the Kyuubi Query Engine web UI.
[...]
+| kyuubi.engine.ui.stop.enabled | true
| When true, allows Kyuubi engine to be killed from the Spark Web UI.
[...]
+| kyuubi.engine.user.isolated.spark.session | true
| When set to false, if the engine is running in a group or server
share level, all the JDBC/ODBC connections will be isolated against the user.
Including the temporary views, function registries, SQL configuration, and the
current database. Note that, it does not affect if the share level is
connection or user.
[...]
+| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M
| The interval to check if the user-isolated Spark session is timeout.
[...]
+| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H
| If kyuubi.engine.user.isolated.spark.session is false, we will
release the Spark session if its corresponding user is inactive after this
configured timeout.
[...]
+
### Event
@@ -283,17 +351,18 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
### Kubernetes
-| Key | Default |
Meaning
| Type | Since |
-|-----------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|-------|
-| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to
the CA cert file for connecting to the Kubernetes API server over TLS from the
kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a
scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to
the client cert file for connecting to the Kubernetes API server over TLS from
the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a
scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to
the client key file for connecting to the Kubernetes API server over TLS from
the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a
scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The
OAuth token to use when authenticating against the Kubernetes API server. Note
that unlike, the other authentication options, this must be the exact string
value of the token to use for the authentication. | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to
the file containing the OAuth token to use when authenticating against the
Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.context | <undefined> | The
desired context from your kubernetes config file used to configure the K8s
client for interacting with the cluster.
| string | 1.6.0 |
-| kyuubi.kubernetes.master.address | <undefined> | The
internal Kubernetes master (API server) address to be used for kyuubi.
| string | 1.7.0 |
-| kyuubi.kubernetes.namespace | default | The
namespace that will be used for running the kyuubi pods and find engines.
| string | 1.7.0 |
-| kyuubi.kubernetes.trust.certificates | false | If set
to true then client can submit to kubernetes cluster only with token
| boolean | 1.7.0 |
+| Key | Default |
Meaning
| Type | Since |
+|-----------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
+| kyuubi.kubernetes.authenticate.caCertFile | <undefined> |
Path to the CA cert file for connecting to the Kubernetes API server over TLS
from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> |
Path to the client cert file for connecting to the Kubernetes API server over
TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> |
Path to the client key file for connecting to the Kubernetes API server over
TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthToken | <undefined> |
The OAuth token to use when authenticating against the Kubernetes API server.
Note that unlike, the other authentication options, this must be the exact
string value of the token to use for the authentication. | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> |
Path to the file containing the OAuth token to use when authenticating against
the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do
not provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.context | <undefined> |
The desired context from your kubernetes config file used to configure the K8s
client for interacting with the cluster.
| string | 1.6.0 |
+| kyuubi.kubernetes.master.address | <undefined> |
The internal Kubernetes master (API server) address to be used for kyuubi.
| string | 1.7.0 |
+| kyuubi.kubernetes.namespace | default |
The namespace that will be used for running the kyuubi pods and find engines.
| string | 1.7.0 |
+| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M |
The period for which the Kyuubi server retains application information after
the application terminates.
| duration | 1.7.1 |
+| kyuubi.kubernetes.trust.certificates | false | If
set to true then client can submit to kubernetes cluster only with token
| boolean | 1.7.0 |
### Metadata
diff --git
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 831504608..9ddcc5937 100644
---
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -208,19 +208,18 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
batchRequest.getConf.asScala.toMap,
batchRequest)
- val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
- val batchJobSubmissionOp = session.batchJobSubmissionOp
-
- eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
- assert(appInfo.nonEmpty)
- assert(appInfo.exists(_.state == RUNNING))
- assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
+ // wait for driver pod start
+ eventually(timeout(3.minutes), interval(5.second)) {
+ // trigger k8sOperation init here
+ val appInfo =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ assert(appInfo.state == RUNNING)
+ assert(appInfo.name.startsWith(driverPodNamePrefix))
}
val killResponse =
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
assert(killResponse._1)
- assert(killResponse._2 startsWith "Operation of deleted appId:")
+ assert(killResponse._2 endsWith "is completed")
+ assert(killResponse._2 contains sessionHandle.identifier.toString)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val appInfo =
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index fa51af7ff..5301066cf 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1169,6 +1169,15 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD: ConfigEntry[Long] =
+ buildConf("kyuubi.kubernetes.terminatedApplicationRetainPeriod")
+ .doc("The period for which the Kyuubi server retains application
information after " +
+ "the application terminates.")
+ .version("1.7.1")
+ .timeConf
+ .checkValue(_ > 0, "must be positive number")
+ .createWithDefault(Duration.ofMinutes(5).toMillis)
+
//
///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration
//
//
///////////////////////////////////////////////////////////////////////////////////////////////
@@ -2523,7 +2532,7 @@ object KyuubiConf {
val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.engine.submit.timeout")
.doc("Period to tolerant Driver Pod ephemerally invisible after
submitting. " +
- "In some Resource Managers, e.g. K8s, the Driver Pod is not invisible
immediately " +
+ "In some Resource Managers, e.g. K8s, the Driver Pod is not visible
immediately " +
"after `spark-submit` is returned.")
.version("1.7.1")
.timeConf
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 00db372ce..a2b3d0f76 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -100,6 +100,11 @@ case class ApplicationInfo(
}
}
+object ApplicationInfo {
+ val NOT_FOUND: ApplicationInfo = ApplicationInfo(null, null,
ApplicationState.NOT_FOUND)
+ val UNKNOWN: ApplicationInfo = ApplicationInfo(null, null,
ApplicationState.UNKNOWN)
+}
+
object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 26e9202c7..7ca6f0e2a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -17,29 +17,54 @@
package org.apache.kyuubi.engine
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
+import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable
+import io.fabric8.kubernetes.client.informers.{ResourceEventHandler,
SharedIndexInformer}
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED,
FINISHED, PENDING, RUNNING, UNKNOWN}
-import
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState,
SPARK_APP_ID_LABEL}
+import org.apache.kyuubi.engine.ApplicationState.{isTerminated,
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState,
LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils
class KubernetesApplicationOperation extends ApplicationOperation with Logging
{
@volatile
private var kubernetesClient: KubernetesClient = _
-
+ private var enginePodInformer: SharedIndexInformer[Pod] = _
private var submitTimeout: Long = _
+ // key is kyuubi_unique_key
+ private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
+ new ConcurrentHashMap[String, ApplicationInfo]
+ // key is kyuubi_unique_key
+ private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState]
= _
+
override def initialize(conf: KyuubiConf): Unit = {
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to:
${client.getMasterUrl}")
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
+ // Disable resync, see
https://github.com/fabric8io/kubernetes-client/discussions/5015
+ enginePodInformer = client.pods()
+ .withLabel(LABEL_KYUUBI_UNIQUE_KEY)
+ .inform(new SparkEnginePodEventHandler)
+ info("Start Kubernetes Client Informer.")
+ // Defer cleaning terminated application information
+ val retainPeriod =
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+ cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
+ .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
+ .removalListener((notification: RemovalNotification[String,
ApplicationState]) => {
+ Option(appInfoStore.remove(notification.getKey)).foreach { removed
=>
+ info(s"Remove terminated application ${removed.id} with " +
+ s"tag ${notification.getKey} and state ${removed.state}")
+ }
+ })
+ .build()
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application
Operation")
@@ -54,34 +79,26 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
}
override def killApplicationByTag(tag: String): KillResponse = {
- if (kubernetesClient != null) {
- debug(s"Deleting application info from Kubernetes cluster by $tag tag")
- try {
- // Need driver only
- val operation = findDriverPodByTag(tag)
- val podList = operation.list().getItems
- if (podList.size() != 0) {
- toApplicationState(podList.get(0).getStatus.getPhase) match {
- case FAILED | UNKNOWN =>
- (
- false,
- s"Target Pod ${podList.get(0).getMetadata.getName} is in
FAILED or UNKNOWN status")
- case _ =>
- (
- operation.delete(),
- s"Operation of deleted appId:
${podList.get(0).getMetadata.getName} is completed")
- }
- } else {
+ if (kubernetesClient == null) {
+ throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
+ }
+ debug(s"Deleting application info from Kubernetes cluster by $tag tag")
+ try {
+ val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+ debug(s"Application info[tag: $tag] is in ${info.state}")
+ info.state match {
+ case NOT_FOUND | FAILED | UNKNOWN =>
(
false,
- s"Target Pod(tag: $tag) is not found, due to pod have been deleted
or not created")
- }
- } catch {
- case e: Exception =>
- (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
+ s"Target application[tag: $tag] is in ${info.state} status")
+ case _ =>
+ (
+ kubernetesClient.pods.withLabel(LABEL_KYUUBI_UNIQUE_KEY,
tag).delete(),
+ s"Operation of deleted application[appId: ${info.id} ,tag: $tag]
is completed")
}
- } else {
- throw new IllegalStateException("Methods initialize and isSupported must
be called ahead")
+ } catch {
+ case e: Exception =>
+ (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
}
}
@@ -91,60 +108,101 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
- val operation = findDriverPodByTag(tag)
- val podList = operation.list().getItems
- if (podList.size() != 0) {
- val pod = podList.get(0)
- val info = ApplicationInfo(
- // spark pods always tag label `spark-app-selector:<spark-app-id>`
- id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
- name = pod.getMetadata.getName,
- state =
KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
- error = Option(pod.getStatus.getReason))
- debug(s"Successfully got application info by $tag: $info")
- return info
- }
- // Kyuubi should wait second if pod is not be created
- submitTime match {
- case Some(time) =>
- val elapsedTime = System.currentTimeMillis() - time
+ val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+ (appInfo.state, submitTime) match {
+ // Kyuubi should wait a while if the driver pod has not been created yet
+ case (NOT_FOUND, Some(_submitTime)) =>
+ val elapsedTime = System.currentTimeMillis - _submitTime
if (elapsedTime > submitTimeout) {
error(s"Can't find target driver pod by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ ApplicationInfo.NOT_FOUND
} else {
warn("Wait for driver pod to be created, " +
s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
- ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
+ ApplicationInfo.UNKNOWN
}
- case None =>
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ case (NOT_FOUND, None) =>
+ ApplicationInfo.NOT_FOUND
+ case _ =>
+ debug(s"Successfully got application info by $tag: $appInfo")
+ appInfo
}
} catch {
case e: Exception =>
error(s"Failed to get application with $tag, due to ${e.getMessage}")
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ ApplicationInfo.NOT_FOUND
}
}
- private def findDriverPodByTag(tag: String): FilterWatchListDeletable[Pod,
PodList] = {
- val operation = kubernetesClient.pods()
- .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag)
- val size = operation.list().getItems.size()
- if (size != 1) {
- warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect
1")
+ override def stop(): Unit = {
+ Utils.tryLogNonFatalError {
+ if (enginePodInformer != null) {
+ enginePodInformer.stop()
+ enginePodInformer = null
+ }
}
- operation
- }
- override def stop(): Unit = {
- if (kubernetesClient != null) {
- try {
+ Utils.tryLogNonFatalError {
+ if (kubernetesClient != null) {
kubernetesClient.close()
- } catch {
- case e: Exception => error(e.getMessage)
+ kubernetesClient = null
+ }
+ }
+
+ if (cleanupTerminatedAppInfoTrigger != null) {
+ cleanupTerminatedAppInfoTrigger.cleanUp()
+ cleanupTerminatedAppInfoTrigger = null
+ }
+ }
+
+ private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] {
+
+ override def onAdd(pod: Pod): Unit = {
+ if (isSparkEnginePod(pod)) {
+ updateApplicationState(pod)
+ }
+ }
+
+ override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
+ if (isSparkEnginePod(newPod)) {
+ updateApplicationState(newPod)
+ val appState = toApplicationState(newPod.getStatus.getPhase)
+ if (isTerminated(appState)) {
+ markApplicationTerminated(newPod)
+ }
}
}
+
+ override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit =
{
+ if (isSparkEnginePod(pod)) {
+ updateApplicationState(pod)
+ markApplicationTerminated(pod)
+ }
+ }
+ }
+
+ private def isSparkEnginePod(pod: Pod): Boolean = {
+ val labels = pod.getMetadata.getLabels
+ labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) &&
labels.containsKey(SPARK_APP_ID_LABEL)
+ }
+
+ private def updateApplicationState(pod: Pod): Unit = {
+ val appState = toApplicationState(pod.getStatus.getPhase)
+ debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state:
$appState")
+ appInfoStore.put(
+ pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+ ApplicationInfo(
+ id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
+ name = pod.getMetadata.getName,
+ state = appState,
+ error = Option(pod.getStatus.getReason)))
+ }
+
+ private def markApplicationTerminated(pod: Pod): Unit = {
+ cleanupTerminatedAppInfoTrigger.put(
+ pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+ toApplicationState(pod.getStatus.getPhase))
}
}
@@ -161,10 +219,10 @@ object KubernetesApplicationOperation extends Logging {
case "Running" => RUNNING
case "Succeeded" => FINISHED
case "Failed" | "Error" => FAILED
- case "Unknown" => ApplicationState.UNKNOWN
+ case "Unknown" => UNKNOWN
case _ =>
warn(s"The kubernetes driver pod state: $state is not supported, " +
"mark the application state as UNKNOWN.")
- ApplicationState.UNKNOWN
+ UNKNOWN
}
}
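
For reference, the new retain period from the Kubernetes settings table above
could be tuned in $KYUUBI_HOME/conf/kyuubi-defaults.conf like so (PT10M is an
illustrative value; the key and its PT5M default come from this commit):

    kyuubi.kubernetes.terminatedApplicationRetainPeriod=PT10M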