This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5de1a19f3 [KYUUBI #3626] AdminResources rest api support to list
current engines
5de1a19f3 is described below
commit 5de1a19f39ae9aef71ebf2e4c2001f34f33ab92b
Author: Tianlin Liao <[email protected]>
AuthorDate: Tue Oct 18 15:56:25 2022 +0800
[KYUUBI #3626] AdminResources rest api support to list current engines
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3629 from lightning-L/list-engine.
Closes #3626
f63faa53 [Tianlin Liao] fix test case
3d9af3a1 [Tianlin Liao] do not need ListEnginesResponse
718c115f [Tianlin Liao] refactor
defc9f52 [Tianlin Liao] refactor getUserName and getEngineSpace method
e5015508 [Tianlin Liao] typo and grammar fix
84256f68 [Tianlin Liao] [KYUUBI #3626] AdminResources rest api support to
list current engines
Authored-by: Tianlin Liao <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
---
.../apache/kyuubi/client/api/v1/dto/Engine.java | 121 +++++++++++++
.../kyuubi/server/KyuubiRestFrontendService.scala | 13 +-
.../kyuubi/server/api/v1/AdminResource.scala | 113 ++++++++----
.../kyuubi/server/api/v1/BatchesResource.scala | 16 +-
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 191 ++++++++++++++++++++-
.../server/api/v1/BatchesResourceSuite.scala | 2 +-
6 files changed, 403 insertions(+), 53 deletions(-)
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Engine.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Engine.java
new file mode 100644
index 000000000..0f4d70adc
--- /dev/null
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Engine.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.Objects;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+ /** REST client DTO describing one Kyuubi engine: version, user, engine type, share level, subdomain and instance. */
+public class Engine {
+
+ private String version; // the admin REST resource fills this with KYUUBI_VERSION
+ private String user; // user name the engine space is resolved for
+ private String engineType; // engine type string, e.g. "SPARK_SQL"
+ private String sharelevel; // engine share level string, e.g. "USER"
+ private String subdomain; // share-level subdomain, e.g. "default"
+ private String instance; // instance of the discovered service node; may be null when not yet known
+
+ public Engine() {} // no-arg constructor; presumably needed for JSON (de)serialization — confirm
+
+ public Engine(
+ String version,
+ String user,
+ String engineType,
+ String sharelevel,
+ String subdomain,
+ String instance) {
+ this.version = version;
+ this.user = user;
+ this.engineType = engineType;
+ this.sharelevel = sharelevel;
+ this.subdomain = subdomain;
+ this.instance = instance;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getEngineType() {
+ return engineType;
+ }
+
+ public void setEngineType(String engineType) {
+ this.engineType = engineType;
+ }
+
+ public String getSharelevel() {
+ return sharelevel;
+ }
+
+ public void setSharelevel(String sharelevel) {
+ this.sharelevel = sharelevel;
+ }
+
+ public String getSubdomain() {
+ return subdomain;
+ }
+
+ public void setSubdomain(String subdomain) {
+ this.subdomain = subdomain;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public void setInstance(String instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false; // exact class match only; a subclass instance never equals an Engine
+ Engine that = (Engine) o;
+ return Objects.equals(getVersion(), that.getVersion()) // null-safe comparison of all six properties
+ && Objects.equals(getUser(), that.getUser())
+ && Objects.equals(getEngineType(), that.getEngineType())
+ && Objects.equals(getSharelevel(), that.getSharelevel())
+ && Objects.equals(getSubdomain(), that.getSubdomain())
+ && Objects.equals(getInstance(), that.getInstance());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash( // hashes the same six properties that equals() compares
+ getVersion(), getUser(), getEngineType(), getSharelevel(),
getSubdomain(), getInstance());
+ }
+
+ @Override
+ public String toString() {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE); // fields are rendered reflectively using commons-lang3's JSON style
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 8414a91eb..3d0ee3d52 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -21,6 +21,7 @@ import java.util.EnumSet
import java.util.concurrent.{Future, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import javax.servlet.DispatcherType
+import javax.ws.rs.NotAllowedException
import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.conf.Configuration
@@ -172,12 +173,22 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
super.stop()
}
+ def getUserName(hs2ProxyUser: String): String = { // convenience overload: a null or empty hs2ProxyUser yields an empty session conf
+ val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
+ Map(KyuubiAuthenticationFactory.HS2_PROXY_USER ->
proxyUser)).getOrElse(Map())
+ getUserName(sessionConf) // delegates to the Map-based overload
+ }
+
def getUserName(sessionConf: Map[String, String]): String = { // returns what getProxyUser yields for the session conf, request IP and authenticated user
// using the remote ip address instead of that in proxy http header for
authentication
val ipAddress = AuthenticationFilter.getUserIpAddress
val realUser: String = ServiceUtils.getShortName(
Option(AuthenticationFilter.getUserName).filter(_.nonEmpty).getOrElse("anonymous")) // a null or empty authenticated name falls back to "anonymous"
- getProxyUser(sessionConf, ipAddress, realUser)
+ try {
+ getProxyUser(sessionConf, ipAddress, realUser)
+ } catch {
+ case t: Throwable => throw new NotAllowedException(t.getMessage) // only the message is kept; BatchesResourceSuite in this commit now expects 405 instead of 500
+ }
}
def getIpAddress: String = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 81cbfa667..674b475a2 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -20,19 +20,21 @@ package org.apache.kyuubi.server.api.v1
import javax.ws.rs._
import javax.ws.rs.core.{MediaType, Response}
+import scala.collection.mutable.ListBuffer
+
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
+import org.apache.kyuubi.client.api.v1.dto.Engine
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
+import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
-import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -48,7 +50,7 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
@POST
@Path("refresh/hadoop_conf")
def refreshFrontendHadoopConf(): Response = {
- val userName = fe.getUserName(Map.empty)
+ val userName = fe.getUserName(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh Kyuubi server hadoop conf request from
$userName/$ipAddress")
if (!userName.equals(administrator)) {
@@ -72,33 +74,9 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
@QueryParam("sharelevel") shareLevel: String,
@QueryParam("subdomain") subdomain: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response =
{
- val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
- Map(KyuubiAuthenticationFactory.HS2_PROXY_USER ->
proxyUser)).getOrElse(Map())
-
- var userName: String = null
- try {
- userName = fe.getUserName(sessionConf)
- } catch {
- case t: Throwable =>
- throw new NotAllowedException(t.getMessage)
- }
-
- // use default value from kyuubi conf when param is not provided
- val clonedConf: KyuubiConf = fe.getConf.clone
- Option(engineType).foreach(clonedConf.set(ENGINE_TYPE, _))
- Option(subdomain).filter(_.nonEmpty)
- .foreach(_ => clonedConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN,
Option(subdomain)))
-
Option(shareLevel).filter(_.nonEmpty).foreach(clonedConf.set(ENGINE_SHARE_LEVEL,
_))
-
- val normalizedEngineType = clonedConf.get(ENGINE_TYPE)
- val engineSubdomain =
clonedConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse("default")
- val engineShareLevel = clonedConf.get(ENGINE_SHARE_LEVEL)
-
- val engineSpace = DiscoveryPaths.makePath(
- s"${fe.getConf.get(HA_NAMESPACE)}_${KYUUBI_VERSION}_" +
- s"${engineShareLevel}_$normalizedEngineType",
- userName,
- Array(engineSubdomain))
+ val userName = fe.getUserName(hs2ProxyUser)
+ val engine = getEngine(userName, engineType, shareLevel, subdomain,
"default")
+ val engineSpace = getEngineSpace(engine)
withDiscoveryClient(fe.getConf) { discoveryClient =>
val engineNodes = discoveryClient.getChildren(engineSpace)
@@ -118,4 +96,79 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
Response.ok(s"Engine $engineSpace is deleted successfully.").build()
}
+ /** Lists the engine service nodes the discovery client finds under the engine space derived from the request parameters. */
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "list kyuubi engines")
+ @GET
+ @Path("engine")
+ def listEngines(
+ @QueryParam("type") engineType: String,
+ @QueryParam("sharelevel") shareLevel: String,
+ @QueryParam("subdomain") subdomain: String,
+ @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String):
Seq[Engine] = {
+ val userName = fe.getUserName(hs2ProxyUser)
+ val engine = getEngine(userName, engineType, shareLevel, subdomain, "") // empty default subdomain; presumably makes the path the parent of all subdomains — confirm
+ val engineSpace = getEngineSpace(engine)
+
+ var engineNodes = ListBuffer[ServiceNodeInfo]()
+ Option(subdomain).filter(_.nonEmpty) match { // NOTE(review): branches on the request parameter only, not on a subdomain configured in KyuubiConf — confirm intended
+ case Some(_) => // subdomain given: read the service nodes directly under engineSpace
+ withDiscoveryClient(fe.getConf) { discoveryClient =>
+ info(s"Listing engine nodes for $engineSpace")
+ engineNodes ++= discoveryClient.getServiceNodesInfo(engineSpace)
+ }
+ case None => // no subdomain given: visit every child of engineSpace and collect its service nodes
+ withDiscoveryClient(fe.getConf) { discoveryClient =>
+ discoveryClient.getChildren(engineSpace).map { child =>
+ info(s"Listing engine nodes for $engineSpace/$child")
+ engineNodes ++=
discoveryClient.getServiceNodesInfo(s"$engineSpace/$child")
+ }
+ }
+ }
+ engineNodes.map(node => // one Engine per discovered node, sharing the request-level version/user/type/share level
+ new Engine(
+ engine.getVersion,
+ engine.getUser,
+ engine.getEngineType,
+ engine.getSharelevel,
+ node.namespace.split("/").last, // subdomain is the last path segment of the node's namespace
+ node.instance))
+ }
+ /** Builds an Engine descriptor (instance left null) from the request parameters, using the server's KyuubiConf values for parameters that are not provided. */
+ private def getEngine(
+ userName: String,
+ engineType: String,
+ shareLevel: String,
+ subdomain: String,
+ subdomainDefault: String): Engine = {
+ // use default value from kyuubi conf when param is not provided
+ val clonedConf: KyuubiConf = fe.getConf.clone // overrides go on a clone so the server conf itself is not modified
+ Option(engineType).foreach(clonedConf.set(ENGINE_TYPE, _)) // NOTE(review): unlike subdomain/shareLevel, an empty engineType is not filtered out
+ Option(subdomain).filter(_.nonEmpty)
+ .foreach(_ => clonedConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN,
Option(subdomain)))
+
Option(shareLevel).filter(_.nonEmpty).foreach(clonedConf.set(ENGINE_SHARE_LEVEL,
_))
+
+ val normalizedEngineType = clonedConf.get(ENGINE_TYPE)
+ val engineSubdomain =
clonedConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).getOrElse(subdomainDefault)
+ val engineShareLevel = clonedConf.get(ENGINE_SHARE_LEVEL)
+
+ new Engine(
+ KYUUBI_VERSION,
+ userName,
+ normalizedEngineType,
+ engineShareLevel,
+ engineSubdomain,
+ null) // instance is unknown at this point
+ }
+ /** Builds the discovery path for an engine from "<HA namespace>_<version>_<share level>_<engine type>", the user and the subdomain via DiscoveryPaths.makePath. */
+ private def getEngineSpace(engine: Engine): String = {
+ val serverSpace = fe.getConf.get(HA_NAMESPACE)
+ DiscoveryPaths.makePath(
+
s"${serverSpace}_${engine.getVersion}_${engine.getSharelevel}_${engine.getEngineType}",
+ engine.getUser,
+ Array(engine.getSubdomain))
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index e815ad73b..ceeee3d7f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -40,7 +40,6 @@ import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl,
KyuubiSessionManager, SessionHandle}
@Tag(name = "Batch")
@@ -188,7 +187,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
@GET
@Path("{batchId}")
def batchInfo(@PathParam("batchId") batchId: String): Batch = {
- val userName = fe.getUserName(Map.empty)
+ val userName = fe.getUserName(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
buildBatch(batchSession)
@@ -265,7 +264,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
@PathParam("batchId") batchId: String,
@QueryParam("from") @DefaultValue("-1") from: Int,
@QueryParam("size") size: Int): OperationLog = {
- val userName = fe.getUserName(Map.empty)
+ val userName = fe.getUserName(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
try {
@@ -310,16 +309,7 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String):
CloseBatchResponse = {
val sessionHandle = formatSessionHandle(batchId)
- val sessionConf = Option(hs2ProxyUser).filter(_.nonEmpty).map(proxyUser =>
- Map(KyuubiAuthenticationFactory.HS2_PROXY_USER ->
proxyUser)).getOrElse(Map())
-
- var userName: String = null
- try {
- userName = fe.getUserName(sessionConf)
- } catch {
- case t: Throwable =>
- throw new NotAllowedException(t.getMessage)
- }
+ val userName = fe.getUserName(hs2ProxyUser)
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
if (userName != batchSession.user) {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index 5a2f8579f..f97ff5c1a 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -18,19 +18,36 @@
package org.apache.kyuubi.server.api.v1
import java.util.{Base64, UUID}
-import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.{GenericType, MediaType}
+
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite,
RestFrontendTestHelper, Utils}
+import org.apache.kyuubi.client.api.v1.dto.Engine
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.EngineRef
+import org.apache.kyuubi.engine.{ApplicationState, EngineRef,
KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
-import org.apache.kyuubi.engine.ShareLevel.USER
+import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryPaths
import
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
+
+ private val engineMgr = new KyuubiApplicationManager()
+ // Initializes and starts the application manager that the tests use to kill the engines they launch.
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ engineMgr.initialize(KyuubiConf())
+ engineMgr.start()
+ }
+ // Stops the application manager before the inherited teardown runs.
+ override def afterAll(): Unit = {
+ engineMgr.stop()
+ super.afterAll()
+ }
+
test("refresh Hadoop configuration of the kyuubi server") {
var response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
@@ -49,7 +66,7 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(200 == response.getStatus)
}
- test("delete engine") {
+ test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
@@ -67,8 +84,7 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
- var child = client.getChildren(engineSpace)
- assert(child.size == 1)
+ assert(client.getChildren(engineSpace).size == 1)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
@@ -84,8 +100,167 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(200 == response.getStatus)
assert(client.pathExists(engineSpace))
- child = client.getChildren(engineSpace)
- assert(child.size == 0)
+ assert(client.getChildren(engineSpace).size == 0)
+
+ // kill the engine application
+ engineMgr.killApplication(None, id)
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ }
+ }
+ }
+ // Creates a CONNECTION-level engine under a random subdomain and checks that DELETE api/v1/admin/engine with that subdomain returns 200.
+ test("delete engine - connection share level") {
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
+ conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
+
+ val id = UUID.randomUUID().toString
+ val engine = new EngineRef(conf.clone, Utils.currentUser, id, null)
+ val engineSpace = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
+ Utils.currentUser,
+ Array(id)) // the random id is used as the subdomain segment of the engine space
+
+ withDiscoveryClient(conf) { client =>
+ engine.getOrCreate(client)
+
+ assert(client.pathExists(engineSpace))
+ assert(client.getChildren(engineSpace).size == 1)
+
+ val adminUser = Utils.currentUser
+ val encodeAuthorization = new String(
+ Base64.getEncoder.encode(
+ s"$adminUser:".getBytes()),
+ "UTF-8") // BASIC credentials: current user with an empty password
+ val response = webTarget.path("api/v1/admin/engine")
+ .queryParam("sharelevel", "connection")
+ .queryParam("type", "spark_sql")
+ .queryParam("subdomain", id)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .delete()
+
+ assert(200 == response.getStatus)
+ }
+ }
+ // Launches one USER-level engine and checks that GET api/v1/admin/engine returns exactly one engine with type SPARK_SQL, share level USER and subdomain "default".
+ test("list engine - user share level") {
+ val id = UUID.randomUUID().toString
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
+ conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
+ val engine = new EngineRef(conf.clone, Utils.currentUser, id, null)
+
+ val engineSpace = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
+ Utils.currentUser,
+ Array("")) // empty subdomain: the assertions below count the children of this path
+
+ withDiscoveryClient(conf) { client =>
+ engine.getOrCreate(client)
+
+ assert(client.pathExists(engineSpace))
+ assert(client.getChildren(engineSpace).size == 1)
+
+ val adminUser = Utils.currentUser
+ val encodeAuthorization = new String(
+ Base64.getEncoder.encode(
+ s"$adminUser:".getBytes()),
+ "UTF-8") // BASIC credentials: current user with an empty password
+ val response = webTarget.path("api/v1/admin/engine")
+ .queryParam("type", "spark_sql")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get
+
+ assert(200 == response.getStatus)
+ val engines = response.readEntity(new GenericType[Seq[Engine]]() {})
+ assert(engines.size == 1)
+ assert(engines(0).getEngineType == "SPARK_SQL")
+ assert(engines(0).getSharelevel == "USER")
+ assert(engines(0).getSubdomain == "default")
+
+ // kill the engine application
+ engineMgr.killApplication(None, id)
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ assert(engineMgr.getApplicationInfo(None, id).exists(_.state ==
ApplicationState.NOT_FOUND))
+ }
+ }
+ }
+ // Launches two CONNECTION-level engines and checks that listing without a subdomain returns both, while listing with subdomain=id1 returns one.
+ test("list engine - connection share level") {
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
+ conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L)
+
+ val engineSpace = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
+ Utils.currentUser,
+ Array("")) // empty subdomain: the assertions below count the children of this path
+
+ val id1 = UUID.randomUUID().toString
+ val engine1 = new EngineRef(conf.clone, Utils.currentUser, id1, null)
+ val engineSpace1 = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
+ Utils.currentUser,
+ Array(id1))
+
+ val id2 = UUID.randomUUID().toString
+ val engine2 = new EngineRef(conf.clone, Utils.currentUser, id2, null)
+ val engineSpace2 = DiscoveryPaths.makePath(
+ s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL",
+ Utils.currentUser,
+ Array(id2))
+
+ withDiscoveryClient(conf) { client =>
+ engine1.getOrCreate(client)
+ engine2.getOrCreate(client)
+
+ assert(client.pathExists(engineSpace))
+ assert(client.getChildren(engineSpace).size == 2)
+ assert(client.pathExists(engineSpace1))
+ assert(client.pathExists(engineSpace2))
+
+ val adminUser = Utils.currentUser
+ val encodeAuthorization = new String(
+ Base64.getEncoder.encode(
+ s"$adminUser:".getBytes()),
+ "UTF-8") // BASIC credentials: current user with an empty password
+ val response = webTarget.path("api/v1/admin/engine") // no subdomain parameter: expects both engines
+ .queryParam("type", "spark_sql")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get
+ assert(200 == response.getStatus)
+ val result = response.readEntity(new GenericType[Seq[Engine]]() {})
+ assert(result.size == 2)
+
+ val response1 = webTarget.path("api/v1/admin/engine") // subdomain=id1: expects only the first engine
+ .queryParam("type", "spark_sql")
+ .queryParam("subdomain", id1)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+ .get
+ assert(200 == response1.getStatus)
+ val result1 = response1.readEntity(new GenericType[Seq[Engine]]() {})
+ assert(result1.size == 1)
+
+ // kill the engine application
+ engineMgr.killApplication(None, id1)
+ engineMgr.killApplication(None, id2)
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ assert(engineMgr.getApplicationInfo(None, id1)
+ .exists(_.state == ApplicationState.NOT_FOUND))
+ assert(engineMgr.getApplicationInfo(None, id2)
+ .exists(_.state == ApplicationState.NOT_FOUND))
+ }
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index efc53375d..30ed1e8a1 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -82,7 +82,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
val proxyUserResponse = webTarget.path("api/v1/batches")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(proxyUserRequest, MediaType.APPLICATION_JSON_TYPE))
- assert(500 == proxyUserResponse.getStatus)
+ assert(405 == proxyUserResponse.getStatus)
var getBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
.request(MediaType.APPLICATION_JSON_TYPE)