This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 15911573b [KYUUBI #6278] Update DB state when the query fallback to
resource manager and the batch app is terminal (#6284)
15911573b is described below
commit 15911573ba1efc6009747d3ef0d37e385649913a
Author: vinoyang <[email protected]>
AuthorDate: Mon Apr 15 20:36:15 2024 +0800
[KYUUBI #6278] Update DB state when the query fallback to resource manager
and the batch app is terminal (#6284)
* [KYUUBI #6278] Update DB state when the query fallback to yarn and the
batch app is terminal
* update all engine info
---------
Co-authored-by: Wang, Fei <[email protected]>
---
.../kyuubi/server/api/v1/BatchesResource.scala | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4e3f8d20b..3c2f02c2f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -31,6 +31,7 @@ import scala.util.control.NonFatal
import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
+import org.apache.commons.lang3.StringUtils
import org.glassfish.jersey.media.multipart.{FormDataContentDisposition,
FormDataParam}
import org.apache.kyuubi.{Logging, Utils}
@@ -39,7 +40,7 @@ import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
-import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
KillResponse, KyuubiApplicationManager}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
ApplicationState, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation,
OperationState}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
@@ -315,8 +316,14 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
buildBatch(batchSession)
}.getOrElse {
sessionManager.getBatchMetadata(batchId).map { metadata =>
+ val isOperationTerminated = (StringUtils.isNotBlank(metadata.state)
+ &&
OperationState.isTerminal(OperationState.withName(metadata.state)))
+ val isApplicationTerminated =
(StringUtils.isNotBlank(metadata.engineState)
+ &&
ApplicationState.isTerminated(ApplicationState.withName(metadata.engineState)))
+
if (batchV2Enabled(metadata.requestConf) ||
- OperationState.isTerminal(OperationState.withName(metadata.state)) ||
+ isOperationTerminated ||
+ isApplicationTerminated ||
metadata.kyuubiInstance == fe.connectionUrl) {
MetadataManager.buildBatch(metadata)
} else {
@@ -332,6 +339,17 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
Some(userName),
// prevent that the batch be marked as terminated if
application state is NOT_FOUND
Some(metadata.engineOpenTime).filter(_ >
0).orElse(Some(System.currentTimeMillis)))
+ // if the batch app is terminated, update the metadata in db.
+ if (BatchJobSubmission.applicationTerminated(batchAppStatus)) {
+ val appInfo = batchAppStatus.get
+ sessionManager.updateMetadata(Metadata(
+ identifier = batchId,
+ engineId = appInfo.id,
+ engineName = appInfo.name,
+ engineUrl = appInfo.url.orNull,
+ engineState = appInfo.state.toString,
+ engineError = appInfo.error))
+ }
buildBatch(metadata, batchAppStatus)
}
}