This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 208354c32 [KYUUBI #6028] Exited spark-submit process should not block 
batch submit queue
208354c32 is described below

commit 208354c327104be0549503f760ce61d376396025
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Jan 30 23:38:52 2024 +0800

    [KYUUBI #6028] Exited spark-submit process should not block batch submit 
queue
    
    # :mag: Description
    ## Issue References 🔗
    
    While enabling batch implementation V2 with the following configurations
    ```
    kyuubi.batch.impl.version=2
    kyuubi.batch.submitter.enabled=true
    kyuubi.batch.submitter.threads=48
    spark.master=yarn
    spark.submit.deployMode=cluster
    spark.yarn.submit.waitAppCompletion=false
    ```
    
    I found that batch jobs get blocked in the DB queue once a YARN queue runs out
    of resources. As a result, subsequent batch jobs destined for a different YARN
    queue are also held in the DB queue, rather than being queued in their target
    YARN queue.
    
    ```
    mysql> select state, engine_state, count(1) from metadata where state in 
('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
    +-------------+--------------+----------+
    | state       | engine_state | count(1) |
    +-------------+--------------+----------+
    | INITIALIZED | NULL         |      166 |
    | PENDING     | NULL         |        1 |
    | RUNNING     | PENDING      |      148 |
    | RUNNING     | RUNNING      |      415 |
    +-------------+--------------+----------+
    ```
    
    ## Describe Your Solution ๐Ÿ”ง
    
    The submitter queue, whose size is controlled by
`kyuubi.batch.submitter.threads`, is designed to limit `spark-submit`
process concurrency: too many concurrent `spark-submit` processes may exhaust
the CPU/memory of the Kyuubi server node and eventually crash the service. In
Spark YARN cluster mode with `spark.yarn.submit.waitAppCompletion=false`, the
local `spark-submit` process exits as soon as the application reaches the
ACCEPTED state, even if no resources can be allocated [...]
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    Pass GA, and roll out to an internal cluster.
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6028 from pan3793/batch-submit.
    
    Closes #6028
    
    05fcc758f [Cheng Pan] Exited spark-submit process should not block batch 
submit queue
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala      | 2 +-
 .../src/main/scala/org/apache/kyuubi/engine/EngineRef.scala      | 2 +-
 .../src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala    | 8 +++-----
 .../org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala     | 2 +-
 .../scala/org/apache/kyuubi/operation/BatchJobSubmission.scala   | 3 +++
 .../main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala | 9 +++++++--
 .../scala/org/apache/kyuubi/session/KyuubiBatchSession.scala     | 2 ++
 7 files changed, 18 insertions(+), 10 deletions(-)

diff --git 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 93153ad36..a32a45d6c 100644
--- 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
     Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
       conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
       val builder = new SparkProcessBuilder("test", true, conf)
-      val e = intercept[KyuubiException](builder.validateConf)
+      val e = intercept[KyuubiException](builder.validateConf())
       assert(e.getMessage === s"'$invalid' in 
spark.kubernetes.executor.podNamePrefix is" +
         s" invalid. must conform 
https://kubernetes.io/docs/concepts/overview/"; +
         "working-with-objects/names/#dns-subdomain-names and the value length 
<= 237")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 0d1b0adc7..0bb13b049 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
       acquiredPermit = true
       val redactedCmd = builder.toString
       info(s"Launching engine:\n$redactedCmd")
-      builder.validateConf
+      builder.validateConf()
       val process = builder.start
       var exitValue: Option[Int] = None
       var lastApplicationInfo: Option[ApplicationInfo] = None
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 8a8f59ffe..c8c3f9c39 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
 
 import scala.collection.JavaConverters._
 
-import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.EvictingQueue
 import org.apache.commons.lang3.StringUtils.containsIgnoreCase
 
@@ -166,9 +165,8 @@ trait ProcBuilder {
   // Visible for test
   @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
   private var logCaptureThread: Thread = _
-  private var process: Process = _
-  @VisibleForTesting
-  @volatile private[kyuubi] var processLaunched: Boolean = _
+  @volatile private[kyuubi] var process: Process = _
+  @volatile private[kyuubi] var processLaunched: Boolean = false
 
   private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
     val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@@ -206,7 +204,7 @@ trait ProcBuilder {
     file
   }
 
-  def validateConf: Unit = {}
+  def validateConf(): Unit = {}
 
   final def start: Process = synchronized {
     process = processBuilder.start()
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 10e1e6ce9..4c06d7951 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -302,7 +302,7 @@ class SparkProcessBuilder(
     
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
   }
 
-  override def validateConf: Unit = Validator.validateConf(conf)
+  override def validateConf(): Unit = Validator.validateConf(conf)
 
   // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current 
user
   def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit 
= {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 276fe3446..c29065f19 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -100,6 +100,9 @@ class BatchJobSubmission(
       getOperationLog)
   }
 
+  def startupProcessAlive: Boolean =
+    builder.processLaunched && Option(builder.process).exists(_.isAlive)
+
   override def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && 
_applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
       return _applicationInfo
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 2bfbbce2a..e2736267f 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -94,8 +94,13 @@ class KyuubiBatchService(
                   metadata.appState match {
                     // app that is not submitted to resource manager
                     case None | Some(ApplicationState.NOT_FOUND) => false
-                    // app that is pending in resource manager
-                    case Some(ApplicationState.PENDING) => false
+                    // app that is pending in resource manager while the local 
startup
+                    // process is alive. For example, in Spark YARN cluster 
mode, if set
+                    // spark.yarn.submit.waitAppCompletion=false, the local 
spark-submit
+                    // process exits immediately once Application goes 
ACCEPTED status,
+                    // even no resource could be allocated for the AM 
container.
+                    case Some(ApplicationState.PENDING) if 
batchSession.startupProcessAlive =>
+                      false
                     // not sure, added for safe
                     case Some(ApplicationState.UNKNOWN) => false
                     case _ => true
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 531bbc3af..4ac84c1d0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -111,6 +111,8 @@ class KyuubiBatchSession(
       batchArgs,
       metadata)
 
+  def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
+
   private def waitMetadataRequestsRetryCompletion(): Unit = {
     val batchId = batchJobSubmissionOp.batchId
     sessionManager.getMetadataRequestsRetryRef(batchId).foreach {

Reply via email to