This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 6b12e31b6 [KYUUBI #6028] Exited spark-submit process should not block
batch submit queue
6b12e31b6 is described below
commit 6b12e31b68640b20951b0336cd190ef15f3b34e5
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jan 30 23:38:52 2024 +0800
[KYUUBI #6028] Exited spark-submit process should not block batch submit
queue
While enabling batch implementation V2 with the following configurations
```
kyuubi.batch.impl.version=2
kyuubi.batch.submitter.enabled=true
kyuubi.batch.submitter.threads=48
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.submit.waitAppCompletion=false
```
I found that batch jobs get blocked in the DB queue once a YARN
queue has no resources. This causes a problem: subsequent batch jobs that
are going to be submitted to a different YARN queue are also held in the DB
queue, rather than being queued in YARN.
```
mysql> select state, engine_state, count(1) from metadata where state in
('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
+-------------+--------------+----------+
| state | engine_state | count(1) |
+-------------+--------------+----------+
| INITIALIZED | NULL | 166 |
| PENDING | NULL | 1 |
| RUNNING | PENDING | 148 |
| RUNNING | RUNNING | 415 |
+-------------+--------------+----------+
```
The submitter queue, whose size is controlled by
`kyuubi.batch.submitter.threads`, is designed to address the `spark-submit`
process concurrency issue: too many concurrent `spark-submit` processes may exhaust
the CPU/memory resources of the Kyuubi server node and eventually crash the service. In
Spark YARN cluster mode with `spark.yarn.submit.waitAppCompletion=false`,
the local `spark-submit` process exits immediately once the application reaches
ACCEPTED status, even if no resources could be allocated [...]
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
Passed GA and rolled out to an internal cluster.
---
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6028 from pan3793/batch-submit.
Closes #6028
05fcc758f [Cheng Pan] Exited spark-submit process should not block batch
submit queue
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 208354c327104be0549503f760ce61d376396025)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala | 8 +++-----
.../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala | 2 +-
.../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala | 3 +++
.../main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++++++--
.../scala/org/apache/kyuubi/session/KyuubiBatchSession.scala | 2 ++
7 files changed, 18 insertions(+), 10 deletions(-)
diff --git
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index ea804575e..79852a42b 100644
---
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", conf)
- val e = intercept[KyuubiException](builder.validateConf)
+ val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in
spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform
https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length
<= 237")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e15645133..952ed1ef5 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -215,7 +215,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
- builder.validateConf
+ builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 23196bf1d..64276493a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
-import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
@@ -164,9 +163,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
- private var process: Process = _
- @VisibleForTesting
- @volatile private[kyuubi] var processLaunched: Boolean = _
+ @volatile private[kyuubi] var process: Process = _
+ @volatile private[kyuubi] var processLaunched: Boolean = false
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -204,7 +202,7 @@ trait ProcBuilder {
file
}
- def validateConf: Unit = {}
+ def validateConf(): Unit = {}
final def start: Process = synchronized {
process = processBuilder.start()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 972284f5c..f991d9f2b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -295,7 +295,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
- override def validateConf: Unit = Validator.validateConf(conf)
+ override def validateConf(): Unit = Validator.validateConf(conf)
// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current
user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit
= {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3bb17e1aa..b8795ef61 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}
+ def startupProcessAlive: Boolean =
+ builder.processLaunched && Option(builder.process).exists(_.isAlive)
+
override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) &&
_applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 2bfbbce2a..e2736267f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
- // app that is pending in resource manager
- case Some(ApplicationState.PENDING) => false
+ // app that is pending in resource manager while the local
startup
+ // process is alive. For example, in Spark YARN cluster
mode, if set
+ // spark.yarn.submit.waitAppCompletion=false, the local
spark-submit
+ // process exits immediately once Application goes
ACCEPTED status,
+ // even no resource could be allocated for the AM
container.
+ case Some(ApplicationState.PENDING) if
batchSession.startupProcessAlive =>
+ false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8489e6d30..6a9f81045 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -112,6 +112,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)
+ def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
+
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {