Repository: spark Updated Branches: refs/heads/branch-2.0 243bdb11d -> 47fc0b9f4
[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead ## What changes were proposed in this pull request? When the Python process is dead, the JVM StreamingContext is still running. Hence we will see a lot of Py4jException before the JVM process exits. It's better to stop the JVM StreamingContext to avoid those annoying logs. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixi...@databricks.com> Closes #15201 from zsxwing/stop-jvm-ssc. (cherry picked from commit 3cdae0ff2f45643df7bc198cb48623526c7eb1a6) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47fc0b9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47fc0b9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47fc0b9f Branch: refs/heads/branch-2.0 Commit: 47fc0b9f40d814bc8e19f86dad591d4aed467222 Parents: 243bdb1 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Thu Sep 22 14:26:45 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Sep 22 14:26:53 2016 -0700 ---------------------------------------------------------------------- .../streaming/api/python/PythonDStream.scala | 33 ++++++++++++++++++-- .../streaming/scheduler/JobGenerator.scala | 2 ++ .../streaming/scheduler/JobScheduler.scala | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index aeff4d7..46bfc60 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ import scala.language.existentials +import py4j.Py4JException + import org.apache.spark.SparkException import org.apache.spark.api.java._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time} import org.apache.spark.streaming.api.java._ import org.apache.spark.streaming.dstream._ import org.apache.spark.util.Utils @@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer { /** * Helper functions, which are called from Python via Py4J. */ -private[python] object PythonDStream { +private[streaming] object PythonDStream { /** * can not access PythonTransformFunctionSerializer.register() via Py4j @@ -184,6 +187,32 @@ private[python] object PythonDStream { rdds.asScala.foreach(queue.add) queue } + + /** + * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot + * stop it in the Python side. + */ + def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = { + // These two special messages are from: + // scalastyle:off + // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218 + // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340 + // scalastyle:on + if (e.isInstanceOf[Py4JException] && + ("Cannot obtain a new communication channel" == e.getMessage || + "Error while obtaining a new communication channel" == e.getMessage)) { + // Start a new thread to stop StreamingContext to avoid deadlock. + new Thread("Stop-StreamingContext") with Logging { + setDaemon(true) + + override def run(): Unit = { + logError( + "Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e) + StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) + } + }.start() + } + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 19c88f1..4489a53 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} +import org.apache.spark.streaming.api.python.PythonDStream import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils} @@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) + PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) } http://git-wip-us.apache.org/repos/asf/spark/blob/47fc0b9f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 79d6254..f5ba5ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -27,6 +27,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark.internal.Logging import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ +import org.apache.spark.streaming.api.python.PythonDStream import org.apache.spark.streaming.ui.UIUtils import org.apache.spark.util.{EventLoop, ThreadUtils} @@ -210,6 +211,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { private def handleError(msg: String, e: Throwable) { logError(msg, e) ssc.waiter.notifyError(e) + PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) } private class JobHandler(job: Job) extends Runnable with Logging { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org