This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 4fb78cf43 [KYUUBI #5043][1.7] Destroy the build process when 
waitCompletion is false and the engine is running in cluster mode
4fb78cf43 is described below

commit 4fb78cf431397f242682644d37008353da5262e8
Author: wforget <[email protected]>
AuthorDate: Mon Jul 31 01:21:27 2023 +0800

    [KYUUBI #5043][1.7] Destroy the build process when waitCompletion is false 
and the engine is running in cluster mode
    
    ### _Why are the changes needed?_
    
    When waitCompletion is false, we need to determine whether the engine is 
running in cluster mode, so that we avoid killing an engine that is running locally.
    
    close #5043
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    Closes #5096 from wForget/dev-1.7.
    
    Closes #5043
    
    4e837dc13 [wforget] [KYUUBI #5043] Destroy the build process when 
waitCompletion is false and the engine is running in cluster mode
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  8 +++-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |  7 ++--
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  | 42 ++++++++++++-------
 .../kyuubi/operation/BatchJobSubmission.scala      | 14 +++++--
 .../engine/spark/SparkProcessBuilderSuite.scala    |  4 +-
 .../kyuubi/server/rest/client/BatchCliSuite.scala  | 49 ++++++++++++++++++----
 6 files changed, 91 insertions(+), 33 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e2ddb4221..332cd2bd8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -251,9 +251,15 @@ private[kyuubi] class EngineRef(
       }
       engineRef.get
     } finally {
+      val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      if (destroyProcess) {
+        info("Destroy the builder process because waitCompletion is false" +
+          " and the engine is running in cluster mode.")
+      }
       // we must close the process builder whether session open is success or 
failure since
       // we have a log capture thread in process builder.
-      builder.close()
+      builder.close(destroyProcess)
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 4c7330b4d..a538201ea 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -155,7 +155,7 @@ trait ProcBuilder {
   @volatile private var error: Throwable = UNCAUGHT_ERROR
 
   private val engineLogMaxLines = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_MAX_LOG_LINES)
-  private val waitCompletion = 
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+
   protected val lastRowsOfLog: EvictingQueue[String] = 
EvictingQueue.create(engineLogMaxLines)
   // Visible for test
   @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
@@ -249,13 +249,14 @@ trait ProcBuilder {
     process
   }
 
-  def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
+  def isClusterMode(): Boolean = false
+
+  def close(destroyProcess: Boolean): Unit = synchronized {
     if (logCaptureThread != null) {
       logCaptureThread.interrupt()
       logCaptureThread = null
     }
     if (destroyProcess && process != null) {
-      info("Destroy the process, since waitCompletion is false.")
       process.destroyForcibly()
       process = null
     }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index b74eab77d..02ce5829e 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
 
 import java.io.{File, IOException}
 import java.nio.file.Paths
+import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -183,26 +184,36 @@ class SparkProcessBuilder(
 
   override def shortName: String = "spark"
 
-  protected lazy val defaultMaster: Option[String] = {
+  protected lazy val defaultsConf: Map[String, String] = {
     val confDir = env.getOrElse(SPARK_CONF_DIR, 
s"$sparkHome${File.separator}conf")
-    val defaults =
-      try {
-        val confFile = new 
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
-        if (confFile.exists()) {
-          Utils.getPropertiesFromFile(Some(confFile))
-        } else {
-          Map.empty[String, String]
-        }
-      } catch {
-        case _: Exception =>
-          warn(s"Failed to load spark configurations from $confDir")
-          Map.empty[String, String]
+    try {
+      val confFile = new 
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
+      if (confFile.exists()) {
+        Utils.getPropertiesFromFile(Some(confFile))
+      } else {
+        Map.empty[String, String]
       }
-    defaults.get(MASTER_KEY)
+    } catch {
+      case _: Exception =>
+        warn(s"Failed to load spark configurations from $confDir")
+        Map.empty[String, String]
+    }
   }
 
   override def clusterManager(): Option[String] = {
-    conf.getOption(MASTER_KEY).orElse(defaultMaster)
+    conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+  }
+
+  def deployMode(): Option[String] = {
+    conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+  }
+
+  override def isClusterMode(): Boolean = {
+    clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+      case Some(m) if m.startsWith("yarn") || m.startsWith("k8s") =>
+        deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
+      case _ => false
+    }
   }
 
   override def validateConf: Unit = Validator.validateConf(conf)
@@ -224,6 +235,7 @@ object SparkProcessBuilder {
   final val APP_KEY = "spark.app.name"
   final val TAG_KEY = "spark.yarn.tags"
   final val MASTER_KEY = "spark.master"
+  final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
   final val INTERNAL_RESOURCE = "spark-internal"
 
   /**
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 475618c87..d3ee1ba80 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -278,7 +278,15 @@ class BatchJobSubmission(
         }
       }
     } finally {
-      builder.close()
+      val waitCompletion = 
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+        .map(_.toBoolean).getOrElse(
+          
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
+      val destroyProcess = !waitCompletion && builder.isClusterMode()
+      if (destroyProcess) {
+        info("Destroy the builder process because waitCompletion is false" +
+          " and the engine is running in cluster mode.")
+      }
+      builder.close(destroyProcess)
       updateApplicationInfoMetadataIfNeeded()
       cleanupUploadedResourceIfNeeded()
     }
@@ -345,14 +353,14 @@ class BatchJobSubmission(
       // fast fail
       if (isTerminalState(state)) {
         killMessage = (false, s"batch $batchId is already terminal so can not 
kill it.")
-        builder.close()
+        builder.close(true)
         cleanupUploadedResourceIfNeeded()
         return
       }
 
       try {
         killMessage = killBatchApplication()
-        builder.close()
+        builder.close(true)
         cleanupUploadedResourceIfNeeded()
       } finally {
         if (state == OperationState.INITIALIZED) {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 7b204dafb..9583e2b1c 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -141,7 +141,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper 
with MockitoSugar {
       assert(!process.logCaptureThreadReleased)
       subProcess.waitFor(3, TimeUnit.SECONDS)
     } finally {
-      process.close()
+      process.close(true)
     }
     eventually(timeout(3.seconds), interval(100.milliseconds)) {
       assert(process.logCaptureThreadReleased)
@@ -174,7 +174,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper 
with MockitoSugar {
                 val p = pb.start
                 p.waitFor()
               } finally {
-                pb.close()
+                pb.close(true)
               }
             }
           })
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index ff807ef02..f85107631 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -39,6 +39,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
 
   val basePath: String = Utils.getCodeSourceLocation(getClass)
   val batchFile: String = s"${basePath}/batch.yaml"
+  val longTimeBatchFile: String = s"${basePath}/batch_long_time.yaml"
 
   override protected val otherConfigs: Map[String, String] = {
     Map(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL.key -> "100")
@@ -71,6 +72,27 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
                          |options:
                          |  verbose: true""".stripMargin
     Files.write(Paths.get(batchFile), 
batch_basic.getBytes(StandardCharsets.UTF_8))
+
+    val long_time_batch_basic = s"""apiVersion: v1
+                                   |username: ${ldapUser}
+                                   |request:
+                                   |  batchType: Spark
+                                   |  name: LongTimeBatch
+                                   |  resource: ${sparkBatchTestResource.get}
+                                   |  className: 
org.apache.spark.examples.DriverSubmissionTest
+                                   |  args:
+                                   |   - 10
+                                   |  configs:
+                                   |    spark.master: local
+                                   |    wait.completion: true
+                                   |    k1: v1
+                                   |    1: test_integer_key
+                                   |    key:
+                                   |options:
+                                   |  verbose: true""".stripMargin
+    Files.write(
+      Paths.get(longTimeBatchFile),
+      long_time_batch_basic.getBytes(StandardCharsets.UTF_8))
   }
 
   override def afterEach(): Unit = {
@@ -93,7 +115,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "create",
       "batch",
       "-f",
-      batchFile,
+      longTimeBatchFile,
       "--password",
       ldapUserPasswd)
     var result = testPrematureExitForControlCli(createArgs, "")
@@ -109,9 +131,15 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       ldapUser,
       "--password",
       ldapUserPasswd)
-    result = testPrematureExitForControlCli(getArgs, "SPARK")
-    assert(result.contains("SPARK"))
-    assert(result.contains(s"${fe.connectionUrl}"))
+    var invalidCount = 0
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      invalidCount += 1
+      result = testPrematureExitForControlCli(getArgs, "SPARK")
+      assert(result.contains("RUNNING"))
+      assert(result.contains("SPARK"))
+      assert(result.contains(s"${fe.connectionUrl}"))
+      invalidCount -= 1
+    }
 
     val logArgs = Array(
       "log",
@@ -139,7 +167,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
 
     eventually(timeout(3.seconds), interval(200.milliseconds)) {
       assert(MetricsSystem.counterValue(
-        MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections === 
5)
+        MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections - 
invalidCount === 5)
       
assert(MetricsSystem.counterValue(MetricsConstants.REST_CONN_OPEN).getOrElse(0L)
 === 0)
     }
   }
@@ -151,7 +179,7 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "create",
       "batch",
       "-f",
-      batchFile,
+      longTimeBatchFile,
       "--authSchema",
       "SPNEGO")
     var result = testPrematureExitForControlCli(createArgs, "")
@@ -165,9 +193,12 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       batchId,
       "--authSchema",
       "spnego")
-    result = testPrematureExitForControlCli(getArgs, "SPARK")
-    assert(result.contains("SPARK"))
-    assert(result.contains(s"${fe.connectionUrl}"))
+    eventually(timeout(5.seconds), interval(100.milliseconds)) {
+      result = testPrematureExitForControlCli(getArgs, "SPARK")
+      assert(result.contains("RUNNING"))
+      assert(result.contains("SPARK"))
+      assert(result.contains(s"${fe.connectionUrl}"))
+    }
 
     val logArgs = Array(
       "log",

Reply via email to