This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 13f11c7ab [KYUUBI #4981] Refactor code of closeBatchSession
13f11c7ab is described below
commit 13f11c7ab4d99894eafc6dfa9fba7228016632db
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jun 20 16:36:27 2023 +0800
[KYUUBI #4981] Refactor code of closeBatchSession
### _Why are the changes needed?_
This PR aims to refactor the method `closeBatchSession`, by
- extracting sub-methods `checkPermission` and `forceKill`
- explicitly handling redirection when `metadata.kyuubiInstance !=
fe.connectionUrl` to make the logic clearer
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #4981 from pan3793/cancel.
Closes #4981
0d2e85acc [Cheng Pan] Refactor code of closeBatchSession
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/server/api/v1/BatchesResource.scala | 59 +++++++++++-----------
1 file changed, 30 insertions(+), 29 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 8271c8b88..cb0c63be0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationInfo, KillResponse,
KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation,
OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -398,29 +398,37 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
def closeBatchSession(
@PathParam("batchId") batchId: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String):
CloseBatchResponse = {
- val sessionHandle = formatSessionHandle(batchId)
-
- val userName = fe.getSessionUser(hs2ProxyUser)
- sessionManager.getBatchSession(sessionHandle).map { batchSession =>
- if (userName != batchSession.user) {
+ def checkPermission(operator: String, owner: String): Unit = {
+ if (operator != owner) {
throw new WebApplicationException(
- s"$userName is not allowed to close the session belong to
${batchSession.user}",
+ s"$operator is not allowed to close the session belong to $owner",
Status.METHOD_NOT_ALLOWED)
}
+ }
+
+ def forceKill(clusterManager: Option[String], batchId: String):
KillResponse = {
+ val (killed, message) = sessionManager.applicationManager
+ .killApplication(clusterManager, batchId)
+ info(s"Mark batch[$batchId] closed by ${fe.connectionUrl}")
+ sessionManager.updateMetadata(Metadata(identifier = batchId,
peerInstanceClosed = true))
+ (killed, message)
+ }
+
+ val sessionHandle = formatSessionHandle(batchId)
+ val userName = fe.getSessionUser(hs2ProxyUser)
+
+ sessionManager.getBatchSession(sessionHandle).map { batchSession =>
+ checkPermission(userName, batchSession.user)
sessionManager.closeSession(batchSession.handle)
- val (success, msg) = batchSession.batchJobSubmissionOp.getKillMessage
- new CloseBatchResponse(success, msg)
+ val (killed, msg) = batchSession.batchJobSubmissionOp.getKillMessage
+ new CloseBatchResponse(killed, msg)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
- if (userName != metadata.username) {
- throw new WebApplicationException(
- s"$userName is not allowed to close the session belong to
${metadata.username}",
- Status.METHOD_NOT_ALLOWED)
- } else if
(OperationState.isTerminal(OperationState.withName(metadata.state)) ||
- metadata.kyuubiInstance == fe.connectionUrl) {
+ checkPermission(userName, metadata.username)
+ if
(OperationState.isTerminal(OperationState.withName(metadata.state))) {
new CloseBatchResponse(false, s"The batch[$metadata] has been
terminated.")
- } else {
+ } else if (metadata.kyuubiInstance != fe.connectionUrl) {
info(s"Redirecting delete batch[$batchId] to
${metadata.kyuubiInstance}")
val internalRestClient =
getInternalRestClient(metadata.kyuubiInstance)
try {
@@ -428,20 +436,13 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
} catch {
case e: KyuubiRestException =>
error(s"Error redirecting delete batch[$batchId] to
${metadata.kyuubiInstance}", e)
- val appMgrKillResp =
sessionManager.applicationManager.killApplication(
- metadata.clusterManager,
- batchId)
- info(
- s"Marking batch[$batchId/${metadata.kyuubiInstance}] closed by
${fe.connectionUrl}")
- sessionManager.updateMetadata(Metadata(
- identifier = batchId,
- peerInstanceClosed = true))
- if (appMgrKillResp._1) {
- new CloseBatchResponse(appMgrKillResp._1, appMgrKillResp._2)
- } else {
- new CloseBatchResponse(false, Utils.stringifyException(e))
- }
+ val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+ new CloseBatchResponse(killed, if (killed) msg else
Utils.stringifyException(e))
}
+ } else { // should not happen, but handle this for safe
+ warn(s"Something wrong on deleting batch[$batchId], try forcibly
killing application")
+ val (killed, msg) = forceKill(metadata.clusterManager, batchId)
+ new CloseBatchResponse(killed, msg)
}
}.getOrElse {
error(s"Invalid batchId: $batchId")