Repository: spark
Updated Branches:
  refs/heads/master 1e8233541 -> 03f2b7bff
[SPARK-22535][PYSPARK] Sleep before killing the python worker in PythonRunner.MonitorThread

## What changes were proposed in this pull request?

`PythonRunner.MonitorThread` should give the task a little time to finish before forcibly killing the python worker. This greatly reduces the chance of hitting the race condition. I also improved the log message a bit to help identify which task is to blame when it's stuck.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #19762 from zsxwing/SPARK-22535.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03f2b7bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03f2b7bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03f2b7bf

Branch: refs/heads/master
Commit: 03f2b7bff7e537ec747b41ad22e448e1c141f0dd
Parents: 1e82335
Author: Shixiong Zhu <[email protected]>
Authored: Thu Nov 16 14:22:25 2017 +0900
Committer: Takuya UESHIN <[email protected]>
Committed: Thu Nov 16 14:22:25 2017 +0900

----------------------------------------------------------------------
 .../apache/spark/api/python/PythonRunner.scala | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03f2b7bf/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index d417303..9989f68 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -337,6 +337,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
     extends Thread(s"Worker Monitor for $pythonExec") {
 
+    /** How long to wait before killing the python worker if a task cannot be interrupted. */
+    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+
     setDaemon(true)
 
     override def run() {
@@ -346,12 +349,18 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        try {
-          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
-          env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
-        } catch {
-          case e: Exception =>
-            logError("Exception when trying to kill worker", e)
+        Thread.sleep(taskKillTimeout)
+        if (!context.isCompleted) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.taskAttemptId} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
         }
       }
     }
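
Note (not part of the commit): the new grace period is read from `env.conf` as `spark.python.task.killTimeout` (default `2s`), so it can be raised like any other Spark configuration entry. A minimal Scala sketch, with an illustrative app name and timeout value chosen for the example:

  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession

  // Give hard-to-interrupt Python tasks 10 seconds (instead of the 2s default)
  // before the MonitorThread destroys their worker process.
  val conf = new SparkConf()
    .setAppName("kill-timeout-example") // illustrative name, not from the patch
    .set("spark.python.task.killTimeout", "10s")

  val spark = SparkSession.builder().config(conf).getOrCreate()

The same setting can also be passed at submission time, e.g. `--conf spark.python.task.killTimeout=10s` with spark-submit.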
