This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ac21e271f [KYUUBI #5043] Destroy the build process when waitCompletion 
is false and the engine is running in cluster mode
ac21e271f is described below

commit ac21e271fe717ff8159dc811778ee5a5ef7c9198
Author: wforget <[email protected]>
AuthorDate: Mon Jul 17 18:12:11 2023 +0800

    [KYUUBI #5043] Destroy the build process when waitCompletion is false and 
the engine is running in cluster mode
    
    ### _Why are the changes needed?_
    
    When waitCompletion is false, we need to determine whether it is in cluster 
mode to avoid killing the engine running locally.
    
    close #5043
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    Closes #5044 from wForget/KYUUBI-5043.
    
    Closes #5043
    
    9d26aea15 [wforget] fix style
    aad322f1a [wforget] fix test
    20d082ae1 [wforget] [KYUUBI-5043] Destroy the build process when 
waitCompletion is false and the engine is running in cluster mode
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  8 +++-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  7 ++--
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 14 +++++++
 .../kyuubi/operation/BatchJobSubmission.scala      | 14 +++++--
 .../engine/spark/SparkProcessBuilderSuite.scala    |  4 +-
 .../kyuubi/server/rest/client/BatchCliSuite.scala  | 49 ++++++++++++++++++----
 6 files changed, 78 insertions(+), 18 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 387758714..5ade86400 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -267,9 +267,15 @@ private[kyuubi] class EngineRef(
       }
       engineRef.get
     } finally {
+      val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      if (destroyProcess) {
+        info("Destroy the builder process because waitCompletion is false" +
+          " and the engine is running in cluster mode.")
+      }
       // we must close the process builder whether session open is success or 
failure since
       // we have a log capture thread in process builder.
-      builder.close()
+      builder.close(destroyProcess)
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index d30e72674..a44fe06bc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -155,7 +155,7 @@ trait ProcBuilder {
   @volatile private var error: Throwable = UNCAUGHT_ERROR
 
   private val engineLogMaxLines = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_MAX_LOG_LINES)
-  private val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+
   protected val lastRowsOfLog: EvictingQueue[String] = 
EvictingQueue.create(engineLogMaxLines)
   // Visible for test
   @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
@@ -249,13 +249,14 @@ trait ProcBuilder {
     process
   }
 
-  def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
+  def isClusterMode(): Boolean = false
+
+  def close(destroyProcess: Boolean): Unit = synchronized {
     if (logCaptureThread != null) {
       logCaptureThread.interrupt()
       logCaptureThread = null
     }
     if (destroyProcess && process != null) {
-      info("Destroy the process, since waitCompletion is false.")
       process.destroyForcibly()
       process = null
     }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 6110c0246..5bfe506da 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
 
 import java.io.{File, IOException}
 import java.nio.file.Paths
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -210,6 +211,18 @@ class SparkProcessBuilder(
     conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
   }
 
+  def deployMode(): Option[String] = {
+    conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+  }
+
+  override def isClusterMode(): Boolean = {
+    clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+      case Some(m) if m.startsWith("yarn") || m.startsWith("k8s") =>
+        deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
+      case _ => false
+    }
+  }
+
   def kubernetesContext(): Option[String] = {
     
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
   }
@@ -237,6 +250,7 @@ object SparkProcessBuilder {
   final val APP_KEY = "spark.app.name"
   final val TAG_KEY = "spark.yarn.tags"
   final val MASTER_KEY = "spark.master"
+  final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
   final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
   final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
   final val INTERNAL_RESOURCE = "spark-internal"
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 82562541d..8bb9804ec 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -268,7 +268,15 @@ class BatchJobSubmission(
         }
       }
     } finally {
-      builder.close()
+      val waitCompletion = 
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+        .map(_.toBoolean).getOrElse(
+          
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
+      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      if (destroyProcess) {
+        info("Destroy the builder process because waitCompletion is false" +
+          " and the engine is running in cluster mode.")
+      }
+      builder.close(destroyProcess)
       updateApplicationInfoMetadataIfNeeded()
       cleanupUploadedResourceIfNeeded()
     }
@@ -335,14 +343,14 @@ class BatchJobSubmission(
       // fast fail
       if (isTerminalState(state)) {
         killMessage = (false, s"batch $batchId is already terminal so can not 
kill it.")
-        builder.close()
+        builder.close(true)
         cleanupUploadedResourceIfNeeded()
         return
       }
 
       try {
         killMessage = killBatchApplication()
-        builder.close()
+        builder.close(true)
         cleanupUploadedResourceIfNeeded()
       } finally {
         if (state == OperationState.INITIALIZED) {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index e70acf8ad..ad8324c85 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -141,7 +141,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper 
with MockitoSugar {
       assert(!process.logCaptureThreadReleased)
       subProcess.waitFor(3, TimeUnit.SECONDS)
     } finally {
-      process.close()
+      process.close(true)
     }
     eventually(timeout(3.seconds), interval(100.milliseconds)) {
       assert(process.logCaptureThreadReleased)
@@ -173,7 +173,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper 
with MockitoSugar {
               val p = pb.start
               p.waitFor()
             } finally {
-              pb.close()
+              pb.close(true)
             }
           })
         }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 4e18951b5..7cf939910 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -40,6 +40,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
 
   val basePath: String = Utils.getCodeSourceLocation(getClass)
   val batchFile: String = s"${basePath}/batch.yaml"
+  val longTimeBatchFile: String = s"${basePath}/batch_long_time.yaml"
 
   override protected val otherConfigs: Map[String, String] = {
     Map(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL.key -> "100")
@@ -72,6 +73,27 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
                          |options:
                          |  verbose: true""".stripMargin
     Files.write(Paths.get(batchFile), 
batch_basic.getBytes(StandardCharsets.UTF_8))
+
+    val long_time_batch_basic = s"""apiVersion: v1
+                                   |username: ${ldapUser}
+                                   |request:
+                                   |  batchType: Spark
+                                   |  name: LongTimeBatch
+                                   |  resource: ${sparkBatchTestResource.get}
+                                   |  className: 
org.apache.spark.examples.DriverSubmissionTest
+                                   |  args:
+                                   |   - 10
+                                   |  configs:
+                                   |    spark.master: local
+                                   |    wait.completion: true
+                                   |    k1: v1
+                                   |    1: test_integer_key
+                                   |    key:
+                                   |options:
+                                   |  verbose: true""".stripMargin
+    Files.write(
+      Paths.get(longTimeBatchFile),
+      long_time_batch_basic.getBytes(StandardCharsets.UTF_8))
   }
 
   override def afterEach(): Unit = {
@@ -94,7 +116,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "create",
       "batch",
       "-f",
-      batchFile,
+      longTimeBatchFile,
       "--password",
       ldapUserPasswd)
     var result = testPrematureExitForControlCli(createArgs, "")
@@ -110,9 +132,15 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       ldapUser,
       "--password",
       ldapUserPasswd)
-    result = testPrematureExitForControlCli(getArgs, "SPARK")
-    assert(result.contains("SPARK"))
-    assert(result.contains(s"${fe.connectionUrl}"))
+    var invalidCount = 0
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      invalidCount += 1
+      result = testPrematureExitForControlCli(getArgs, "SPARK")
+      assert(result.contains("RUNNING"))
+      assert(result.contains("SPARK"))
+      assert(result.contains(s"${fe.connectionUrl}"))
+      invalidCount -= 1
+    }
 
     val logArgs = Array(
       "log",
@@ -140,7 +168,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
 
     eventually(timeout(3.seconds), interval(200.milliseconds)) {
       assert(MetricsSystem.counterValue(
-        MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections === 
5)
+        MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections - 
invalidCount === 5)
       
assert(MetricsSystem.counterValue(MetricsConstants.REST_CONN_OPEN).getOrElse(0L)
 === 0)
     }
   }
@@ -152,7 +180,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "create",
       "batch",
       "-f",
-      batchFile,
+      longTimeBatchFile,
       "--authSchema",
       "SPNEGO")
     var result = testPrematureExitForControlCli(createArgs, "")
@@ -166,9 +194,12 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       batchId,
       "--authSchema",
       "spnego")
-    result = testPrematureExitForControlCli(getArgs, "SPARK")
-    assert(result.contains("SPARK"))
-    assert(result.contains(s"${fe.connectionUrl}"))
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      result = testPrematureExitForControlCli(getArgs, "SPARK")
+      assert(result.contains("RUNNING"))
+      assert(result.contains("SPARK"))
+      assert(result.contains(s"${fe.connectionUrl}"))
+    }
 
     val logArgs = Array(
       "log",

Reply via email to