[
https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14620756#comment-14620756
]
ASF GitHub Bot commented on FLINK-2292:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/896#discussion_r34272101
--- Diff:
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
---
@@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with
ActorSynchronousLogging {
if (!isConnected) {
log.debug(s"Dropping message $message because the TaskManager is
currently " +
"not connected to a JobManager.")
- }
+ } else {
- // we order the messages by frequency, to make sure the code paths for
matching
- // are as short as possible
- message match {
+ // we order the messages by frequency, to make sure the code paths
for matching
+ // are as short as possible
+ message match {
+
+ // tell the task about the availability of a new input partition
+ case UpdateTaskSinglePartitionInfo(executionID, resultID,
partitionInfo) =>
+ updateTaskInputPartitions(executionID, List((resultID,
partitionInfo)))
+
+ // tell the task about the availability of some new input
partitions
+ case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos)
=>
+ updateTaskInputPartitions(executionID, partitionInfos)
+
+ // discards intermediate result partitions of a task execution on
this TaskManager
+ case FailIntermediateResultPartitions(executionID) =>
+ log.info("Discarding the results produced by task execution " +
executionID)
+ if (network.isAssociated) {
+ try {
+
network.getPartitionManager.releasePartitionsProducedBy(executionID)
+ } catch {
+ case t: Throwable => killTaskManagerFatal(
+ "Fatal leak: Unable to release intermediate result
partition data", t)
+ }
+ }
- // tell the task about the availability of a new input partition
- case UpdateTaskSinglePartitionInfo(executionID, resultID,
partitionInfo) =>
- updateTaskInputPartitions(executionID, List((resultID,
partitionInfo)))
+ // notifies the TaskManager that the state of a task has changed.
+ // the TaskManager informs the JobManager and cleans up in case
the transition
+ // was into a terminal state, or in case the JobManager cannot be
informed of the
+ // state transition
- // tell the task about the availability of some new input partitions
- case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
- updateTaskInputPartitions(executionID, partitionInfos)
+ case updateMsg@UpdateTaskExecutionState(taskExecutionState:
TaskExecutionState) =>
- // discards intermediate result partitions of a task execution on
this TaskManager
- case FailIntermediateResultPartitions(executionID) =>
- log.info("Discarding the results produced by task execution " +
executionID)
- if (network.isAssociated) {
- try {
-
network.getPartitionManager.releasePartitionsProducedBy(executionID)
- } catch {
- case t: Throwable => killTaskManagerFatal(
- "Fatal leak: Unable to release intermediate result
partition data", t)
- }
- }
+ // we receive these from our tasks and forward them to the
JobManager
--- End diff --
Here is a lot of changed code that was seemingly edited without need (has
nothing to do with the accumulators). Since that is pretty sensitive code, I
feel very hesitant to commit these massive edits. What was the reason for these
changes in the first place?
> Report accumulators periodically while job is running
> -----------------------------------------------------
>
> Key: FLINK-2292
> URL: https://issues.apache.org/jira/browse/FLINK-2292
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, TaskManager
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends
> metrics. This allows them to be updated in real time.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)