Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 58a1a63e6 -> 5498ae205


[GEARPUMP-283] Return app exception to client

Author: manuzhang <[email protected]>

Closes #161 from manuzhang/GEARPUMP-283.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5498ae20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5498ae20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5498ae20

Branch: refs/heads/master
Commit: 5498ae2058ee0faf98b4107bf2e73e927e80514a
Parents: 58a1a63
Author: manuzhang <[email protected]>
Authored: Tue Feb 28 11:00:49 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 28 11:01:09 2017 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/ClusterMessage.scala |  3 ++-
 .../gearpump/streaming/appmaster/AppMaster.scala   | 17 ++++++++++-------
 .../gearpump/streaming/appmaster/TaskManager.scala |  4 ++--
 .../gearpump/streaming/executor/Executor.scala     |  2 +-
 4 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
index 4b801a2..8a76916 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -77,7 +77,8 @@ object ExecutorToAppMaster {
   case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
   case class UnRegisterTask(taskId: TaskId, executorId: Int)
 
-  case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
+  case class MessageLoss(executorId: Int, taskId: TaskId,
+      cause: String, ex: Option[Throwable] = None)
 }
 
 object AppMasterToMaster {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 5ace1b2..15df0b3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -82,7 +82,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
   private var taskManager: Option[ActorRef] = None
   private var clockService: Option[ActorRef] = None
   private val systemConfig = context.system.settings.config
-  private var lastFailure = LastFailure(0L, null)
+  // TODO: Consolidate failure and exception into one which requires refactoring of MessageLoss
+  private var lastFailure: (LastFailure, Option[Throwable]) = (LastFailure(0L, null), None)
 
   private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
     self.path.toString,
@@ -160,8 +161,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       } else {
         LOG.error(s"replay for invalid appId ${replay.appId}")
       }
-    case messageLoss: MessageLoss =>
-      lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
+    case messageLoss@MessageLoss(_, _, cause, ex) =>
+      lastFailure = LastFailure(System.currentTimeMillis(), cause) -> ex
       taskManager.foreach(_ forward messageLoss)
     case lookupTask: LookupTaskActorRef =>
       taskManager.foreach(_ forward lookupTask)
@@ -259,7 +260,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       dagManager forward replaceDAG
     case GetLastFailure(id) =>
       if (id == appId) {
-        sender ! lastFailure
+        sender ! lastFailure._1
       } else {
         LOG.error(s"GetLastFailure for invalid appId $id")
       }
@@ -306,15 +307,17 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
     case FailedToRecover(errorMsg) =>
       if (context.children.toList.contains(sender())) {
         LOG.error(errorMsg)
-        val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, lastFailure.time,
-          new Exception(lastFailure.error))
+        val (failure, exception) = lastFailure
+        val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, failure.time,
+          exception.getOrElse(new Exception(failure.error)))
         masterProxy ! failed
       }
     case AllocateResourceTimeOut =>
       val errorMsg = s"Failed to allocate resource in time, shutdown application $appId"
       LOG.error(errorMsg)
+      val (failure, exception) = lastFailure
       val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED,
-        System.currentTimeMillis(), new Exception(lastFailure.error))
+        System.currentTimeMillis(), exception.getOrElse(new Exception(failure.error)))
       masterProxy ! failed
       context.stop(self)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 81ed79a..51c4de9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -132,7 +132,7 @@ private[appmaster] class TaskManager(
           self ! executorStopped
           context.become(recovery(recoverState))
         }
-      case MessageLoss(executorId, taskId, cause) =>
+      case MessageLoss(executorId, taskId, _, _) =>
         if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
           appRestartPolicy.allowRestart) {
           context.become(recovery(recoverState))
@@ -217,7 +217,7 @@ private[appmaster] class TaskManager(
     val onMessageLoss: Receive = {
       case ExecutorStopped(executorId) =>
         context.become(recovery(recoverState))
-      case MessageLoss(executorId, taskId, cause) =>
+      case MessageLoss(executorId, taskId, cause, _) =>
         if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
           appRestartPolicy.allowRestart) {
           context.become(recovery(recoverState))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
index 56bf61d..bfc205a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
@@ -124,7 +124,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher
           s" MessageLoss, so that the system will replay all lost message"
         LOG.error(errorMsg, ex)
         val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
-        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
+        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg, Some(ex)))
         Resume
     }
 

Reply via email to