Repository: flink
Updated Branches:
  refs/heads/master cbe3d9646 -> 8261ed543


[taskmanager] do not process message if not connected to job manager

This commit makes sure that task messages are only processed when
connected to a job manager. Otherwise, they are dropped. Before, the
message were processed in any case.

This closes #914.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d592ee65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d592ee65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d592ee65

Branch: refs/heads/master
Commit: d592ee65f040bb0fa737cd1ac7fffec39ed9068f
Parents: cbe3d96
Author: Maximilian Michels <[email protected]>
Authored: Wed Jul 15 12:16:24 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Wed Jul 15 15:00:54 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 160 +++++++++----------
 1 file changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d592ee65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 44a0b04..1a35d01 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -307,99 +307,99 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
     if (!isConnected) {
       log.debug(s"Dropping message $message because the TaskManager is 
currently " +
         "not connected to a JobManager.")
-    }
+    } else {
+      // we order the messages by frequency, to make sure the code paths for 
matching
+      // are as short as possible
+      message match {
+
+        // tell the task about the availability of a new input partition
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, 
partitionInfo) =>
+          updateTaskInputPartitions(executionID, List((resultID, 
partitionInfo)))
+
+        // tell the task about the availability of some new input partitions
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTaskInputPartitions(executionID, partitionInfos)
+
+        // discards intermediate result partitions of a task execution on this 
TaskManager
+        case FailIntermediateResultPartitions(executionID) =>
+          log.info("Discarding the results produced by task execution " + 
executionID)
+          if (network.isAssociated) {
+            try {
+              
network.getPartitionManager.releasePartitionsProducedBy(executionID)
+            } catch {
+              case t: Throwable => killTaskManagerFatal(
+                "Fatal leak: Unable to release intermediate result partition 
data", t)
+            }
+          }
 
-    // we order the messages by frequency, to make sure the code paths for 
matching
-    // are as short as possible
-    message match {
+        // notifies the TaskManager that the state of a task has changed.
+        // the TaskManager informs the JobManager and cleans up in case the 
transition
+        // was into a terminal state, or in case the JobManager cannot be 
informed of the
+        // state transition
 
-      // tell the task about the availability of a new input partition
-      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) 
=>
-        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
+        case updateMsg@UpdateTaskExecutionState(taskExecutionState: 
TaskExecutionState) =>
 
-      // tell the task about the availability of some new input partitions
-      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
-        updateTaskInputPartitions(executionID, partitionInfos)
+          // we receive these from our tasks and forward them to the JobManager
+          currentJobManager foreach {
+            jobManager => {
+              val futureResponse = (jobManager ? updateMsg)(askTimeout)
 
-      // discards intermediate result partitions of a task execution on this 
TaskManager
-      case FailIntermediateResultPartitions(executionID) =>
-        log.info("Discarding the results produced by task execution " + 
executionID)
-        if (network.isAssociated) {
-          try {
-            
network.getPartitionManager.releasePartitionsProducedBy(executionID)
-          } catch {
-            case t: Throwable => killTaskManagerFatal(
-                "Fatal leak: Unable to release intermediate result partition 
data", t)
-          }
-        }
+              val executionID = taskExecutionState.getID
 
-      // notifies the TaskManager that the state of a task has changed.
-      // the TaskManager informs the JobManager and cleans up in case the 
transition
-      // was into a terminal state, or in case the JobManager cannot be 
informed of the
-      // state transition
+              futureResponse.mapTo[Boolean].onComplete {
+                // IMPORTANT: In the future callback, we cannot directly 
modify state
+                //            but only send messages to the TaskManager to do 
those changes
+                case Success(result) =>
+                  if (!result) {
+                    self ! FailTask(executionID,
+                      new Exception("Task has been cancelled on the 
JobManager."))
+                  }
 
-      case updateMsg @ UpdateTaskExecutionState(taskExecutionState: 
TaskExecutionState) =>
-        
-        // we receive these from our tasks and forward them to the JobManager
-        currentJobManager foreach {
-          jobManager => {
-            val futureResponse = (jobManager ? updateMsg)(askTimeout)
-
-            val executionID = taskExecutionState.getID
-
-            futureResponse.mapTo[Boolean].onComplete {
-              // IMPORTANT: In the future callback, we cannot directly modify 
state
-              //            but only send messages to the TaskManager to do 
those changes
-              case Success(result) =>
-                if (!result) {
-                  self ! FailTask(executionID,
-                    new Exception("Task has been cancelled on the 
JobManager."))
-                }
-                
-              case Failure(t) =>
-                self ! FailTask(executionID, new Exception(
-                  "Failed to send ExecutionStateChange notification to 
JobManager"))
-            }(context.dispatcher)
+                case Failure(t) =>
+                  self ! FailTask(executionID, new Exception(
+                    "Failed to send ExecutionStateChange notification to 
JobManager"))
+              }(context.dispatcher)
+            }
           }
-        }
 
-      // removes the task from the TaskManager and frees all its resources
-      case TaskInFinalState(executionID) =>
-        unregisterTaskAndNotifyFinalState(executionID)
+        // removes the task from the TaskManager and frees all its resources
+        case TaskInFinalState(executionID) =>
+          unregisterTaskAndNotifyFinalState(executionID)
 
-      // starts a new task on the TaskManager
-      case SubmitTask(tdd) =>
-        submitTask(tdd)
+        // starts a new task on the TaskManager
+        case SubmitTask(tdd) =>
+          submitTask(tdd)
 
-      // marks a task as failed for an external reason
-      // external reasons are reasons other than the task code itself throwing 
an exception
-      case FailTask(executionID, cause) =>
-        val task = runningTasks.get(executionID)
-        if (task != null) {
-          task.failExternally(cause)
-        } else {
-          log.debug(s"Cannot find task to fail for execution ${executionID})")
-        }
+        // marks a task as failed for an external reason
+        // external reasons are reasons other than the task code itself 
throwing an exception
+        case FailTask(executionID, cause) =>
+          val task = runningTasks.get(executionID)
+          if (task != null) {
+            task.failExternally(cause)
+          } else {
+            log.debug(s"Cannot find task to fail for execution 
${executionID})")
+          }
 
-      // cancels a task
-      case CancelTask(executionID) =>
-        val task = runningTasks.get(executionID)
-        if (task != null) {
-          task.cancelExecution()
-          sender ! new TaskOperationResult(executionID, true)
-        } else {
-          log.debug(s"Cannot find task to cancel for execution 
${executionID})")
-          sender ! new TaskOperationResult(executionID, false,
+        // cancels a task
+        case CancelTask(executionID) =>
+          val task = runningTasks.get(executionID)
+          if (task != null) {
+            task.cancelExecution()
+            sender ! new TaskOperationResult(executionID, true)
+          } else {
+            log.debug(s"Cannot find task to cancel for execution 
${executionID})")
+            sender ! new TaskOperationResult(executionID, false,
               "No task with that execution ID was found.")
-        }
+          }
 
-      case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
-        Option(runningTasks.get(taskExecutionId)) match {
-          case Some(task) =>
-            task.onPartitionStateUpdate(taskResultId, partitionId, state)
-          case None =>
-            log.debug(s"Cannot find task $taskExecutionId to respond with 
partition state.")
-        }
+        case PartitionState(taskExecutionId, taskResultId, partitionId, state) 
=>
+          Option(runningTasks.get(taskExecutionId)) match {
+            case Some(task) =>
+              task.onPartitionStateUpdate(taskResultId, partitionId, state)
+            case None =>
+              log.debug(s"Cannot find task $taskExecutionId to respond with 
partition state.")
+          }
+      }
     }
   }
 

Reply via email to