This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d6290a4de [KYUUBI #5063] Support to filter batch with batch name
d6290a4de is described below
commit d6290a4dede9e43ab0918473bf32d77c14cd0ba3
Author: fwang12 <[email protected]>
AuthorDate: Wed Jul 19 15:44:57 2023 +0800
[KYUUBI #5063] Support to filter batch with batch name
### _Why are the changes needed?_
Support to filter batch with batch name filter condition.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
Closes #5063 from turboFei/batch_name.
Closes #5063
63915a56d [fwang12] ut
2548815a2 [fwang12] ut
34a9229b0 [fwang12] style
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/ctl/cli/ControlCliArguments.scala | 1 +
.../kyuubi/ctl/cmd/list/ListBatchCommand.scala | 1 +
.../org/apache/kyuubi/ctl/opt/CliConfig.scala | 1 +
.../org/apache/kyuubi/ctl/opt/CommandLine.scala | 3 ++
.../kyuubi/ctl/ControlCliArgumentsSuite.scala | 1 +
.../org/apache/kyuubi/client/BatchRestApi.java | 13 +++++++
.../kyuubi/server/api/v1/BatchesResource.scala | 24 +++++++------
.../kyuubi/server/metadata/MetadataManager.scala | 16 +--------
.../server/metadata/api/MetadataFilter.scala | 1 +
.../server/metadata/jdbc/JDBCMetadataStore.scala | 4 +++
.../kyuubi/session/KyuubiSessionManager.scala | 15 ++------
.../server/api/v1/BatchesResourceSuite.scala | 42 ++++++++++++++++------
.../server/metadata/MetadataManagerSuite.scala | 4 +--
.../kyuubi/server/rest/client/BatchCliSuite.scala | 8 ++---
14 files changed, 80 insertions(+), 54 deletions(-)
diff --git
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala
index 35b4ccacf..10bb99296 100644
---
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala
+++
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala
@@ -112,6 +112,7 @@ class ControlCliArguments(args: Seq[String], env:
Map[String, String] = sys.env)
| batchType ${cliConfig.batchOpts.batchType}
| batchUser ${cliConfig.batchOpts.batchUser}
| batchState ${cliConfig.batchOpts.batchState}
+ | batchName ${cliConfig.batchOpts.batchName}
| createTime ${cliConfig.batchOpts.createTime}
| endTime ${cliConfig.batchOpts.endTime}
| from ${cliConfig.batchOpts.from}
diff --git
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
index 4ce1b49b2..db781da38 100644
---
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
+++
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala
@@ -46,6 +46,7 @@ class ListBatchCommand(cliConfig: CliConfig) extends
Command[GetBatchesResponse]
batchOpts.batchType,
batchOpts.batchUser,
batchOpts.batchState,
+ batchOpts.batchName,
batchOpts.createTime,
batchOpts.endTime,
if (batchOpts.from < 0) 0 else batchOpts.from,
diff --git
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala
index 38284c595..7818f694a 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala
@@ -66,6 +66,7 @@ case class BatchOpts(
batchType: String = null,
batchUser: String = null,
batchState: String = null,
+ batchName: String = null,
createTime: Long = 0,
endTime: Long = 0,
from: Int = -1,
diff --git
a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala
b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala
index 478c439a4..271bb06ab 100644
--- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala
+++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala
@@ -222,6 +222,9 @@ object CommandLine extends CommonCommandLine {
opt[String]("batchState")
.action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchState =
v)))
.text("Batch state."),
+ opt[String]("batchName")
+ .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchName =
v)))
+ .text("Batch name."),
opt[String]("createTime")
.action((v, c) =>
c.copy(batchOpts = c.batchOpts.copy(createTime =
diff --git
a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
index 1b973c0eb..bd5b2ac45 100644
---
a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
+++
b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala
@@ -429,6 +429,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with
TestPrematureExit {
| --batchType <value> Batch type.
| --batchUser <value> Batch user.
| --batchState <value> Batch state.
+ | --batchName <value> Batch name.
| --createTime <value> Batch create time, should be in
yyyyMMddHHmmss format.
| --endTime <value> Batch end time, should be in
yyyyMMddHHmmss format.
| --from <value> Specify which record to start from
retrieving info.
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
index f5099568b..afcfe77d3 100644
---
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java
@@ -63,10 +63,23 @@ public class BatchRestApi {
Long endTime,
int from,
int size) {
+ return listBatches(batchType, batchUser, batchState, null, createTime,
endTime, from, size);
+ }
+
+ public GetBatchesResponse listBatches(
+ String batchType,
+ String batchUser,
+ String batchState,
+ String batchName,
+ Long createTime,
+ Long endTime,
+ int from,
+ int size) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
+ params.put("batchName", batchName);
if (null != createTime && createTime > 0) {
params.put("createTime", createTime);
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index ba043f071..e5ac23905 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -45,8 +45,8 @@ import org.apache.kyuubi.operation.{BatchJobSubmission,
FetchOrientation, Operat
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
-import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager,
SessionHandle}
+import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager,
SessionHandle, SessionType}
import org.apache.kyuubi.util.JdbcUtils
@Tag(name = "Batch")
@@ -315,6 +315,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
@QueryParam("batchType") batchType: String,
@QueryParam("batchState") batchState: String,
@QueryParam("batchUser") batchUser: String,
+ @QueryParam("batchName") batchName: String,
@QueryParam("createTime") createTime: Long,
@QueryParam("endTime") endTime: Long,
@QueryParam("from") from: Int,
@@ -327,15 +328,16 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
validBatchState(batchState),
s"The valid batch state can be one of the following:
${VALID_BATCH_STATES.mkString(",")}")
}
- val batches =
- sessionManager.getBatchesFromMetadataStore(
- batchType,
- batchUser,
- batchState,
- createTime,
- endTime,
- from,
- size)
+
+ val filter = MetadataFilter(
+ sessionType = SessionType.BATCH,
+ engineType = batchType,
+ username = batchUser,
+ state = batchState,
+ requestName = batchName,
+ createTime = createTime,
+ endTime = endTime)
+ val batches = sessionManager.getBatchesFromMetadataStore(filter, from,
size)
new GetBatchesResponse(from, batches.size, batches.asJava)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 17beeb62a..e2648076d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -133,21 +133,7 @@ class MetadataManager extends
AbstractService("MetadataManager") {
.filter(_.sessionType == SessionType.BATCH)
}
- def getBatches(
- batchType: String,
- batchUser: String,
- batchState: String,
- createTime: Long,
- endTime: Long,
- from: Int,
- size: Int): Seq[Batch] = {
- val filter = MetadataFilter(
- sessionType = SessionType.BATCH,
- engineType = batchType,
- username = batchUser,
- state = batchState,
- createTime = createTime,
- endTime = endTime)
+ def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from,
size, true)).map(
buildBatch)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala
index 6213f8e64..d4f7f2b63 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala
@@ -27,6 +27,7 @@ case class MetadataFilter(
engineType: String = null,
username: String = null,
state: String = null,
+ requestName: String = null,
kyuubiInstance: String = null,
createTime: Long = 0L,
endTime: Long = 0L,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index 3f9463bf7..9f0bd6843 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -238,6 +238,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends
MetadataStore with Logging {
whereConditions += "state = ?"
params += state.toUpperCase(Locale.ROOT)
}
+ Option(filter.requestName).filter(_.nonEmpty).foreach { requestName =>
+ whereConditions += "request_name = ?"
+ params += requestName
+ }
Option(filter.kyuubiInstance).filter(_.nonEmpty).foreach { kyuubiInstance
=>
whereConditions += "kyuubi_instance = ?"
params += kyuubiInstance
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index d2547bca9..8d63dfbf7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,7 +36,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader,
SessionConfAdvisor}
import org.apache.kyuubi.server.metadata.{MetadataManager,
MetadataRequestsRetryRef}
-import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
@@ -240,17 +240,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
metadataManager.flatMap(mm => mm.getBatch(batchId))
}
- def getBatchesFromMetadataStore(
- batchType: String,
- batchUser: String,
- batchState: String,
- createTime: Long,
- endTime: Long,
- from: Int,
- size: Int): Seq[Batch] = {
- metadataManager.map { mm =>
- mm.getBatches(batchType, batchUser, batchState, createTime, endTime,
from, size)
- }.getOrElse(Seq.empty)
+ def getBatchesFromMetadataStore(filter: MetadataFilter, from: Int, size:
Int): Seq[Batch] = {
+ metadataManager.map(_.getBatches(filter, from, size)).getOrElse(Seq.empty)
}
def getBatchMetadata(batchId: String): Option[Metadata] = {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 8a797f842..b6bc1af52 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -44,7 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission,
OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.KyuubiRestFrontendService
import
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
-import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager,
SessionHandle, SessionType}
@@ -62,10 +62,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
sessionManager.allSessions().foreach { session =>
sessionManager.closeSession(session.handle)
}
- sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0,
Int.MaxValue).foreach {
- batch =>
-
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
- sessionManager.cleanupMetadata(batch.getId)
+ sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0,
Int.MaxValue).foreach { batch =>
+
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
+ sessionManager.cleanupMetadata(batch.getId)
}
}
@@ -518,11 +517,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
}
assert(sessionManager.getBatchesFromMetadataStore(
- "SPARK",
- null,
- null,
- 0,
- 0,
+ MetadataFilter(engineType = "SPARK"),
0,
Int.MaxValue).size == 2)
}
@@ -711,4 +706,31 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
.replaceAll("\\[", "").replaceAll("\\]", "")
assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED"))
}
+
+ test("get batch list with batch name filter condition") {
+ val sessionManager = server.frontendServices.head
+ .be.sessionManager.asInstanceOf[KyuubiSessionManager]
+ sessionManager.allSessions().foreach(_.close())
+
+ val uniqueName = UUID.randomUUID().toString
+ sessionManager.openBatchSession(
+ "kyuubi",
+ "kyuubi",
+ InetAddress.getLocalHost.getCanonicalHostName,
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
+ newBatchRequest(
+ "spark",
+ sparkBatchTestResource.get,
+ "",
+ uniqueName))
+
+ val response = webTarget.path("api/v1/batches")
+ .queryParam("batchName", uniqueName)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .get()
+
+ assert(response.getStatus == 200)
+ val getBatchListResponse = response.readEntity(classOf[GetBatchesResponse])
+ assert(getBatchListResponse.getTotal == 1)
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index 8064b7f1f..aca416dac 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
-import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.session.SessionType
class MetadataManagerSuite extends KyuubiFunSuite {
@@ -157,7 +157,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
metadataManager.start()
f(metadataManager)
} finally {
- metadataManager.getBatches(null, null, null, 0, 0, 0,
Int.MaxValue).foreach { batch =>
+ metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach {
batch =>
metadataManager.cleanupMetadataById(batch.getId)
}
// ensure no metadata request leak
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 7cf939910..0c44fc3a8 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -34,6 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
+import org.apache.kyuubi.server.metadata.api.MetadataFilter
import org.apache.kyuubi.session.KyuubiSessionManager
class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with
BatchTestHelper {
@@ -101,10 +102,9 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
sessionManager.allSessions().foreach { session =>
sessionManager.closeSession(session.handle)
}
- sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0,
Int.MaxValue).foreach {
- batch =>
-
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
- sessionManager.cleanupMetadata(batch.getId)
+ sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0,
Int.MaxValue).foreach { batch =>
+
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None),
batch.getId)
+ sessionManager.cleanupMetadata(batch.getId)
}
}