This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 4fb78cf43 [KYUUBI #5043][1.7] Destroy the build process when
waitCompletion is false and the engine is running in cluster mode
4fb78cf43 is described below
commit 4fb78cf431397f242682644d37008353da5262e8
Author: wforget <[email protected]>
AuthorDate: Mon Jul 31 01:21:27 2023 +0800
[KYUUBI #5043][1.7] Destroy the build process when waitCompletion is false
and the engine is running in cluster mode
### _Why are the changes needed?_
When waitCompletion is false, we need to determine whether it is in cluster
mode to avoid killing the engine running locally.
close #5043
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5096 from wForget/dev-1.7.
Closes #5043
4e837dc13 [wforget] [KYUUBI #5043] Destroy the build process when
waitCompletion is false and the engine is running in cluster mode
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 8 +++-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 7 ++--
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 42 ++++++++++++-------
.../kyuubi/operation/BatchJobSubmission.scala | 14 +++++--
.../engine/spark/SparkProcessBuilderSuite.scala | 4 +-
.../kyuubi/server/rest/client/BatchCliSuite.scala | 49 ++++++++++++++++++----
6 files changed, 91 insertions(+), 33 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index e2ddb4221..332cd2bd8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -251,9 +251,15 @@ private[kyuubi] class EngineRef(
}
engineRef.get
} finally {
+ val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+ val destroyProcess = !waitCompletion && builder.isClusterMode()
+ if (destroyProcess) {
+ info("Destroy the builder process because waitCompletion is false" +
+ " and the engine is running in cluster mode.")
+ }
// we must close the process builder whether session open is success or
failure since
// we have a log capture thread in process builder.
- builder.close()
+ builder.close(destroyProcess)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index 4c7330b4d..a538201ea 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -155,7 +155,7 @@ trait ProcBuilder {
@volatile private var error: Throwable = UNCAUGHT_ERROR
private val engineLogMaxLines =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_MAX_LOG_LINES)
- private val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+
protected val lastRowsOfLog: EvictingQueue[String] =
EvictingQueue.create(engineLogMaxLines)
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
@@ -249,13 +249,14 @@ trait ProcBuilder {
process
}
- def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
+ def isClusterMode(): Boolean = false
+
+ def close(destroyProcess: Boolean): Unit = synchronized {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
logCaptureThread = null
}
if (destroyProcess && process != null) {
- info("Destroy the process, since waitCompletion is false.")
process.destroyForcibly()
process = null
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index b74eab77d..02ce5829e 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
import java.io.{File, IOException}
import java.nio.file.Paths
+import java.util.Locale
import scala.collection.mutable.ArrayBuffer
@@ -183,26 +184,36 @@ class SparkProcessBuilder(
override def shortName: String = "spark"
- protected lazy val defaultMaster: Option[String] = {
+ protected lazy val defaultsConf: Map[String, String] = {
val confDir = env.getOrElse(SPARK_CONF_DIR,
s"$sparkHome${File.separator}conf")
- val defaults =
- try {
- val confFile = new
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
- if (confFile.exists()) {
- Utils.getPropertiesFromFile(Some(confFile))
- } else {
- Map.empty[String, String]
- }
- } catch {
- case _: Exception =>
- warn(s"Failed to load spark configurations from $confDir")
- Map.empty[String, String]
+ try {
+ val confFile = new
File(s"$confDir${File.separator}$SPARK_CONF_FILE_NAME")
+ if (confFile.exists()) {
+ Utils.getPropertiesFromFile(Some(confFile))
+ } else {
+ Map.empty[String, String]
}
- defaults.get(MASTER_KEY)
+ } catch {
+ case _: Exception =>
+ warn(s"Failed to load spark configurations from $confDir")
+ Map.empty[String, String]
+ }
}
override def clusterManager(): Option[String] = {
- conf.getOption(MASTER_KEY).orElse(defaultMaster)
+ conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
+ }
+
+ def deployMode(): Option[String] = {
+ conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+ }
+
+ override def isClusterMode(): Boolean = {
+ clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+ case Some(m) if m.startsWith("yarn") || m.startsWith("k8s") =>
+ deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
+ case _ => false
+ }
}
override def validateConf: Unit = Validator.validateConf(conf)
@@ -224,6 +235,7 @@ object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
final val MASTER_KEY = "spark.master"
+ final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
final val INTERNAL_RESOURCE = "spark-internal"
/**
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 475618c87..d3ee1ba80 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -278,7 +278,15 @@ class BatchJobSubmission(
}
}
} finally {
- builder.close()
+ val waitCompletion =
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+ .map(_.toBoolean).getOrElse(
+
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
+ val destroyProcess = !waitCompletion && builder.isClusterMode()
+ if (destroyProcess) {
+ info("Destroy the builder process because waitCompletion is false" +
+ " and the engine is running in cluster mode.")
+ }
+ builder.close(destroyProcess)
updateApplicationInfoMetadataIfNeeded()
cleanupUploadedResourceIfNeeded()
}
@@ -345,14 +353,14 @@ class BatchJobSubmission(
// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not
kill it.")
- builder.close()
+ builder.close(true)
cleanupUploadedResourceIfNeeded()
return
}
try {
killMessage = killBatchApplication()
- builder.close()
+ builder.close(true)
cleanupUploadedResourceIfNeeded()
} finally {
if (state == OperationState.INITIALIZED) {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 7b204dafb..9583e2b1c 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -141,7 +141,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper
with MockitoSugar {
assert(!process.logCaptureThreadReleased)
subProcess.waitFor(3, TimeUnit.SECONDS)
} finally {
- process.close()
+ process.close(true)
}
eventually(timeout(3.seconds), interval(100.milliseconds)) {
assert(process.logCaptureThreadReleased)
@@ -174,7 +174,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper
with MockitoSugar {
val p = pb.start
p.waitFor()
} finally {
- pb.close()
+ pb.close(true)
}
}
})
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index ff807ef02..f85107631 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -39,6 +39,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
val basePath: String = Utils.getCodeSourceLocation(getClass)
val batchFile: String = s"${basePath}/batch.yaml"
+ val longTimeBatchFile: String = s"${basePath}/batch_long_time.yaml"
override protected val otherConfigs: Map[String, String] = {
Map(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL.key -> "100")
@@ -71,6 +72,27 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
|options:
| verbose: true""".stripMargin
Files.write(Paths.get(batchFile),
batch_basic.getBytes(StandardCharsets.UTF_8))
+
+ val long_time_batch_basic = s"""apiVersion: v1
+ |username: ${ldapUser}
+ |request:
+ | batchType: Spark
+ | name: LongTimeBatch
+ | resource: ${sparkBatchTestResource.get}
+ | className:
org.apache.spark.examples.DriverSubmissionTest
+ | args:
+ | - 10
+ | configs:
+ | spark.master: local
+ | wait.completion: true
+ | k1: v1
+ | 1: test_integer_key
+ | key:
+ |options:
+ | verbose: true""".stripMargin
+ Files.write(
+ Paths.get(longTimeBatchFile),
+ long_time_batch_basic.getBytes(StandardCharsets.UTF_8))
}
override def afterEach(): Unit = {
@@ -93,7 +115,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"create",
"batch",
"-f",
- batchFile,
+ longTimeBatchFile,
"--password",
ldapUserPasswd)
var result = testPrematureExitForControlCli(createArgs, "")
@@ -109,9 +131,15 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
ldapUser,
"--password",
ldapUserPasswd)
- result = testPrematureExitForControlCli(getArgs, "SPARK")
- assert(result.contains("SPARK"))
- assert(result.contains(s"${fe.connectionUrl}"))
+ var invalidCount = 0
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ invalidCount += 1
+ result = testPrematureExitForControlCli(getArgs, "SPARK")
+ assert(result.contains("RUNNING"))
+ assert(result.contains("SPARK"))
+ assert(result.contains(s"${fe.connectionUrl}"))
+ invalidCount -= 1
+ }
val logArgs = Array(
"log",
@@ -139,7 +167,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
eventually(timeout(3.seconds), interval(200.milliseconds)) {
assert(MetricsSystem.counterValue(
- MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections ===
5)
+ MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections -
invalidCount === 5)
assert(MetricsSystem.counterValue(MetricsConstants.REST_CONN_OPEN).getOrElse(0L)
=== 0)
}
}
@@ -151,7 +179,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"create",
"batch",
"-f",
- batchFile,
+ longTimeBatchFile,
"--authSchema",
"SPNEGO")
var result = testPrematureExitForControlCli(createArgs, "")
@@ -165,9 +193,12 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
batchId,
"--authSchema",
"spnego")
- result = testPrematureExitForControlCli(getArgs, "SPARK")
- assert(result.contains("SPARK"))
- assert(result.contains(s"${fe.connectionUrl}"))
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ result = testPrematureExitForControlCli(getArgs, "SPARK")
+ assert(result.contains("RUNNING"))
+ assert(result.contains("SPARK"))
+ assert(result.contains(s"${fe.connectionUrl}"))
+ }
val logArgs = Array(
"log",