This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f6331a2a0 [KYUUBI #3653][REST] AdminResource add list kyuubi server api
f6331a2a0 is described below
commit f6331a2a0fc0650ba0970d58f90a7a7c8e908095
Author: zwangsheng <[email protected]>
AuthorDate: Tue Apr 18 13:44:23 2023 +0800
[KYUUBI #3653][REST] AdminResource add list kyuubi server api
### _Why are the changes needed?_
Add List Kyuubi Server Api for `AdminResource`
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4670 from zwangsheng/KYUUBI_3653.
Closes #3653
b91a6c617 [zwangsheng] fxi
4271d0fd0 [zwangsheng] fix comments
e14f8cd55 [zwangsheng] [KYUUBI #3653][REST] AdminResource add list server
api
Authored-by: zwangsheng <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/client/api/v1/dto/ServerData.java | 129 +++++++++++++++++++++
.../org/apache/kyuubi/server/api/ApiUtils.scala | 14 ++-
.../kyuubi/server/api/v1/AdminResource.scala | 31 ++++-
.../kyuubi/server/api/v1/AdminResourceSuite.scala | 104 ++++++++---------
4 files changed, 219 insertions(+), 59 deletions(-)
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java
new file mode 100644
index 000000000..d64d43a72
--- /dev/null
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes a single Kyuubi server instance as exposed by the admin REST API.
+ *
+ * <p>Instances are mutable. Every setter returns {@code this} so that calls can be chained.
+ * {@link #equals(Object)} and {@link #hashCode()} are both computed from all seven fields, so
+ * they always agree with each other.
+ */
+public class ServerData {
+  private String nodeName;
+  private String namespace;
+  private String instance;
+  private String host;
+  private int port;
+  private Map<String, String> attributes;
+  private String status;
+
+  /**
+   * Creates a fully populated server description.
+   *
+   * @param nodeName name of the service node
+   * @param namespace namespace the node belongs to
+   * @param instance instance identifier of the server
+   * @param host host of the server
+   * @param port port of the server
+   * @param attributes additional key/value attributes of the node; stored as given (not copied)
+   * @param status status of the server
+   */
+  public ServerData(
+      String nodeName,
+      String namespace,
+      String instance,
+      String host,
+      int port,
+      Map<String, String> attributes,
+      String status) {
+    this.nodeName = nodeName;
+    this.namespace = namespace;
+    this.instance = instance;
+    this.host = host;
+    this.port = port;
+    this.attributes = attributes;
+    this.status = status;
+  }
+
+  public String getNodeName() {
+    return nodeName;
+  }
+
+  public ServerData setNodeName(String nodeName) {
+    this.nodeName = nodeName;
+    return this;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+
+  public ServerData setNamespace(String namespace) {
+    this.namespace = namespace;
+    return this;
+  }
+
+  public String getInstance() {
+    return instance;
+  }
+
+  public ServerData setInstance(String instance) {
+    this.instance = instance;
+    return this;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public ServerData setHost(String host) {
+    this.host = host;
+    return this;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public ServerData setPort(int port) {
+    this.port = port;
+    return this;
+  }
+
+  public Map<String, String> getAttributes() {
+    return attributes;
+  }
+
+  public ServerData setAttributes(Map<String, String> attributes) {
+    this.attributes = attributes;
+    return this;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public ServerData setStatus(String status) {
+    this.status = status;
+    return this;
+  }
+
+  /**
+   * Returns a hash code derived from every field.
+   *
+   * <p>The field set is deliberately identical to the one compared in {@link #equals(Object)};
+   * hashing a field that {@code equals} ignores (or the reverse) would let equal objects produce
+   * different hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(nodeName, namespace, instance, host, port, attributes, status);
+  }
+
+  /**
+   * Compares this object with another for field-by-field equality.
+   *
+   * @param obj the object to compare with
+   * @return {@code true} if {@code obj} is a {@code ServerData} of the same class whose fields
+   *     are all equal to this object's fields
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null || getClass() != obj.getClass()) return false;
+    ServerData server = (ServerData) obj;
+    return port == server.port
+        && Objects.equals(nodeName, server.nodeName)
+        && Objects.equals(namespace, server.namespace)
+        && Objects.equals(instance, server.instance)
+        && Objects.equals(host, server.host)
+        && Objects.equals(attributes, server.attributes)
+        && Objects.equals(status, server.status);
+  }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
index ebbf04c90..1f2cb309b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
@@ -20,8 +20,9 @@ package org.apache.kyuubi.server.api
import scala.collection.JavaConverters._
import org.apache.kyuubi.Utils
-import org.apache.kyuubi.client.api.v1.dto.{OperationData, SessionData}
+import org.apache.kyuubi.client.api.v1.dto.{OperationData, ServerData,
SessionData}
import org.apache.kyuubi.events.KyuubiOperationEvent
+import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.session.KyuubiSession
@@ -58,4 +59,15 @@ object ApiUtils {
opEvent.sessionType,
operation.getSession.asInstanceOf[KyuubiSession].connectionUrl)
}
+
+  /**
+   * Builds the REST `ServerData` DTO for a discovered service node.
+   *
+   * The node's Scala attribute map is exposed as a Java map, and every node is
+   * reported with the fixed status "Running".
+   */
+  def serverData(nodeInfo: ServiceNodeInfo): ServerData = {
+    val javaAttributes = nodeInfo.attributes.asJava
+    val status = "Running"
+    new ServerData(
+      nodeInfo.nodeName,
+      nodeInfo.namespace,
+      nodeInfo.instance,
+      nodeInfo.host,
+      nodeInfo.port,
+      javaAttributes,
+      status)
+  }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 0d8b31b2c..113660a41 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
-import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, SessionData}
+import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData,
SessionData}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
@@ -296,6 +296,35 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
node.attributes.asJava))
}
+  /**
+   * Lists the Kyuubi servers currently registered under the configured HA namespace.
+   *
+   * Only administrators may call this endpoint; any other user is rejected with a
+   * `NotAllowedException`.
+   *
+   * @return one `ServerData` per service node found by the discovery client
+   */
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(
+      new Content(
+        mediaType = MediaType.APPLICATION_JSON,
+        array = new ArraySchema(schema = new Schema(implementation = classOf[ServerData])))),
+    description = "list all live kyuubi servers")
+  @GET
+  @Path("server")
+  def listServers(): Seq[ServerData] = {
+    val userName = fe.getSessionUser(Map.empty[String, String])
+    val ipAddress = fe.getIpAddress
+    info(s"Received list all live kyuubi servers request from $userName/$ipAddress")
+    if (!isAdministrator(userName)) {
+      throw new NotAllowedException(
+        s"$userName is not allowed to list all live kyuubi servers")
+    }
+    val kyuubiConf = fe.getConf
+    val servers = ListBuffer[ServerData]()
+    val serverSpec = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE))
+    withDiscoveryClient(kyuubiConf) { discoveryClient =>
+      // foreach, not map: the traversal is only used to fill `servers`
+      discoveryClient.getServiceNodesInfo(serverSpec).foreach { nodeInfo =>
+        servers += ApiUtils.serverData(nodeInfo)
+      }
+    }
+    servers
+  }
+
private def getEngine(
userName: String,
engineType: String,
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index a10994d7e..b7650627e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.server.api.v1
+import java.nio.charset.StandardCharsets
import java.util.{Base64, UUID}
import javax.ws.rs.client.Entity
import javax.ws.rs.core.{GenericType, MediaType}
@@ -24,19 +25,22 @@ import javax.ws.rs.core.{GenericType, MediaType}
import scala.collection.JavaConverters._
import
org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
+import org.mockito.Mockito.lenient
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+import org.scalatestplus.mockito.MockitoSugar.mock
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite,
RestFrontendTestHelper, Utils}
-import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData,
SessionData, SessionHandle, SessionOpenRequest}
+import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData,
SessionData, SessionHandle, SessionOpenRequest}
import org.apache.kyuubi.config.KyuubiConf
import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationState, EngineRef,
KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceDiscovery}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
-import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.plugin.PluginLoader
+import org.apache.kyuubi.server.KyuubiRestFrontendService
import
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@@ -46,6 +50,13 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
override protected lazy val conf: KyuubiConf = KyuubiConf()
.set(KyuubiConf.SERVER_ADMINISTRATORS, Seq("admin001"))
+  // Base64 of "<current user>:" (user name with an empty password); sent as the
+  // BASIC authorization credential by the admin requests in this suite.
+  private val encodeAuthorization: String = {
+    // Encode with an explicit charset so the bytes do not depend on the platform default.
+    val credential = s"${Utils.currentUser}:".getBytes(StandardCharsets.UTF_8)
+    new String(Base64.getEncoder.encode(credential), StandardCharsets.UTF_8)
+  }
+
override def beforeAll(): Unit = {
super.beforeAll()
engineMgr.initialize(KyuubiConf())
@@ -63,11 +74,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@@ -76,7 +82,7 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
val admin001AuthHeader = new String(
Base64.getEncoder.encode("admin001".getBytes()),
- "UTF-8")
+ StandardCharsets.UTF_8)
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $admin001AuthHeader")
@@ -85,7 +91,7 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
val admin002AuthHeader = new String(
Base64.getEncoder.encode("admin002".getBytes()),
- "UTF-8")
+ StandardCharsets.UTF_8)
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $admin002AuthHeader")
@@ -99,11 +105,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
response = webTarget.path("api/v1/admin/refresh/user_defaults_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@@ -117,11 +118,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@@ -136,12 +132,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
-
// get session list
var response2 = webTarget.path("api/v1/admin/sessions").request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@@ -196,12 +186,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
"localhost",
Map("testConfig" -> "testValue"))
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
-
// list sessions
var response = webTarget.path("api/v1/admin/sessions")
.queryParam("users", "admin")
@@ -249,12 +233,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
Map("testConfig" -> "testValue"))
val operation = fe.be.getCatalogs(sessionHandle)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
-
// list operations
var response = webTarget.path("api/v1/admin/operations").request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@@ -301,11 +279,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
@@ -349,11 +322,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "connection")
.queryParam("type", "spark_sql")
@@ -389,11 +357,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
@@ -453,11 +416,6 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(client.pathExists(engineSpace1))
assert(client.pathExists(engineSpace2))
- val adminUser = Utils.currentUser
- val encodeAuthorization = new String(
- Base64.getEncoder.encode(
- s"$adminUser:".getBytes()),
- "UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
@@ -488,4 +446,36 @@ class AdminResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
}
}
}
+
+  test("list server") {
+    // Register a mocked ServiceDiscovery, backed by the test frontend, under the HA namespace
+    // so that the admin endpoint has a server node to report.
+    val discovery = mock[ServiceDiscovery]
+    lenient.when(discovery.fe).thenReturn(fe)
+    val haNamespace = conf.get(HighAvailabilityConf.HA_NAMESPACE)
+    withDiscoveryClient(conf) { discoveryClient =>
+      discoveryClient.registerService(conf, haNamespace, discovery)
+
+      val response = webTarget.path("api/v1/admin/server")
+        .request()
+        .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
+        .get
+      assert(200 == response.getStatus)
+
+      val servers = response.readEntity(new GenericType[Seq[ServerData]]() {})
+      assert(servers.size == 1)
+      val server = servers.head
+      val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
+
+      // The first "/" of the reported namespace is stripped before comparing with the conf value.
+      val reportedNamespace = server.getNamespace.replaceFirst("/", "")
+      assert(haNamespace.equals(reportedNamespace))
+      assert(restFe.host.equals(server.getHost))
+      assert(restFe.connectionUrl.equals(server.getInstance()))
+
+      assert(!server.getAttributes.isEmpty)
+      val attrs = server.getAttributes
+      assert(attrs.containsKey("serviceUri"))
+      assert(attrs.get("serviceUri").equals(fe.connectionUrl))
+      assert(attrs.containsKey("version"))
+      assert(attrs.containsKey("sequence"))
+      assert("Running".equals(server.getStatus))
+    }
+  }
}