This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 9e24da7e8 [KYUUBI #5220][FOLLOWUP] Batch submitted considers
application state
9e24da7e8 is described below
commit 9e24da7e8929a39283fee58605cb269cfbd45294
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Sep 14 12:13:20 2023 +0800
[KYUUBI #5220][FOLLOWUP] Batch submitted considers application state
### _Why are the changes needed?_
This PR aims to fix the `SparkSubmit` concurrency limit implemented in
#5220.
The "submitted" check in #5220 only considered `OperationState`; however,
`ApplicationState` should be taken into account too. For instance, if a batch
is pending in the `ACCEPTED` state, the `SparkSubmit` process won't exit until
the application changes to the `RUNNING` or `FAILED` state. In that case, the
`OperationState` is `RUNNING` while the `ApplicationState` is `PENDING`, so the
batch should not be treated as "submitted".
Additionally, this PR treats the metastore, instead of `SessionManager`, as the
single source of truth for batch state.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5279 from pan3793/5220-followup.
Closes #5220
903abc6d4 [Cheng Pan] Fix
0af6738d2 [Cheng Pan] [KYUUBI #5220][FOLLOWUP] Batch submmited should not
contain application pending state
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit dd2cd516d72631b464bcec2d4bcafc29d61d3880)
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/server/KyuubiBatchService.scala | 37 ++++++++++++----------
.../kyuubi/server/metadata/api/Metadata.scala | 12 ++++++-
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index bf10a68fa..2bfbbce2a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.server
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
+import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.service.{AbstractService, Serverable}
@@ -83,26 +84,28 @@ class KyuubiBatchService(
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
- val sessionHandle = sessionManager.openBatchSession(batchSession)
+ sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
- submitted = sessionManager.getBatchSession(sessionHandle).map {
batchSession =>
- val batchState =
batchSession.batchJobSubmissionOp.getStatus.state
- batchState == OperationState.RUNNING ||
OperationState.isTerminal(batchState)
- }.getOrElse {
- error(s"Batch Session $batchId is not existed, marked as
finished")
- true
+ submitted = metadataManager.getBatchSessionMetadata(batchId)
match {
+ case Some(metadata) if
OperationState.isTerminal(metadata.opState) =>
+ true
+ case Some(metadata) if metadata.opState ==
OperationState.RUNNING =>
+ metadata.appState match {
+ // app that is not submitted to resource manager
+ case None | Some(ApplicationState.NOT_FOUND) => false
+ // app that is pending in resource manager
+ case Some(ApplicationState.PENDING) => false
+ // not sure, added for safe
+ case Some(ApplicationState.UNKNOWN) => false
+ case _ => true
+ }
+ case Some(_) =>
+ false
+ case None =>
+ error(s"$batchId does not existed in metastore, assume it is
finished")
+ true
}
- // should we always treat metastore as the single of truth?
- //
- // submitted = metadataManager.getBatchSessionMetadata(batchId)
match {
- // case Some(metadata) =>
- // val batchState = OperationState.withName(metadata.state)
- // batchState == OperationState.RUNNING ||
OperationState.isTerminal(batchState)
- // case None =>
- // error(s"$batchId does not existed in metastore, assume it
is finished")
- // true
- // }
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
index 12759f8cc..3e3d94828 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala
@@ -18,7 +18,10 @@
package org.apache.kyuubi.server.metadata.api
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationManagerInfo
+import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.session.SessionType.SessionType
/**
@@ -82,4 +85,11 @@ case class Metadata(
requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
}
+
+ def opState: OperationState = {
+ assert(state != null, "invalid state, a normal batch record must have
non-null state")
+ OperationState.withName(state)
+ }
+
+ def appState: Option[ApplicationState] =
Option(engineState).map(ApplicationState.withName)
}