This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 208354c32 [KYUUBI #6028] Exited spark-submit process should not block
batch submit queue
208354c32 is described below
commit 208354c327104be0549503f760ce61d376396025
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jan 30 23:38:52 2024 +0800
[KYUUBI #6028] Exited spark-submit process should not block batch submit
queue
# :mag: Description
## Issue References 🔖
While enabling batch implementation V2 with the following configurations
```
kyuubi.batch.impl.version=2
kyuubi.batch.submitter.enabled=true
kyuubi.batch.submitter.threads=48
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.submit.waitAppCompletion=false
```
I found that batch jobs are blocked in the DB queue once a YARN queue has no
resources. This causes a problem: subsequent batch jobs that are going to be
submitted to another YARN queue are also queued in the DB, rather than in the
YARN queue.
```
mysql> select state, engine_state, count(1) from metadata where state in
('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
+-------------+--------------+----------+
| state | engine_state | count(1) |
+-------------+--------------+----------+
| INITIALIZED | NULL | 166 |
| PENDING | NULL | 1 |
| RUNNING | PENDING | 148 |
| RUNNING | RUNNING | 415 |
+-------------+--------------+----------+
```
## Describe Your Solution 🔧
The submitter queue, whose size is controlled by
`kyuubi.batch.submitter.threads`, is designed to address the `spark-submit`
process concurrency issue: too many `spark-submit` processes may exhaust the
Kyuubi server node's CPU/memory resources and eventually crash the service. For
Spark YARN cluster mode, if `spark.yarn.submit.waitAppCompletion=false` is set,
the local `spark-submit` process exits immediately once the Application enters
the ACCEPTED status, even if no resources could be allocated [...]
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
Pass GA, and roll out into internal cluster.
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6028 from pan3793/batch-submit.
Closes #6028
05fcc758f [Cheng Pan] Exited spark-submit process should not block batch
submit queue
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 8 +++-----
.../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 2 +-
.../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala | 3 +++
.../main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++++++--
.../scala/org/apache/kyuubi/session/KyuubiBatchSession.scala | 2 ++
7 files changed, 18 insertions(+), 10 deletions(-)
diff --git
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 93153ad36..a32a45d6c 100644
---
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", true, conf)
- val e = intercept[KyuubiException](builder.validateConf)
+ val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in
spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform
https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length
<= 237")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 0d1b0adc7..0bb13b049 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
- builder.validateConf
+ builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 8a8f59ffe..c8c3f9c39 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
-import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
@@ -166,9 +165,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
- private var process: Process = _
- @VisibleForTesting
- @volatile private[kyuubi] var processLaunched: Boolean = _
+ @volatile private[kyuubi] var process: Process = _
+ @volatile private[kyuubi] var processLaunched: Boolean = false
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -206,7 +204,7 @@ trait ProcBuilder {
file
}
- def validateConf: Unit = {}
+ def validateConf(): Unit = {}
final def start: Process = synchronized {
process = processBuilder.start()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 10e1e6ce9..4c06d7951 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -302,7 +302,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
- override def validateConf: Unit = Validator.validateConf(conf)
+ override def validateConf(): Unit = Validator.validateConf(conf)
// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current
user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit
= {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 276fe3446..c29065f19 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}
+ def startupProcessAlive: Boolean =
+ builder.processLaunched && Option(builder.process).exists(_.isAlive)
+
override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) &&
_applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 2bfbbce2a..e2736267f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
- // app that is pending in resource manager
- case Some(ApplicationState.PENDING) => false
+ // app that is pending in resource manager while the local
startup
+ // process is alive. For example, in Spark YARN cluster
mode, if set
+ // spark.yarn.submit.waitAppCompletion=false, the local
spark-submit
+ // process exits immediately once Application goes
ACCEPTED status,
+ // even no resource could be allocated for the AM
container.
+ case Some(ApplicationState.PENDING) if
batchSession.startupProcessAlive =>
+ false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 531bbc3af..4ac84c1d0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)
+ def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
+
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {