Repository: flink Updated Branches: refs/heads/master cbe3d9646 -> 8261ed543
[taskmanager] do not process message if not connected to job manager This commit makes sure that task messages are only processed when connected to a job manager. Otherwise, they are dropped. Before, the message were processed in any case. This closes #914. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d592ee65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d592ee65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d592ee65 Branch: refs/heads/master Commit: d592ee65f040bb0fa737cd1ac7fffec39ed9068f Parents: cbe3d96 Author: Maximilian Michels <[email protected]> Authored: Wed Jul 15 12:16:24 2015 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Jul 15 15:00:54 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 160 +++++++++---------- 1 file changed, 80 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d592ee65/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 44a0b04..1a35d01 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -307,99 +307,99 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { if (!isConnected) { log.debug(s"Dropping message $message because the TaskManager is currently " + "not connected to a JobManager.") - } + } else { + // we order the messages by frequency, to make sure the code paths for matching + // are as short as possible + message match { + + // tell the task about the availability of a new input partition + case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => + updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) + + // tell the task about the availability of some new input partitions + case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => + updateTaskInputPartitions(executionID, partitionInfos) + + // discards intermediate result partitions of a task execution on this TaskManager + case FailIntermediateResultPartitions(executionID) => + log.info("Discarding the results produced by task execution " + executionID) + if (network.isAssociated) { + try { + network.getPartitionManager.releasePartitionsProducedBy(executionID) + } catch { + case t: Throwable => killTaskManagerFatal( + "Fatal leak: Unable to release intermediate result partition data", t) + } + } - // we order the messages by frequency, to make sure the code paths for matching - // are as short as possible - message match { + // notifies the TaskManager that the state of a task has changed. + // the TaskManager informs the JobManager and cleans up in case the transition + // was into a terminal state, or in case the JobManager cannot be informed of the + // state transition - // tell the task about the availability of a new input partition - case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) => - updateTaskInputPartitions(executionID, List((resultID, partitionInfo))) + case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => - // tell the task about the availability of some new input partitions - case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) => - updateTaskInputPartitions(executionID, partitionInfos) + // we receive these from our tasks and forward them to the JobManager + currentJobManager foreach { + jobManager => { + val futureResponse = (jobManager ? updateMsg)(askTimeout) - // discards intermediate result partitions of a task execution on this TaskManager - case FailIntermediateResultPartitions(executionID) => - log.info("Discarding the results produced by task execution " + executionID) - if (network.isAssociated) { - try { - network.getPartitionManager.releasePartitionsProducedBy(executionID) - } catch { - case t: Throwable => killTaskManagerFatal( - "Fatal leak: Unable to release intermediate result partition data", t) - } - } + val executionID = taskExecutionState.getID - // notifies the TaskManager that the state of a task has changed. - // the TaskManager informs the JobManager and cleans up in case the transition - // was into a terminal state, or in case the JobManager cannot be informed of the - // state transition + futureResponse.mapTo[Boolean].onComplete { + // IMPORTANT: In the future callback, we cannot directly modify state + // but only send messages to the TaskManager to do those changes + case Success(result) => + if (!result) { + self ! FailTask(executionID, + new Exception("Task has been cancelled on the JobManager.")) + } - case updateMsg @ UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => - - // we receive these from our tasks and forward them to the JobManager - currentJobManager foreach { - jobManager => { - val futureResponse = (jobManager ? updateMsg)(askTimeout) - - val executionID = taskExecutionState.getID - - futureResponse.mapTo[Boolean].onComplete { - // IMPORTANT: In the future callback, we cannot directly modify state - // but only send messages to the TaskManager to do those changes - case Success(result) => - if (!result) { - self ! FailTask(executionID, - new Exception("Task has been cancelled on the JobManager.")) - } - - case Failure(t) => - self ! FailTask(executionID, new Exception( - "Failed to send ExecutionStateChange notification to JobManager")) - }(context.dispatcher) + case Failure(t) => + self ! FailTask(executionID, new Exception( + "Failed to send ExecutionStateChange notification to JobManager")) + }(context.dispatcher) + } } - } - // removes the task from the TaskManager and frees all its resources - case TaskInFinalState(executionID) => - unregisterTaskAndNotifyFinalState(executionID) + // removes the task from the TaskManager and frees all its resources + case TaskInFinalState(executionID) => + unregisterTaskAndNotifyFinalState(executionID) - // starts a new task on the TaskManager - case SubmitTask(tdd) => - submitTask(tdd) + // starts a new task on the TaskManager + case SubmitTask(tdd) => + submitTask(tdd) - // marks a task as failed for an external reason - // external reasons are reasons other than the task code itself throwing an exception - case FailTask(executionID, cause) => - val task = runningTasks.get(executionID) - if (task != null) { - task.failExternally(cause) - } else { - log.debug(s"Cannot find task to fail for execution ${executionID})") - } + // marks a task as failed for an external reason + // external reasons are reasons other than the task code itself throwing an exception + case FailTask(executionID, cause) => + val task = runningTasks.get(executionID) + if (task != null) { + task.failExternally(cause) + } else { + log.debug(s"Cannot find task to fail for execution ${executionID})") + } - // cancels a task - case CancelTask(executionID) => - val task = runningTasks.get(executionID) - if (task != null) { - task.cancelExecution() - sender ! new TaskOperationResult(executionID, true) - } else { - log.debug(s"Cannot find task to cancel for execution ${executionID})") - sender ! new TaskOperationResult(executionID, false, + // cancels a task + case CancelTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { + task.cancelExecution() + sender ! new TaskOperationResult(executionID, true) + } else { + log.debug(s"Cannot find task to cancel for execution ${executionID})") + sender ! new TaskOperationResult(executionID, false, "No task with that execution ID was found.") - } + } - case PartitionState(taskExecutionId, taskResultId, partitionId, state) => - Option(runningTasks.get(taskExecutionId)) match { - case Some(task) => - task.onPartitionStateUpdate(taskResultId, partitionId, state) - case None => - log.debug(s"Cannot find task $taskExecutionId to respond with partition state.") - } + case PartitionState(taskExecutionId, taskResultId, partitionId, state) => + Option(runningTasks.get(taskExecutionId)) match { + case Some(task) => + task.onPartitionStateUpdate(taskResultId, partitionId, state) + case None => + log.debug(s"Cannot find task $taskExecutionId to respond with partition state.") + } + } } }
