Rename SparkActorSystem to IndestructibleActorSystem
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a864e3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a864e3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a864e3f
Branch: refs/heads/master
Commit: 5a864e3fce234d19e1b371d9bab40554293546bb
Parents: f6c8c1c c9cd2af
Author: Aaron Davidson <[email protected]>
Authored: Fri Dec 6 00:16:40 2013 -0800
Committer: Aaron Davidson <[email protected]>
Committed: Fri Dec 6 00:21:43 2013 -0800
----------------------------------------------------------------------
.../executor/CoarseGrainedExecutorBackend.scala | 3 +-
.../cluster/SimrSchedulerBackend.scala | 2 +-
.../spark/storage/BlockObjectWriter.scala | 2 +-
.../scala/org/apache/spark/util/AkkaUtils.scala | 15 ++++--
.../spark/util/IndestructibleActorSystem.scala | 55 +++++++++++++++++++
.../apache/spark/util/SparkActorSystem.scala | 56 --------------------
.../spark/deploy/yarn/WorkerLauncher.scala | 2 +-
7 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12be,406e015..debbdd4
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -97,7 -97,8 +97,8 @@@ private[spark] object CoarseGrainedExec
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val 
(actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0, - useSparkAS = true) ++ indestructible = true) // set it val sparkHostPort = hostname + ":" + boundPort System.setProperty("spark.hostPort", sparkHostPort) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index b4451fc,b4451fc..df33f6b --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@@ -44,7 -44,7 +44,7 @@@ abstract class BlockObjectWriter(val bl * Flush the partial writes and commit them as a single atomic block. Return the * number of bytes written for this commit. */ -- def commit(): Long ++ def commit(): LongSpark /** * Reverts writes that haven't been flushed yet. 
Callers should invoke this function http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 407e9ff,f3e2644..9f3f163 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@@ -17,7 -17,7 +17,7 @@@ package org.apache.spark.util --import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem} ++import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.Await @@@ -34,8 -34,10 +34,13 @@@ private[spark] object AkkaUtils * * Note: the `name` parameter is important, as even if a client sends a message to right * host + port, if the system name is incorrect, Akka will drop the message. ++ * ++ * If indestructible is set to true, the Actor System will continue running in the event ++ * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. 
*/ - def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { - def createActorSystem(name: String, host: String, port: Int, - useSparkAS: Boolean = false): (ActorSystem, Int) = { ++ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false) ++ : (ActorSystem, Int) = { + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt @@@ -70,7 -72,12 +75,11 @@@ |akka.remote.log-remote-lifecycle-events = $lifecycleEvents """.stripMargin) - val actorSystem = SparkActorSystem(name, akkaConf) - val actorSystem = if (useSparkAS) { - SparkActorSystem(name, akkaConf) - } - else { ++ val actorSystem = if (indestructible) { ++ IndestructibleActorSystem(name, akkaConf) ++ } else { + ActorSystem(name, akkaConf) + } val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala ---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index 0000000,0000000..6951986 new file mode 100644 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@@ -1,0 -1,0 +1,55 @@@ ++/** ++ * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> ++ */ ++ ++// Must be in akka.actor package as ActorSystemImpl is protected[akka]. ++package akka.actor ++ ++import scala.util.control.{ControlThrowable, NonFatal} ++ ++import com.typesafe.config.Config ++ ++/** ++ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception. ++ * This is necessary as Spark Executors are allowed to recover from fatal exceptions ++ * (see [[org.apache.spark.executor.Executor]]). 
++ */ ++object IndestructibleActorSystem { ++ def apply(name: String, config: Config): ActorSystem = ++ apply(name, config, ActorSystem.findClassLoader()) ++ ++ def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = ++ new IndestructibleActorSystemImpl(name, config, classLoader).start() ++} ++ ++private[akka] class IndestructibleActorSystemImpl( ++ override val name: String, ++ applicationConfig: Config, ++ classLoader: ClassLoader) ++ extends ActorSystemImpl(name, applicationConfig, classLoader) { ++ ++ protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { ++ val fallbackHandler = super.uncaughtExceptionHandler ++ ++ new Thread.UncaughtExceptionHandler() { ++ def uncaughtException(thread: Thread, cause: Throwable): Unit = { ++ if (isFatalError(cause) && !settings.JvmExitOnFatalError) { ++ log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + ++ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name) ++ //shutdown() //TODO make it configurable ++ } else { ++ fallbackHandler.uncaughtException(thread, cause) ++ } ++ } ++ } ++ } ++ ++ def isFatalError(e: Throwable): Boolean = { ++ e match { ++ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => ++ false ++ case _ => ++ true ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala ---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala index a679fd6,d329063..0000000 deleted file mode 100644,100644 --- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala +++ /dev/null @@@ -1,56 -1,56 +1,0 @@@ --/** -- * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> -- */ -- --// Must be in akka.actor package as ActorSystemImpl is protected[akka]. 
--package akka.actor -- --import scala.util.control.{ControlThrowable, NonFatal} -- --import com.typesafe.config.Config -- --/** -- * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]]. -- * The only change from the default system is that we do not shut down the ActorSystem -- * in the event of a fatal exception. This is necessary as Spark is allowed to recover -- * from fatal exceptions (see [[org.apache.spark.executor.Executor]]). -- */ --object SparkActorSystem { -- def apply(name: String, config: Config): ActorSystem = -- apply(name, config, ActorSystem.findClassLoader()) -- -- def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = -- new SparkActorSystemImpl(name, config, classLoader).start() --} -- --private[akka] class SparkActorSystemImpl( -- override val name: String, -- applicationConfig: Config, -- classLoader: ClassLoader) -- extends ActorSystemImpl(name, applicationConfig, classLoader) { -- -- protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { -- val fallbackHandler = super.uncaughtExceptionHandler -- -- new Thread.UncaughtExceptionHandler() { -- def uncaughtException(thread: Thread, cause: Throwable): Unit = { -- if (isFatalError(cause) && !settings.JvmExitOnFatalError) { -- log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " + - "ActorSystem tolerating and continuing.... [{}]", thread.getName, name) - //shutdown() //TODO make it configurable - } else { - fallbackHandler.uncaughtException(thread, cause) - } - } - } - } - - def isFatalError(e: Throwable): Boolean = { - e match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => - false - case _ => - true - } - } - } - "ActorSystem [{}] tolerating and continuing.... 
", thread.getName, name) - //shutdown() //TODO make it configurable - } else { - fallbackHandler.uncaughtException(thread, cause) - } - } - } - } - - def isFatalError(e: Throwable): Boolean = { - e match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => - false - case _ => - true - } - } -}
