This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c0bfebd55 [KYUUBI #5065][FOLLOWUP] Graceful close the process when
launch engine timeout
c0bfebd55 is described below
commit c0bfebd5543a578c6554530a1575d6485be1d05a
Author: liupeiyue <[email protected]>
AuthorDate: Mon Jul 31 11:42:18 2023 +0800
[KYUUBI #5065][FOLLOWUP] Graceful close the process when launch engine
timeout
### _Why are the changes needed?_
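Follow-up to #5065: when engine launch times out or a batch job's application fails, terminate the launcher process gracefully (`destroy()`, then `destroyForcibly()` only if it has not exited within the configured destroy timeout) instead of force-killing it immediately.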
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5097 from ASiegeLion/master.
Closes #5065
d50a388d6 [Cheng Pan] followup
80861dd71 [liupeiyue] [KYUUBI #5065][FOLLOWUP] Graceful close the process
when launch engine timeout
Lead-authored-by: liupeiyue <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../src/main/scala/org/apache/kyuubi/Utils.scala | 23 ++++++++++++++++++++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 9 +--------
.../kyuubi/operation/BatchJobSubmission.scala | 7 +++++--
4 files changed, 30 insertions(+), 11 deletions(-)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 06c572130..fac30a173 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Lock
@@ -417,4 +418,26 @@ object Utils extends Logging {
lock.unlock()
}
}
+
+ /**
+ * Try killing the process gracefully first, then forcibly if process does not exit in
+ * graceful period.
+ *
+ * @param process the being killed process
+ * @param gracefulPeriod the graceful killing period, in milliseconds
+ * @return the exit code if process exit normally, None if the process finally was killed
+ * forcibly
+ */
+ def terminateProcess(process: java.lang.Process, gracefulPeriod: Long):
Option[Int] = {
+ process.destroy()
+ if (process.waitFor(gracefulPeriod, TimeUnit.MILLISECONDS)) {
+ Some(process.exitValue())
+ } else {
+ warn(s"Process does not exit after $gracefulPeriod ms, try to forcibly
kill. " +
+ "Staging files generated by the process may be retained!")
+ process.destroyForcibly()
+ None
+ }
+ }
+
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 5ade86400..123aec46c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -224,7 +224,7 @@ private[kyuubi] class EngineRef(
if (started + timeout <= System.currentTimeMillis()) {
val killMessage =
engineManager.killApplication(builder.appMgrInfo(), engineRefId)
- process.destroyForcibly()
+ builder.close(true)
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT,
appUser)))
throw KyuubiSQLException(
s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key}
to change it) to" +
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index f1e49d687..304799db8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -21,7 +21,6 @@ import java.io.{File, FilenameFilter, IOException}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
-import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -261,13 +260,7 @@ trait ProcBuilder {
logCaptureThread = null
}
if (destroyProcess && process != null) {
- process.destroy()
- if (!process.waitFor(engineStartupDestroyTimeout,
TimeUnit.MILLISECONDS)) {
- warn("Engine startup process does not exit after " +
- s"$engineStartupDestroyTimeout ms, try to forcibly kill. " +
- "Staging files generated by the process may be retained!")
- process.destroyForcibly()
- }
+ Utils.terminateProcess(process, engineStartupDestroyTimeout)
process = null
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 8bb9804ec..ac723b2c6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -26,7 +26,7 @@ import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift._
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState,
KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@@ -131,6 +131,9 @@ class BatchJobSubmission(
private val applicationStarvationTimeout =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
+ private val applicationStartupDestroyTimeout =
+ session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_DESTROY_TIMEOUT)
+
private def updateBatchMetadata(): Unit = {
val endTime = if (isTerminalState(state)) lastAccessTime else 0L
@@ -246,7 +249,7 @@ class BatchJobSubmission(
}
if (applicationFailed(_applicationInfo)) {
- process.destroyForcibly()
+ Utils.terminateProcess(process, applicationStartupDestroyTimeout)
throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
} else {
process.waitFor()