This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ac21e271f [KYUUBI #5043] Destroy the build process when waitCompletion
is false and the engine is running in cluster mode
ac21e271f is described below
commit ac21e271fe717ff8159dc811778ee5a5ef7c9198
Author: wforget <[email protected]>
AuthorDate: Mon Jul 17 18:12:11 2023 +0800
[KYUUBI #5043] Destroy the build process when waitCompletion is false and
the engine is running in cluster mode
### _Why are the changes needed?_
When waitCompletion is false, we need to determine whether it is in cluster
mode to avoid killing the engine running locally.
close #5043
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
Closes #5044 from wForget/KYUUBI-5043.
Closes #5043
9d26aea15 [wforget] fix style
aad322f1a [wforget] fix test
20d082ae1 [wforget] [KYUUBI-5043] Destroy the build process when
waitCompletion is false and the engine is running in cluster mode
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 8 +++-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 7 ++--
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 14 +++++++
.../kyuubi/operation/BatchJobSubmission.scala | 14 +++++--
.../engine/spark/SparkProcessBuilderSuite.scala | 4 +-
.../kyuubi/server/rest/client/BatchCliSuite.scala | 49 ++++++++++++++++++----
6 files changed, 78 insertions(+), 18 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 387758714..5ade86400 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -267,9 +267,15 @@ private[kyuubi] class EngineRef(
}
engineRef.get
} finally {
+ val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+ val destroyProcess = !waitCompletion && builder.isClusterMode()
+ if (destroyProcess) {
+ info("Destroy the builder process because waitCompletion is false" +
+ " and the engine is running in cluster mode.")
+ }
// we must close the process builder whether session open is success or
failure since
// we have a log capture thread in process builder.
- builder.close()
+ builder.close(destroyProcess)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index d30e72674..a44fe06bc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -155,7 +155,7 @@ trait ProcBuilder {
@volatile private var error: Throwable = UNCAUGHT_ERROR
private val engineLogMaxLines =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_MAX_LOG_LINES)
- private val waitCompletion =
conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
+
protected val lastRowsOfLog: EvictingQueue[String] =
EvictingQueue.create(engineLogMaxLines)
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
@@ -249,13 +249,14 @@ trait ProcBuilder {
process
}
- def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
+ def isClusterMode(): Boolean = false
+
+ def close(destroyProcess: Boolean): Unit = synchronized {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
logCaptureThread = null
}
if (destroyProcess && process != null) {
- info("Destroy the process, since waitCompletion is false.")
process.destroyForcibly()
process = null
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 6110c0246..5bfe506da 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
import java.io.{File, IOException}
import java.nio.file.Paths
+import java.util.Locale
import scala.collection.mutable.ArrayBuffer
@@ -210,6 +211,18 @@ class SparkProcessBuilder(
conf.getOption(MASTER_KEY).orElse(defaultsConf.get(MASTER_KEY))
}
+ def deployMode(): Option[String] = {
+ conf.getOption(DEPLOY_MODE_KEY).orElse(defaultsConf.get(DEPLOY_MODE_KEY))
+ }
+
+ override def isClusterMode(): Boolean = {
+ clusterManager().map(_.toLowerCase(Locale.ROOT)) match {
+ case Some(m) if m.startsWith("yarn") || m.startsWith("k8s") =>
+ deployMode().exists(_.toLowerCase(Locale.ROOT) == "cluster")
+ case _ => false
+ }
+ }
+
def kubernetesContext(): Option[String] = {
conf.getOption(KUBERNETES_CONTEXT_KEY).orElse(defaultsConf.get(KUBERNETES_CONTEXT_KEY))
}
@@ -237,6 +250,7 @@ object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
final val TAG_KEY = "spark.yarn.tags"
final val MASTER_KEY = "spark.master"
+ final val DEPLOY_MODE_KEY = "spark.submit.deployMode"
final val KUBERNETES_CONTEXT_KEY = "spark.kubernetes.context"
final val KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
final val INTERNAL_RESOURCE = "spark-internal"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 82562541d..8bb9804ec 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -268,7 +268,15 @@ class BatchJobSubmission(
}
}
} finally {
- builder.close()
+ val waitCompletion =
batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
+ .map(_.toBoolean).getOrElse(
+
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION))
+ val destroyProcess = !waitCompletion && builder.isClusterMode()
+ if (destroyProcess) {
+ info("Destroy the builder process because waitCompletion is false" +
+ " and the engine is running in cluster mode.")
+ }
+ builder.close(destroyProcess)
updateApplicationInfoMetadataIfNeeded()
cleanupUploadedResourceIfNeeded()
}
@@ -335,14 +343,14 @@ class BatchJobSubmission(
// fast fail
if (isTerminalState(state)) {
killMessage = (false, s"batch $batchId is already terminal so can not
kill it.")
- builder.close()
+ builder.close(true)
cleanupUploadedResourceIfNeeded()
return
}
try {
killMessage = killBatchApplication()
- builder.close()
+ builder.close(true)
cleanupUploadedResourceIfNeeded()
} finally {
if (state == OperationState.INITIALIZED) {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index e70acf8ad..ad8324c85 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -141,7 +141,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper
with MockitoSugar {
assert(!process.logCaptureThreadReleased)
subProcess.waitFor(3, TimeUnit.SECONDS)
} finally {
- process.close()
+ process.close(true)
}
eventually(timeout(3.seconds), interval(100.milliseconds)) {
assert(process.logCaptureThreadReleased)
@@ -173,7 +173,7 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper
with MockitoSugar {
val p = pb.start
p.waitFor()
} finally {
- pb.close()
+ pb.close(true)
}
})
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 4e18951b5..7cf939910 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -40,6 +40,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
val basePath: String = Utils.getCodeSourceLocation(getClass)
val batchFile: String = s"${basePath}/batch.yaml"
+ val longTimeBatchFile: String = s"${basePath}/batch_long_time.yaml"
override protected val otherConfigs: Map[String, String] = {
Map(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL.key -> "100")
@@ -72,6 +73,27 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
|options:
| verbose: true""".stripMargin
Files.write(Paths.get(batchFile),
batch_basic.getBytes(StandardCharsets.UTF_8))
+
+ val long_time_batch_basic = s"""apiVersion: v1
+ |username: ${ldapUser}
+ |request:
+ | batchType: Spark
+ | name: LongTimeBatch
+ | resource: ${sparkBatchTestResource.get}
+ | className:
org.apache.spark.examples.DriverSubmissionTest
+ | args:
+ | - 10
+ | configs:
+ | spark.master: local
+ | wait.completion: true
+ | k1: v1
+ | 1: test_integer_key
+ | key:
+ |options:
+ | verbose: true""".stripMargin
+ Files.write(
+ Paths.get(longTimeBatchFile),
+ long_time_batch_basic.getBytes(StandardCharsets.UTF_8))
}
override def afterEach(): Unit = {
@@ -94,7 +116,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"create",
"batch",
"-f",
- batchFile,
+ longTimeBatchFile,
"--password",
ldapUserPasswd)
var result = testPrematureExitForControlCli(createArgs, "")
@@ -110,9 +132,15 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
ldapUser,
"--password",
ldapUserPasswd)
- result = testPrematureExitForControlCli(getArgs, "SPARK")
- assert(result.contains("SPARK"))
- assert(result.contains(s"${fe.connectionUrl}"))
+ var invalidCount = 0
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ invalidCount += 1
+ result = testPrematureExitForControlCli(getArgs, "SPARK")
+ assert(result.contains("RUNNING"))
+ assert(result.contains("SPARK"))
+ assert(result.contains(s"${fe.connectionUrl}"))
+ invalidCount -= 1
+ }
val logArgs = Array(
"log",
@@ -140,7 +168,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
eventually(timeout(3.seconds), interval(200.milliseconds)) {
assert(MetricsSystem.counterValue(
- MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections ===
5)
+ MetricsConstants.REST_CONN_TOTAL).getOrElse(0L) - totalConnections -
invalidCount === 5)
assert(MetricsSystem.counterValue(MetricsConstants.REST_CONN_OPEN).getOrElse(0L)
=== 0)
}
}
@@ -152,7 +180,7 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"create",
"batch",
"-f",
- batchFile,
+ longTimeBatchFile,
"--authSchema",
"SPNEGO")
var result = testPrematureExitForControlCli(createArgs, "")
@@ -166,9 +194,12 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
batchId,
"--authSchema",
"spnego")
- result = testPrematureExitForControlCli(getArgs, "SPARK")
- assert(result.contains("SPARK"))
- assert(result.contains(s"${fe.connectionUrl}"))
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ result = testPrematureExitForControlCli(getArgs, "SPARK")
+ assert(result.contains("RUNNING"))
+ assert(result.contains("SPARK"))
+ assert(result.contains(s"${fe.connectionUrl}"))
+ }
val logArgs = Array(
"log",