This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 247616a38541d82503bbc7e14fec2cfe429e206b
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 21:11:34 2023 +0800

    Python supports killing subprocesses
---
 .../spark/executor/SparkPythonExecutor.scala | 24 +++++++++-------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index a47a0b395..1a6203aed 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -32,6 +32,7 @@ import org.apache.linkis.engineplugin.spark.exception.ExecuteError
 import org.apache.linkis.engineplugin.spark.imexport.CsvRelation
 import org.apache.linkis.engineplugin.spark.utils.EngineUtils
 import org.apache.linkis.governance.common.paser.PythonCodeParser
+import org.apache.linkis.governance.common.utils.GovernanceUtils
 import org.apache.linkis.scheduler.executer.{ExecuteResponse, SuccessExecuteResponse}
 import org.apache.linkis.storage.resultset.ResultSetWriterFactory
 
@@ -120,16 +121,15 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
         }
         IOUtils.closeQuietly(lineOutputStream)
         Utils.tryAndErrorMsg {
-          process.destroy()
-          process = null
-          Thread.sleep(1000 * 2L)
-          // process.destroy will kills the subprocess,not need to force kill with -9,
-          // kill -9 may cause resources not to be released
+          // invoke kill process method to kill all tree process
           pid.foreach(p => {
             logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
-            Utils.exec(Array("kill", "-15", p), 3000L)
+            GovernanceUtils.killProcess(String.valueOf(p), s"kill pyspark process,pid: $pid", false)
           })
-
+          if (pid.isEmpty) {
+            process.destroy()
+            process = null
+          }
         }("process close failed")
       }
       logger.info(s"To delete python executor")
@@ -253,9 +253,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     // close
     Utils.tryFinally({
       if (promise != null && !promise.isCompleted) {
-        promise.failure(
-          new ExecuteError(PYSPARK_STOPPED.getErrorCode, PYSPARK_STOPPED.getErrorDesc)
-        )
+        promise.failure(ExecuteError(PYSPARK_STOPPED.getErrorCode, PYSPARK_STOPPED.getErrorDesc))
       }
     }) {
       close
@@ -289,11 +287,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     if (process == null) {
       Utils.tryThrow(initGateway) { t =>
         {
-          val errMsg =
-            s"initialize python executor failed, please ask administrator for help! errMsg: ${t.getMessage}"
-          logger.error(errMsg, t)
+          logger.error("initialize python executor failed, please ask administrator for help!", t)
           Utils.tryAndWarn(close)
-          throw new IllegalStateException(errMsg, t)
+          throw t
         }
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
