GitHub user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34335380
  
    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         if (!isConnected) {
           log.debug(s"Dropping message $message because the TaskManager is currently " +
             "not connected to a JobManager.")
    -    }
    +    } else {
     
    -    // we order the messages by frequency, to make sure the code paths for matching
    -    // are as short as possible
    -    message match {
    +      // we order the messages by frequency, to make sure the code paths for matching
    +      // are as short as possible
    +      message match {
    +
    +        // tell the task about the availability of a new input partition
    +        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    +          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +
    +        // tell the task about the availability of some new input partitions
    +        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    +          updateTaskInputPartitions(executionID, partitionInfos)
    +
    +        // discards intermediate result partitions of a task execution on this TaskManager
    +        case FailIntermediateResultPartitions(executionID) =>
    +          log.info("Discarding the results produced by task execution " + executionID)
    +          if (network.isAssociated) {
    +            try {
    +              network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +            } catch {
    +              case t: Throwable => killTaskManagerFatal(
    +                "Fatal leak: Unable to release intermediate result partition data", t)
    +            }
    +          }
     
    -      // tell the task about the availability of a new input partition
    -      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    -        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +        // notifies the TaskManager that the state of a task has changed.
    +        // the TaskManager informs the JobManager and cleans up in case the transition
    +        // was into a terminal state, or in case the JobManager cannot be informed of the
    +        // state transition
     
    -      // tell the task about the availability of some new input partitions
    -      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -        updateTaskInputPartitions(executionID, partitionInfos)
    +        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
     
    -      // discards intermediate result partitions of a task execution on this TaskManager
    -      case FailIntermediateResultPartitions(executionID) =>
    -        log.info("Discarding the results produced by task execution " + executionID)
    -        if (network.isAssociated) {
    -          try {
    -            network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -          } catch {
    -            case t: Throwable => killTaskManagerFatal(
    -                "Fatal leak: Unable to release intermediate result partition data", t)
    -          }
    -        }
    +          // we receive these from our tasks and forward them to the JobManager
    --- End diff --
    
    This fixes a bug I discovered while reading through the code: messages were still processed when the TaskManager was not connected to a JobManager. At line 307, the log message says the message is being dropped, but because there was no `else`, execution falls through to the `message match` and the message is processed anyway. If you want, I can open a separate pull request for this fix.
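    A minimal sketch of the control-flow difference, assuming a simplified handler: `MessageGate`, `buggyHandle`, and `fixedHandle` are hypothetical names, not part of Flink, and the handlers return an `Option` only so the outcome can be checked (the real TaskManager handler returns `Unit`).

```scala
// Hypothetical, simplified model of the connection check in TaskManager.
// Not Flink code: it only mirrors the shape of the if/match before and after the fix.
object MessageGate {

  // Before the fix: the "dropping" branch only logs. Control then falls
  // through to the match, so the message is handled even when disconnected.
  def buggyHandle(isConnected: Boolean, message: String): Option[String] = {
    if (!isConnected) {
      println(s"Dropping message $message because not connected.")
    }
    message match {
      case m => Some(s"handled $m")
    }
  }

  // After the fix: the match runs only in the else branch. Because if/else
  // is an expression in Scala, the dropped case also yields an explicit None.
  def fixedHandle(isConnected: Boolean, message: String): Option[String] = {
    if (!isConnected) {
      println(s"Dropping message $message because not connected.")
      None
    } else {
      message match {
        case m => Some(s"handled $m")
      }
    }
  }
}
```

    With `isConnected = false`, `buggyHandle` still returns a handled result, while `fixedHandle` returns `None`; when connected, both behave the same.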

