This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 247616a38541d82503bbc7e14fec2cfe429e206b
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 21:11:34 2023 +0800

    Python supports killing subprocesses
---
 .../spark/executor/SparkPythonExecutor.scala       | 24 +++++++++-------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index a47a0b395..1a6203aed 100644
--- 
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ 
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -32,6 +32,7 @@ import 
org.apache.linkis.engineplugin.spark.exception.ExecuteError
 import org.apache.linkis.engineplugin.spark.imexport.CsvRelation
 import org.apache.linkis.engineplugin.spark.utils.EngineUtils
 import org.apache.linkis.governance.common.paser.PythonCodeParser
+import org.apache.linkis.governance.common.utils.GovernanceUtils
 import org.apache.linkis.scheduler.executer.{ExecuteResponse, 
SuccessExecuteResponse}
 import org.apache.linkis.storage.resultset.ResultSetWriterFactory
 
@@ -120,16 +121,15 @@ class SparkPythonExecutor(val sparkEngineSession: 
SparkEngineSession, val id: In
       }
       IOUtils.closeQuietly(lineOutputStream)
       Utils.tryAndErrorMsg {
-        process.destroy()
-        process = null
-        Thread.sleep(1000 * 2L)
-        // process.destroy will kills the subprocess,not need to force kill 
with -9,
-        // kill -9 may cause resources not to be released
+        // invoke kill process method  to kill all tree process
         pid.foreach(p => {
           logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
-          Utils.exec(Array("kill", "-15", p), 3000L)
+          GovernanceUtils.killProcess(String.valueOf(p), s"kill pyspark 
process,pid: $pid", false)
         })
-
+        if (pid.isEmpty) {
+          process.destroy()
+          process = null
+        }
       }("process close failed")
     }
     logger.info(s"To delete python executor")
@@ -253,9 +253,7 @@ class SparkPythonExecutor(val sparkEngineSession: 
SparkEngineSession, val id: In
       //      close
       Utils.tryFinally({
         if (promise != null && !promise.isCompleted) {
-          promise.failure(
-            new ExecuteError(PYSPARK_STOPPED.getErrorCode, 
PYSPARK_STOPPED.getErrorDesc)
-          )
+          promise.failure(ExecuteError(PYSPARK_STOPPED.getErrorCode, 
PYSPARK_STOPPED.getErrorDesc))
         }
       }) {
         close
@@ -289,11 +287,9 @@ class SparkPythonExecutor(val sparkEngineSession: 
SparkEngineSession, val id: In
     if (process == null) {
       Utils.tryThrow(initGateway) { t =>
         {
-          val errMsg =
-            s"initialize python executor failed, please ask administrator for 
help! errMsg: ${t.getMessage}"
-          logger.error(errMsg, t)
+          logger.error("initialize python executor failed, please ask 
administrator for help!", t)
           Utils.tryAndWarn(close)
-          throw new IllegalStateException(errMsg, t)
+          throw t
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to