This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ab273c8ba [KYUUBI #6008] RESTful API supports killing engine forcibly
ab273c8ba is described below
commit ab273c8ba34d3c36cf7ed6aff84e1ea3390476ba
Author: hezhao2 <[email protected]>
AuthorDate: Mon Jul 1 15:28:09 2024 +0800
[KYUUBI #6008] RESTful API supports killing engine forcibly
# :mag: Description
## Issue References ๐
## Describe Your Solution ๐ง
I'd like to introduce the feature that allows users to forcibly kill an
engine through API.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist ๐
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6008 from zhaohehuhu/dev-0123.
Closes #6008
00c208a26 [Cheng Pan] fix
8721a2d2a [Cheng Pan] log
efc7587f7 [Cheng Pan] client
cd5129db3 [Cheng Pan] fix ut
5e1b6a161 [Cheng Pan] Update
kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
72d7df357 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
6d5d08710 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b013194d1 [zhaohehuhu] move the position of log
0cdeede7a [zhaohehuhu] restore ENGINE_SPARK_REGISTER_ATTRIBUTES
f826d0515 [zhaohehuhu] reformat
a13466e37 [zhaohehuhu] update doc and log string encoded
3a2f5970a [zhaohehuhu] refactor
ae24ea74d [zhaohehuhu] refactor UT
936a54e27 [Wang, Fei] register app mgr info
9bacc2c8b [hezhao2] fix UTs
11106d75b [Wang, Fei] comments
ba57c2c3f [hezhao2] refactor code to delete the node and then kill
application
634ceb677 [hezhao2] reformat
ab31382ee [hezhao2] reformat
513bcdc57 [hezhao2] fix UT
506220654 [hezhao2] get refId by user, sharelevel and subdomain
3ad9577df [hezhao2] rename params to support multiple engines
632c56b88 [hezhao2] fix unused import
bd7bb45f0 [hezhao2] refactor
fb9b25176 [hezhao2] add default value for forceKill param
070aad06f [hezhao2] refactor
51827ecde [hezhao2] fix UT
f11e7657e [hezhao2] add an UT
8a65cf113 [hezhao2] refactor code
d6f82ff9a [hezhao2] refactor code
f3ab9c546 [hezhao2] new parameter added to decide whether to kill
forcefully handle the result of killApplication
5faa5b54f [hezhao2] kill engine forcibly
Lead-authored-by: hezhao2 <[email protected]>
Co-authored-by: zhaohehuhu <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/client/rest/rest_api.md | 15 ++++---
.../engine/spark/SparkTBinaryFrontendService.scala | 3 +-
.../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 +
.../kyuubi/service/AbstractFrontendService.scala | 6 ++-
.../org/apache/kyuubi/client/AdminRestApi.java | 16 +++++++
.../kyuubi/engine/ApplicationOperation.scala | 30 ++++++++++++-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 7 ++-
.../kyuubi/server/api/v1/AdminResource.scala | 30 ++++++++++---
.../engine/flink/FlinkProcessBuilderSuite.scala | 6 ++-
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 50 +++++++++++++++++++++-
.../kyuubi/server/rest/client/AdminCtlSuite.scala | 2 +-
.../server/rest/client/AdminRestApiSuite.scala | 5 ++-
12 files changed, 148 insertions(+), 23 deletions(-)
diff --git a/docs/client/rest/rest_api.md b/docs/client/rest/rest_api.md
index 4f28dec05..9600655a8 100644
--- a/docs/client/rest/rest_api.md
+++ b/docs/client/rest/rest_api.md
@@ -457,13 +457,14 @@ Delete the specified engine.
#### Request Parameters
-| Name | Description | Type |
-|:------------------------|:------------------------------|:-----------------|
-| type | the engine type | String(optional) |
-| sharelevel | the engine share level | String(optional) |
-| subdomain | the engine subdomain | String(optional) |
-| proxyUser | the proxy user to impersonate | String(optional) |
-| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
+| Name | Description
| Type |
+|:------------------------|:-------------------------------------------------------------|:------------------|
+| type | the engine type
| String(optional) |
+| sharelevel | the engine share level
| String(optional) |
+| subdomain | the engine subdomain
| String(optional) |
+| proxyUser | the proxy user to impersonate
| String(optional) |
+| hive.server2.proxy.user | the proxy user to impersonate
| String(optional) |
+| kill | whether to kill the engine forcibly. Default value
is false. | Boolean(optional) |
`proxyUser` is an alternative to `hive.server2.proxy.user`, and the current
behavior is consistent with
`hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes
precedence.
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
index 2eed5253d..57f5cb14d 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala
@@ -100,7 +100,8 @@ class SparkTBinaryFrontendService(
val extraAttributes =
conf.get(KyuubiConf.ENGINE_SPARK_REGISTER_ATTRIBUTES).map { attr =>
attr -> KyuubiSparkUtil.globalSparkContext.getConf.get(attr, "")
}.toMap
- val attributes = extraAttributes ++ Map(KYUUBI_ENGINE_ID ->
KyuubiSparkUtil.engineId)
+ val attributes =
+ super.attributes ++ extraAttributes ++ Map(KYUUBI_ENGINE_ID ->
KyuubiSparkUtil.engineId)
// TODO Support Spark Web UI Enabled SSL
sc.uiWebUrl match {
case Some(url) => attributes ++ Map(KYUUBI_ENGINE_URL ->
url.split("//").last)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
index 9f22dd1f8..fb1385a3a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala
@@ -36,6 +36,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
+ final val KYUUBI_ENGINE_APP_MGR_INFO_KEY = "kyuubi.engine.appMgrInfo"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
index e2ba06789..bdabaaa5d 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.service
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.ServiceState.LATENT
/**
@@ -40,4 +40,8 @@ abstract class AbstractFrontendService(name: String)
discoveryService.foreach(addService)
super.initialize(conf)
}
+
+ override def attributes: Map[String, String] = {
+ conf.getAll.filter(_._1 ==
KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
+ }
}
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
index 0f6fbbc47..c61635214 100644
---
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java
@@ -23,8 +23,12 @@ import org.apache.kyuubi.client.api.v1.dto.Engine;
import org.apache.kyuubi.client.api.v1.dto.OperationData;
import org.apache.kyuubi.client.api.v1.dto.ServerData;
import org.apache.kyuubi.client.api.v1.dto.SessionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AdminRestApi {
+ private static final Logger LOG =
LoggerFactory.getLogger(AdminRestApi.class);
+
private KyuubiRestClient client;
private static final String API_BASE_PATH = "admin";
@@ -65,13 +69,25 @@ public class AdminRestApi {
return this.getClient().post(path, null, client.getAuthHeader());
}
+ /** This method is deprecated since 1.10 */
+ @Deprecated
public String deleteEngine(
String engineType, String shareLevel, String subdomain, String
hs2ProxyUser) {
+ LOG.warn(
+ "The method `deleteEngine(engineType, shareLevel, subdomain,
hs2ProxyUser)` "
+ + "is deprecated since 1.10.0, using "
+ + "`deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser,
kill)` instead.");
+ return this.deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser,
false);
+ }
+
+ public String deleteEngine(
+ String engineType, String shareLevel, String subdomain, String
hs2ProxyUser, boolean kill) {
Map<String, Object> params = new HashMap<>();
params.put("type", engineType);
params.put("sharelevel", shareLevel);
params.put("subdomain", subdomain);
params.put("hive.server2.proxy.user", hs2ProxyUser);
+ params.put("kill", kill);
return this.getClient().delete(API_BASE_PATH + "/engine", params,
client.getAuthHeader());
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 23a49c1ae..339df2f78 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -17,6 +17,13 @@
package org.apache.kyuubi.engine
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
@@ -132,8 +139,11 @@ case class ApplicationManagerInfo(
resourceManager: Option[String],
kubernetesInfo: KubernetesInfo = KubernetesInfo())
-object ApplicationManagerInfo {
+object ApplicationManagerInfo extends Logging {
final val DEFAULT_KUBERNETES_NAMESPACE = "default"
+ val mapper: ObjectMapper = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .registerModule(DefaultScalaModule)
def apply(
resourceManager: Option[String],
@@ -143,4 +153,22 @@ object ApplicationManagerInfo {
resourceManager,
KubernetesInfo(kubernetesContext, kubernetesNamespace))
}
+
+ def serialize(appMgrInfo: ApplicationManagerInfo): String = {
+ Base64.getEncoder.encodeToString(
+ mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8))
+ }
+
+ def deserialize(encodedStr: String): ApplicationManagerInfo = {
+ try {
+ val json = new String(
+ Base64.getDecoder.decode(encodedStr.getBytes),
+ StandardCharsets.UTF_8)
+ mapper.readValue(json, classOf[ApplicationManagerInfo])
+ } catch {
+ case _: Throwable =>
+ error(s"Fail to deserialize the encoded string: $encodedStr")
+ ApplicationManagerInfo(None)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 566bc18ad..0e21bb9f7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
import org.apache.kyuubi._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
@@ -169,6 +169,11 @@ trait ProcBuilder {
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false
+ // Set engine application manger info conf
+ conf.set(
+ KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY,
+ ApplicationManagerInfo.serialize(appMgrInfo()))
+
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 2e61e6b08..0f1544fe6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.client.api.v1.dto._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
-import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager,
SessionHandle}
@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
@QueryParam("sharelevel") shareLevel: String,
@QueryParam("subdomain") subdomain: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
- @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response =
{
+ @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String,
+ @QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = {
val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = if (fe.isAdministrator(fe.getRealUser())) {
Option(activeProxyUser).getOrElse(fe.getRealUser())
@@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
}
val engine = normalizeEngineInfo(userName, engineType, shareLevel,
subdomain, "default")
val engineSpace = calculateEngineSpace(engine)
+ val responseMsgBuilder = new StringBuilder()
withDiscoveryClient(fe.getConf) { discoveryClient =>
- val engineNodes = discoveryClient.getChildren(engineSpace)
- engineNodes.foreach { node =>
- val nodePath = s"$engineSpace/$node"
+ val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace,
silent = true)
+ engineNodes.foreach { engineNode =>
+ val nodePath = s"$engineSpace/${engineNode.nodeName}"
+ val engineRefId = engineNode.engineRefId.orNull
info(s"Deleting engine node:$nodePath")
try {
discoveryClient.delete(nodePath)
+ responseMsgBuilder
+ .append(s"Engine $engineSpace refId=$engineRefId is deleted
successfully.")
} catch {
case e: Exception =>
error(s"Failed to delete engine node:$nodePath", e)
throw new NotFoundException(s"Failed to delete engine
node:$nodePath," +
s"${e.getMessage}")
}
+
+ if (kill && engineRefId != null) {
+ val appMgrInfo =
+
engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
+
.map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None))
+ val killResponse =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ .applicationManager.killApplication(appMgrInfo, engineRefId)
+ responseMsgBuilder
+ .append(s"\nKilled engine with $appMgrInfo/$engineRefId:
$killResponse")
+ }
}
}
- Response.ok(s"Engine $engineSpace is deleted successfully.").build()
+ Response.ok(responseMsgBuilder.toString()).build()
}
@ApiResponse(
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 8786ef798..7895ff83f 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -27,7 +27,9 @@ import scala.util.matching.Regex
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS,
ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
-import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
+import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_APP_MGR_INFO_KEY,
KYUUBI_ENGINE_CREDENTIALS_KEY}
+import org.apache.kyuubi.engine.ApplicationManagerInfo
+import org.apache.kyuubi.engine.ApplicationManagerInfo.serialize
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
@@ -39,6 +41,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
+ .set(KYUUBI_ENGINE_APP_MGR_INFO_KEY,
serialize(ApplicationManagerInfo(None)))
private def applicationModeConf = KyuubiConf()
.set("flink.execution.target", "yarn-application")
@@ -46,6 +49,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
.set(APP_KEY, "kyuubi_connection_flink_paul")
.set("kyuubi.on", "off")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
+ .set(KYUUBI_ENGINE_APP_MGR_INFO_KEY,
serialize(ApplicationManagerInfo(None)))
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 2360dea60..c5aa3fbd3 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
-import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState,
EngineRef, KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState,
EngineRef, KubernetesInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
@@ -301,6 +301,54 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(!operations.map(op =>
op.getIdentifier).contains(operation.identifier.toString))
}
+ test("force to kill engine - user share level") {
+ val id = UUID.randomUUID().toString
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
+ conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
+
+ val engine =
+ new EngineRef(
+ conf.clone,
+ Utils.currentUser,
+ true,
+ PluginLoader.loadGroupProvider(conf),
+ id,
+ null)
+
+ val engineSpace = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
+ Utils.currentUser,
+ "default")
+
+ withDiscoveryClient(conf) { client =>
+ engine.getOrCreate(client)
+ assert(client.pathExists(engineSpace))
+ assert(client.getChildren(engineSpace).size == 1)
+
+ val response = webTarget.path("api/v1/admin/engine")
+ .queryParam("sharelevel", "USER")
+ .queryParam("type", "spark_sql")
+ .queryParam("kill", "true")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER,
HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
+ .delete()
+
+ assert(response.getStatus === 200)
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ assert(client.getChildren(engineSpace).isEmpty, s"refId same with
$id?")
+ }
+
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ val appMgrInfo = ApplicationManagerInfo(None, KubernetesInfo(None,
None))
+ assert(engineMgr.getApplicationInfo(appMgrInfo, id)
+ .exists(_.state == ApplicationState.NOT_FOUND))
+ }
+ }
+ }
+
test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala
index 0c262d238..c11410da8 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala
@@ -93,7 +93,7 @@ class AdminCtlSuite extends RestClientTestHelper with
TestPrematureExit {
ldapUserPasswd)
testPrematureExitForAdminControlCli(
args,
- s"Engine ${engineSpace} is deleted successfully.")
+ s"Engine ${engineSpace} refId=${id} is deleted successfully.")
args = Array(
"list",
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala
index 43eb06482..9aaefd273 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala
@@ -85,8 +85,9 @@ class AdminRestApiSuite extends RestClientTestHelper {
assert(engines(0).getNamespace == engineSpace)
assert(engines(0).getAttributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_ID).startsWith("local-"))
- val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "")
- assert(result == s"Engine ${engineSpace} is deleted successfully.")
+ // kill engine to release memory quickly
+ val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "",
true)
+ assert(result startsWith s"Engine ${engineSpace} refId=${id} is deleted
successfully.")
engines = adminRestApi.listEngines("spark_sql", "user", "default",
"").asScala
assert(engines.isEmpty)