Repository: spark
Updated Branches:
  refs/heads/master af15c82bf -> 3af1f3864


SPARK-1772 Stop catching Throwable, let Executors die

The main issue this patch fixes is 
[SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772), in which 
Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch 
causes Executors to delegate to the ExecutorUncaughtExceptionHandler when a 
fatal exception is thrown.

This patch also continues the fight in the never-ending war against `case t: 
Throwable =>`, by catching only Exceptions in many places, and by adding a 
wrapper for Threads and Runnables to make sure any uncaught exceptions are at 
least printed to the logs.

It also turns out that the IndestructibleActorSystem is unlikely to actually 
work, based on testing 
([see this gist](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)): the 
uncaughtExceptionHandler is not called from the places that we expected it 
would be.
[SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part 
of this issue, but refactoring our Actor Systems to ensure that exceptions are 
dealt with properly is a much bigger change, outside the scope of this PR.

Author: Aaron Davidson <aa...@databricks.com>

Closes #715 from aarondav/throwable and squashes the following commits:

f9b9bfe [Aaron Davidson] Remove other redundant 'throw e'
e937a0a [Aaron Davidson] Address Prashant and Matei's comments
1867867 [Aaron Davidson] [RFC] SPARK-1772 Stop catching Throwable, let 
Executors die


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3af1f386
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3af1f386
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3af1f386

Branch: refs/heads/master
Commit: 3af1f386439cdddd42e545ad63d089f4dfdf9f8a
Parents: af15c82
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon May 12 11:08:52 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon May 12 11:08:52 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 11 ++--
 .../scala/org/apache/spark/SparkContext.scala   | 12 ++--
 .../org/apache/spark/api/python/PythonRDD.scala |  3 +-
 .../spark/api/python/PythonWorkerFactory.scala  |  1 -
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  2 +-
 .../spark/deploy/history/HistoryServer.scala    |  8 +--
 .../org/apache/spark/deploy/master/Master.scala |  4 +-
 .../spark/deploy/worker/DriverWrapper.scala     |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala    | 37 ++++-------
 .../ExecutorUncaughtExceptionHandler.scala      | 53 +++++++++++++++
 .../spark/scheduler/EventLoggingListener.scala  |  4 +-
 .../spark/scheduler/TaskResultGetter.scala      |  8 +--
 .../apache/spark/storage/DiskBlockManager.scala |  6 +-
 .../spark/storage/TachyonBlockManager.scala     |  7 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 11 +---
 .../spark/util/IndestructibleActorSystem.scala  | 68 --------------------
 .../scala/org/apache/spark/util/Utils.scala     | 26 +++++++-
 19 files changed, 127 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 54e08d7..e2d2250 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, 
SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning() {
+  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
     while (!stopped) {
       try {
         val reference = 
Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
           }
         }
       } catch {
-        case t: Throwable => logError("Error in cleaning thread", t)
+        case e: Exception => logError("Error in cleaning thread", e)
       }
     }
   }
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
       listeners.foreach(_.rddCleaned(rddId))
       logInfo("Cleaned RDD " + rddId)
     } catch {
-      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+      case e: Exception => logError("Error cleaning RDD " + rddId, e)
     }
   }
 
@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
       listeners.foreach(_.shuffleCleaned(shuffleId))
       logInfo("Cleaned shuffle " + shuffleId)
     } catch {
-      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
     }
   }
 
@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
       listeners.foreach(_.broadcastCleaned(broadcastId))
       logInfo("Cleaned broadcast " + broadcastId)
     } catch {
-      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, 
t)
+      case e: Exception => logError("Error cleaning broadcast " + broadcastId, 
e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 71bab29..e6121a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
         } catch {
           // TODO: Enumerate the exact reasons why it can fail
           // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
         val backend = new CoarseGrainedSchedulerBackend(scheduler, 
sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 
@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], 
classOf[SparkContext])
           cons.newInstance(scheduler, 
sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2971c27..57b28b9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
       this.interrupt()
     }
 
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       try {
         SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, 
bufferSize)
@@ -282,7 +282,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 002f2ac..759cbe2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, 
envVars: Map[String
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        case e: Throwable => throw e
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7ead117..aeb159a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
     // TODO: See if we can initialize akka so return messages are sent back 
using the same TCP
     //       flow. Else, this (sadly) requires the DriverClient be routable 
from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf, new 
SecurityManager(conf))
+      "driverClient", Utils.localHostName(), 0, conf, new 
SecurityManager(conf))
 
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e2df1b8..148115d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -103,7 +103,7 @@ object SparkHadoopUtil {
           .newInstance()
           .asInstanceOf[SparkHadoopUtil]
       } catch {
-       case th: Throwable => throw new SparkException("Unable to load YARN 
support", th)
+       case e: Exception => throw new SparkException("Unable to load YARN 
support", e)
       }
     } else {
       new SparkHadoopUtil

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1238bbf..a9c11dc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -70,7 +70,7 @@ class HistoryServer(
    * TODO: Add a mechanism to update manually.
    */
   private val logCheckingThread = new Thread {
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       while (!stopped) {
         val now = System.currentTimeMillis
         if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
         numCompletedApplications = logInfos.size
 
       } catch {
-        case t: Throwable => logError("Exception in checking for event log 
updates", t)
+        case e: Exception => logError("Exception in checking for event log 
updates", e)
       }
     } else {
       logWarning("Attempted to check for event log updates before binding the 
server.")
@@ -231,8 +231,8 @@ class HistoryServer(
         dir.getModificationTime
       }
     } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of 
%s".format(dir.getPath), t)
+      case e: Exception =>
+        logError("Exception in accessing modification time of 
%s".format(dir.getPath), e)
         -1L
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f254f55..c6dec30 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -684,8 +684,8 @@ private[spark] class Master(
         webUi.attachSparkUI(ui)
         return true
       } catch {
-        case t: Throwable =>
-          logError("Exception in replaying log for application %s 
(%s)".format(appName, app.id), t)
+        case e: Exception =>
+          logError("Exception in replaying log for application %s 
(%s)".format(appName, app.id), e)
       }
     } else {
       logWarning("Application %s (%s) has no valid logs: %s".format(appName, 
app.id, eventLogDir))

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index be15138..05e242e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -31,7 +31,7 @@ object DriverWrapper {
       case workerUrl :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+          Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = 
"workerWatcher")
 
         // Delegate to supplied main class

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e912ae8..84aec65 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
         // Create a new ActorSystem to run the backend, because we can't 
create a
         // SparkEnv / Executor before getting started with all our system 
properties, etc
         val (actorSystem, boundPort) = 
AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-          indestructible = true, conf = conf, new SecurityManager(conf))
+          conf, new SecurityManager(conf))
         // set it
         val sparkHostPort = hostname + ":" + boundPort
         actorSystem.actorOf(

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 98e7e0b..baee7a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,28 +74,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must 
not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => 
Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => 
Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -259,6 +238,11 @@ private[spark] class Executor(
         }
 
         case t: Throwable => {
+          // Attempt to exit cleanly by informing the driver of our failure.
+          // If anything goes wrong (or this was a fatal exception), we will 
delegate to
+          // the default uncaught exception handler, which will terminate the 
Executor.
+          logError("Exception in task ID " + taskId, t)
+
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
@@ -268,10 +252,11 @@ private[spark] class Executor(
           val reason = ExceptionFailure(t.getClass.getName, t.toString, 
t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, 
ser.serialize(reason))
 
-          // TODO: Should we exit the whole executor here? On the one hand, 
the failed task may
-          // have left some weird state around depending on when the exception 
was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail 
and exit then.
-          logError("Exception in task ID " + taskId, t)
+          // Don't forcibly exit unless the exception was inherently fatal, to 
avoid
+          // stopping other tasks unnecessarily.
+          if (Utils.isFatalError(t)) {
+            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          }
         }
       } finally {
         // TODO: Unregister shuffle memory only for ResultTask

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
 
b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
new file mode 100644
index 0000000..b0e984c
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole 
process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively 
lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call 
System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => 
Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => 
Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7968a06..a90b0d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging {
         applicationComplete = filePaths.exists { path => 
isApplicationCompleteFile(path.getName) }
       )
     } catch {
-      case t: Throwable =>
-        logError("Exception in parsing logging info from directory 
%s".format(logDir), t)
+      case e: Exception =>
+        logError("Exception in parsing logging info from directory 
%s".format(logDir), e)
       EventLoggingInfo.empty
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c9ad2b1..99d305b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
   def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           val result = 
serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] => directResult
@@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
           case cnf: ClassNotFoundException =>
             val loader = Thread.currentThread.getContextClassLoader
             taskSetManager.abort("ClassNotFound with classloader: " + loader)
-          case ex: Throwable =>
+          case ex: Exception =>
             taskSetManager.abort("Exception while deserializing and fetching 
task: %s".format(ex))
         }
       }
@@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
     serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           if (serializedData != null && serializedData.limit() > 0) {
             reason = serializer.get().deserialize[TaskEndReason](
@@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
             val loader = Utils.getContextOrSparkClassLoader
             logError(
               "Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
-          case ex: Throwable => {}
+          case ex: Exception => {}
         }
         scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index cf6ef00..3a7243a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: 
ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         DiskBlockManager.this.stop()
       }
@@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: 
ShuffleBlockManager, rootD
         try {
           if (!Utils.hasRootAsShutdownDeleteDir(localDir)) 
Utils.deleteRecursively(localDir)
         } catch {
-          case t: Throwable =>
-            logError("Exception while deleting local spark dir: " + localDir, 
t)
+          case e: Exception =>
+            logError("Exception while deleting local spark dir: " + localDir, 
e)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index b0b9674..a6cbe3a 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -25,7 +25,6 @@ import tachyon.client.TachyonFile
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.ShuffleSender
 import org.apache.spark.util.Utils
 
 
@@ -137,7 +136,7 @@ private[spark] class TachyonBlockManager(
   private def addShutdownHook() {
     tachyonDirs.foreach(tachyonDir => 
Utils.registerShutdownDeleteDir(tachyonDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") 
{
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         tachyonDirs.foreach { tachyonDir =>
           try {
@@ -145,8 +144,8 @@ private[spark] class TachyonBlockManager(
               Utils.deleteRecursively(tachyonDir, client)
             }
           } catch {
-            case t: Throwable =>
-              logError("Exception while deleting tachyon spark dir: " + 
tachyonDir, t)
+            case e: Exception =>
+              logError("Exception while deleting tachyon spark dir: " + 
tachyonDir, e)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8afe09a..a8d12bb 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
@@ -41,7 +41,7 @@ private[spark] object AkkaUtils extends Logging {
    * If indestructible is set to true, the Actor System will continue running 
in the event
    * of a fatal exception. This is used by 
[[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int, indestructible: 
Boolean = false,
+  def createActorSystem(name: String, host: String, port: Int,
     conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
@@ -101,12 +101,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.log-dead-letters-during-shutdown = $lifecycleEvents
       """.stripMargin))
 
-    val actorSystem = if (indestructible) {
-      IndestructibleActorSystem(name, akkaConf)
-    } else {
-      ActorSystem(name, akkaConf)
-    }
-
+    val actorSystem = ActorSystem(name, akkaConf)
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
     (actorSystem, boundPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala 
b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
deleted file mode 100644
index 4188a86..0000000
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Must be in akka.actor package as ActorSystemImpl is protected[akka].
-package akka.actor
-
-import scala.util.control.{ControlThrowable, NonFatal}
-
-import com.typesafe.config.Config
-
-/**
- * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
- * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see org.apache.spark.executor.Executor)
- */
-object IndestructibleActorSystem {
-  def apply(name: String, config: Config): ActorSystem =
-    apply(name, config, ActorSystem.findClassLoader())
-
-  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
-    new IndestructibleActorSystemImpl(name, config, classLoader).start()
-}
-
-private[akka] class IndestructibleActorSystemImpl(
-    override val name: String,
-    applicationConfig: Config,
-    classLoader: ClassLoader)
-  extends ActorSystemImpl(name, applicationConfig, classLoader) {
-
-  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
-    val fallbackHandler = super.uncaughtExceptionHandler
-
-    new Thread.UncaughtExceptionHandler() {
-      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
-        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
-          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
-            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
-          // shutdown()                 //TODO make it configurable
-        } else {
-          fallbackHandler.uncaughtException(thread, cause)
-        }
-      }
-    }
-  }
-
-  def isFatalError(e: Throwable): Boolean = {
-    e match {
-      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
-        false
-      case _ =>
-        true
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3af1f386/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 95777fb..8f7594a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.Try
+import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -41,7 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
-
 /**
  * Various utility methods used by Spark.
  */
@@ -1125,4 +1125,28 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** 
+   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * This is particularly useful for wrapping code that runs in a thread, to ensure
+   * that exceptions are printed, and to avoid having to catch Throwable.
+   */
+  def logUncaughtExceptions[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        throw t
+    }
+  }
+
+  /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
+  }
 }

Reply via email to