[
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022873#comment-17022873
]
Guowei Ma commented on FLINK-2646:
----------------------------------
Thanks for your detailed comments. I think unifying the sink for bounded and
unbounded jobs is very cool.
Here is my understanding of the Sinks and Committing; please correct me if I am
wrong:
# _EndOfInput_ (TaskFinish) is very similar to triggering a checkpoint. One
difference is that this checkpoint is not triggered by the
_CheckpointCoordinator_. So maybe Flink could notify the UDF to snapshot its
state when it receives the _EndOfInput_.
# JobFinish is very similar to checkpoint completion. So maybe Flink could
also invoke _notifyCheckpointComplete_ on the UDF when the job finishes.
With this, the sink does not need to assume that the input is bounded or
unbounded; it only depends on the checkpoint mechanism to achieve exactly-once
on its side.
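To make the mapping concrete, here is a minimal sketch of a transactional sink that follows this protocol. The interfaces are simplified stand-ins for Flink's {{CheckpointedFunction}} / {{CheckpointListener}} contracts, not the real API; class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a sink that stages records per checkpoint and only
// commits them on checkpoint completion. Under the proposal above, EndOfInput
// acts as a final snapshotState() and JobFinish as the final
// notifyCheckpointComplete(), so the same code covers bounded and unbounded input.
class TransactionalSinkSketch {
    private final List<String> pending = new ArrayList<>();          // open transaction
    private final Map<Long, List<String>> staged = new HashMap<>();  // checkpointId -> staged txn
    private final List<String> committed = new ArrayList<>();        // durable output

    void invoke(String record) {
        pending.add(record);
    }

    // Corresponds to snapshotState(): triggered by a regular checkpoint, or by
    // EndOfInput acting as an implicit final checkpoint.
    void snapshotState(long checkpointId) {
        staged.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Corresponds to notifyCheckpointComplete(): triggered by checkpoint
    // completion, or by JobFinish acting as the final completion signal.
    void notifyCheckpointComplete(long checkpointId) {
        List<String> txn = staged.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn); // the actual commit to the external system
        }
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        TransactionalSinkSketch sink = new TransactionalSinkSketch();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);            // EndOfInput = implicit snapshot
        sink.notifyCheckpointComplete(1L); // JobFinish = implicit completion
        System.out.println(sink.committed()); // [a, b]
    }
}
```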
I have a few small questions and thoughts. I would like to get on the same page
with you by thinking through these problems:
# When does Flink notify a task of CheckpointComplete if the job has both
bounded and unbounded sources? Because such a job could never finish, the
already-finished tasks could never be notified of _JobFinished_. One option is
for Flink to support triggering checkpoints for a job that has finished tasks,
and notifying those tasks of the checkpoint's completion.
# When could a slot be released for other tasks to use? If I understand
correctly, all resources (including managed memory) should be released in the
_dispose_ stage in the new design. So a task could not release any resources,
even after it reports to the JM that it is finished, if it still needs to be
notified of _JobFinish_. As far as I know, the JM releases a slot when all the
tasks in it are finished. This might lead to an inconsistency. I am not sure
whether there are concrete cases where this happens, but I think it is a
potential risk in theory.
# Flink needs to guarantee that the JobFinish event is received by all the
tasks, but it might not receive a task's acknowledgment of the event. There
are two situations. (The drain might face the same issue in some situations.)
## The JobFinished request/response is lost. Retrying the JobFinished
notification might resolve this problem.
## The task fails while handling the JobFinished event, so Flink cannot
receive the acknowledgment. Flink could use the normal failover strategy and
restart the task from the state snapshotted at the moment of _endOfInput_.
This would trigger another round of _endOfInput_ and _JobFinished_. But I
think this only works for sources that support checkpoints. (If the JM fails
over while notifying tasks of the JobFinish event, the new JM should notify
all the tasks of the JobFinished event.)
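One way to picture the option in question 1 is a coordinator-side rule: a checkpoint over a mixed bounded/unbounded job completes once every task that is still RUNNING has acknowledged, and FINISHED tasks (e.g. the bounded source) are treated as trivially acknowledged. The following is a hypothetical sketch in plain Java, not Flink's actual _CheckpointCoordinator_ API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: checkpoint completion over a job with finished tasks.
// Only RUNNING tasks must acknowledge; FINISHED tasks are skipped, which is
// the "trigger checkpoints for a job that has finished tasks" option above.
class MixedJobCheckpointSketch {
    enum TaskState { RUNNING, FINISHED }

    private final Map<String, TaskState> tasks = new HashMap<>();
    private final Set<String> acks = new HashSet<>();

    MixedJobCheckpointSketch(Map<String, TaskState> tasks) {
        this.tasks.putAll(tasks);
    }

    void acknowledge(String task) {
        acks.add(task);
    }

    // The checkpoint is complete when every still-running task has acked.
    boolean isComplete() {
        return tasks.entrySet().stream()
            .filter(e -> e.getValue() == TaskState.RUNNING)
            .allMatch(e -> acks.contains(e.getKey()));
    }

    public static void main(String[] args) {
        Map<String, TaskState> tasks = new HashMap<>();
        tasks.put("boundedSource", TaskState.FINISHED); // already done
        tasks.put("unboundedSource", TaskState.RUNNING);
        tasks.put("sink", TaskState.RUNNING);

        MixedJobCheckpointSketch cp = new MixedJobCheckpointSketch(tasks);
        cp.acknowledge("unboundedSource");
        System.out.println(cp.isComplete()); // false: sink has not acked yet
        cp.acknowledge("sink");
        System.out.println(cp.isComplete()); // true: finished task is skipped
    }
}
```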
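The retry idea for the lost request/response case in question 3 could be sketched as follows. This is a hypothetical illustration in plain Java; the failover case (the task dies while handling the event and is restarted from the _endOfInput_ snapshot) is deliberately not modeled here.

```java
import java.util.function.Predicate;

// Hypothetical sketch: the JM keeps resending the JobFinished notification
// until the task acknowledges it, bounding the retries before escalating to
// the normal failover strategy.
class JobFinishedRetrySketch {
    // sendAttempt simulates one send; it returns true if an ack came back.
    static boolean notifyWithRetry(Predicate<Integer> sendAttempt, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (sendAttempt.test(attempt)) {
                return true; // acknowledged: the task received JobFinished
            }
            // request or ack lost: retry the notification
        }
        return false; // retries exhausted: escalate to failover
    }

    public static void main(String[] args) {
        // Simulated flaky channel: the first two attempts are lost.
        boolean acked = notifyWithRetry(attempt -> attempt >= 3, 5);
        System.out.println(acked); // true
    }
}
```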
> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of
> proper completion, and also in case of cancellation after an error (to allow
> for cleanup).
> In certain cases, the user function needs to know why it is closed, whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API
> breaking.
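The proposal in the quoted description could look roughly like this. It is a self-contained sketch, not Flink's actual {{RichFunction}} interface; the {{runAndClose}} helper stands in for the runtime's exit path.

```java
// Hypothetical sketch of the quoted proposal: closeAfterFailure() defaults to
// close(), so existing user functions keep working unchanged (not API-breaking),
// while the runtime chooses which method to call based on how the task exits.
class CloseAfterFailureSketch {
    interface RichFunction {
        void close() throws Exception;

        // New method; the default delegates to close().
        default void closeAfterFailure() throws Exception {
            close();
        }
    }

    // Stand-in for the runtime: picks the close path based on the exit kind.
    static String runAndClose(RichFunction f, boolean failed) throws Exception {
        if (failed) {
            f.closeAfterFailure(); // irregular exit (cancel/failure)
        } else {
            f.close();             // regular completion
        }
        return failed ? "closedAfterFailure" : "closed";
    }

    public static void main(String[] args) throws Exception {
        StringBuilder log = new StringBuilder();
        RichFunction f = new RichFunction() {
            public void close() { log.append("close;"); }
            public void closeAfterFailure() { log.append("closeAfterFailure;"); }
        };
        runAndClose(f, false);
        runAndClose(f, true);
        System.out.println(log); // close;closeAfterFailure;
    }
}
```

A function that overrides only {{close()}} sees no behavior change, which is why the change would be backwards compatible.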
--
This message was sent by Atlassian Jira
(v8.3.4#803005)