Made akka capable of tolerating fatal exceptions and moving on.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5b11028a Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5b11028a Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5b11028a Branch: refs/heads/master Commit: 5b11028a0479623f41e95a41825a9bdfc944b323 Parents: 5618af6 Author: Prashant Sharma <[email protected]> Authored: Mon Dec 2 10:41:26 2013 +0530 Committer: Prashant Sharma <[email protected]> Committed: Mon Dec 2 10:47:39 2013 +0530 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/AkkaUtils.scala | 4 +- .../apache/spark/util/SparkActorSystem.scala | 112 +++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5b11028a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 5df8213..407e9ff 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.Await @@ -70,7 +70,7 @@ private[spark] object AkkaUtils { |akka.remote.log-remote-lifecycle-events = $lifecycleEvents """.stripMargin) - val actorSystem = ActorSystem(name, akkaConf) + val actorSystem = SparkActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get 
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5b11028a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
new file mode 100644
index 0000000..461e7ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package akka.actor
+
+// This file lives under org/apache/spark/util but is deliberately declared in package
+// akka.actor: ActorSystemImpl and ActorSystem.findClassLoader are private[akka], so only
+// code inside the akka package can extend or call them.
+
+import com.typesafe.config.Config
+import akka.util._
+import scala.util.control.{NonFatal, ControlThrowable}
+
+/**
+ * An ActorSystem specific to Spark. It behaves like a regular Akka ActorSystem, except that it
+ * lets Spark tolerate fatal exceptions raised on the system's threads instead of shutting the
+ * ActorSystem down (see [[SparkActorSystemImpl]]).
+ *
+ * Use `SparkActorSystem(name, config)` wherever `ActorSystem(name, config)` would be used.
+ */
+object SparkActorSystem {
+
+  /**
+   * Creates and starts a Spark actor system, picking the class loader with [[findClassLoader]].
+   *
+   * @param name   name of the actor system
+   * @param config configuration the system's settings are built from
+   * @return the started actor system
+   */
+  def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
+
+  /**
+   * Creates and starts a Spark actor system that uses the given class loader.
+   *
+   * @param name        name of the actor system
+   * @param config      configuration the system's settings are built from
+   * @param classLoader class loader handed to the underlying ActorSystemImpl
+   * @return the started actor system
+   */
+  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+    new SparkActorSystemImpl(name, config, classLoader).start()
+
+  /**
+   * INTERNAL API
+   *
+   * Class loader lookup used by `apply(name, config)`. Delegates to Akka's own
+   * `ActorSystem.findClassLoader()` instead of keeping a verbatim copy of it here, so the
+   * lookup cannot drift from the Akka version on the classpath.
+   */
+  private[akka] def findClassLoader(): ClassLoader = ActorSystem.findClassLoader()
+}
+
+/**
+ * INTERNAL API
+ *
+ * An ActorSystemImpl whose uncaught-exception handler tolerates fatal errors.
+ *
+ * Only the uncaught-exception handler is customised. Everything else -- start(), stop(), `/`,
+ * toString -- is inherited from ActorSystemImpl unchanged. Re-declaring those members here
+ * only lets them drift from Akka's implementation; e.g. a copied start() that calls just
+ * provider.init(this) skips the rest of ActorSystemImpl's start-up sequence.
+ *
+ * @param name              name of the actor system
+ * @param applicationConfig configuration the system's settings are built from
+ * @param classLoader       class loader handed to ActorSystemImpl
+ */
+private[akka] class SparkActorSystemImpl(
+    override val name: String,
+    applicationConfig: Config,
+    classLoader: ClassLoader)
+  extends ActorSystemImpl(name, applicationConfig, classLoader) {
+
+  /**
+   * Wraps Akka's default uncaught-exception handler.
+   *
+   * A fatal error (see [[isFatalError]]) is tolerated when 'akka.jvm-exit-on-fatal-error' is
+   * disabled: it is logged and the ActorSystem keeps running. Every other case is passed to
+   * Akka's handler unchanged, which logs non-fatal errors and, when
+   * 'akka.jvm-exit-on-fatal-error' is enabled, exits the JVM on fatal ones.
+   */
+  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+    // Akka's handler, built once and reused for every case this class does not tolerate.
+    val fallbackHandler = super.uncaughtExceptionHandler
+
+    new Thread.UncaughtExceptionHandler() {
+      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
+        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
+          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+            "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
+          // Akka's own handler would call shutdown() here; Spark deliberately keeps the
+          // ActorSystem alive instead. TODO: make this behaviour configurable.
+        } else {
+          fallbackHandler.uncaughtException(thread, cause)
+        }
+      }
+    }
+  }
+
+  /**
+   * Classifies a throwable the same way the uncaught-exception handler does.
+   *
+   * @param e the throwable that escaped a thread
+   * @return false for non-fatal errors (scala.util.control.NonFatal) and for
+   *         InterruptedException, NotImplementedError and ControlThrowable; true otherwise
+   */
+  private def isFatalError(e: Throwable): Boolean = e match {
+    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+      false
+    case _ => true
+  }
+}
