[
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024938#comment-17024938
]
Guowei Ma commented on FLINK-2646:
----------------------------------
Some thoughts on who should trigger the last checkpoint in the bounded
stream scenario:
The _CheckpointCoordinator_ could trigger the last checkpoint. However, I think
there are some advantages to having the task trigger the last checkpoint in this
scenario. Of course, in both options the _CheckpointCoordinator_ remains
responsible for notifying tasks of the completion of the checkpoint.
# There would be fewer RPC calls
## The first option (_CheckpointCoordinator_ triggers the last checkpoint)
### Task -> JM: Task is finished.
### JM -> _CheckpointCoordinator_ -> Task: Trigger the last checkpoint.
### Task -> _CheckpointCoordinator_: Acknowledge the last checkpoint.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last
checkpoint.
## The second option (Task triggers the last checkpoint)
### Task -> _CheckpointCoordinator_: Task acknowledges the last checkpoint
when it reaches the end of input.
### _CheckpointCoordinator_ -> Task: Confirm the completion of the last
checkpoint
### Task -> JM: Task is finished.
## The second option needs at least one fewer RPC call than the first option.
Actually, I think step a.i is not intuitive, because the _CheckpointCoordinator_
sends an RPC call to tasks that are already finished. If Flink introduces another
state such as End_Of_Input to the ExecutionState, there would be one more RPC call.
# Releasing resources is simpler. In the first option, the _CheckpointCoordinator_
must notify the _Scheduler_ of the completion of the last checkpoint so that the
_Scheduler_ can release the slot properly. The second option does not need
to do anything special to release resources properly.
# It is easier to deal with the AM failover scenario. Currently, if a job reaches
the FINISHED status, its JobGraph is removed from the JobGraphStore, so
when a new AM is granted leadership it does not resubmit the job. In the
first option, we should only remove the JobGraph once the CheckpointCoordinator
confirms that the notification of the last checkpoint is done. In the second
option, we do not need to do anything.
In the drain scenario, Flink should trigger the checkpoint from the
CheckpointCoordinator.
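To make the RPC comparison in the first point concrete, here is a minimal sketch that only lists the messages of each option and counts them. The class and field names are made up for illustration and are not Flink APIs; each step in the lists above is assumed to be a single message.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: it only enumerates the messages
// from the two sequences described in the comment and compares their counts.
class LastCheckpointRpcCount {
    // First option: the CheckpointCoordinator triggers the last checkpoint.
    static final List<String> COORDINATOR_TRIGGERS = List.of(
            "Task -> JM: task is finished",
            "JM -> CheckpointCoordinator -> Task: trigger the last checkpoint",
            "Task -> CheckpointCoordinator: acknowledge the last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion of the last checkpoint");

    // Second option: the task triggers the last checkpoint at end of input.
    static final List<String> TASK_TRIGGERS = List.of(
            "Task -> CheckpointCoordinator: acknowledge the last checkpoint",
            "CheckpointCoordinator -> Task: confirm completion of the last checkpoint",
            "Task -> JM: task is finished");

    // Number of messages the second option saves compared with the first.
    static int savedMessages() {
        return COORDINATOR_TRIGGERS.size() - TASK_TRIGGERS.size();
    }

    public static void main(String[] args) {
        System.out.println("first option:  " + COORDINATOR_TRIGGERS.size() + " messages");
        System.out.println("second option: " + TASK_TRIGGERS.size() + " messages");
        System.out.println("saved:         " + savedMessages());
    }
}
```

The sketch ignores the extra call that an additional End_Of_Input state would introduce, since that state is only a possibility mentioned above.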
> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked both in case of
> proper completion and in case of canceling or error (to allow for
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API
> breaking.
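The proposal in the issue description could be sketched roughly as follows. This is an illustration under assumptions, not Flink code: `ClosableFunction` is a hypothetical stand-in for {{RichFunction}}, `TaskRunner` is a hypothetical stand-in for the runtime, and a Java default method is just one possible way to have {{closeAfterFailure()}} fall back to {{close()}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RichFunction; only the two close methods are modeled.
interface ClosableFunction {
    void close() throws Exception;

    // Proposed addition: delegates to close() by default, so existing
    // functions keep their current behavior.
    default void closeAfterFailure() throws Exception {
        close();
    }
}

// An existing function that knows nothing about closeAfterFailure().
class LegacyFunction implements ClosableFunction {
    final List<String> calls = new ArrayList<>();

    @Override
    public void close() {
        calls.add("close");
    }
}

// A function that wants to tell a regular close from an irregular one.
class FailureAwareFunction implements ClosableFunction {
    final List<String> calls = new ArrayList<>();

    @Override
    public void close() {
        calls.add("close");
    }

    @Override
    public void closeAfterFailure() {
        calls.add("closeAfterFailure");
    }
}

// Hypothetical runtime: close() on regular completion,
// closeAfterFailure() on an irregular exit.
class TaskRunner {
    static void run(ClosableFunction fn, Runnable body) throws Exception {
        try {
            body.run();
        } catch (RuntimeException e) {
            fn.closeAfterFailure();
            throw e;
        }
        fn.close();
    }
}
```

With this shape, `LegacyFunction` still sees only {{close()}} in every case, while `FailureAwareFunction` can distinguish the two exits.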