[
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022938#comment-17022938
]
Kostas Kloudas commented on FLINK-2646:
---------------------------------------
Hi [~maguowei], and thanks for warming up the discussion on the topic!
One comment on the similarity between EndOfInput (EOI) and checkpoints: there
is one more difference between them. Focusing on the
StreamingFileSink, which is also the example in the FLIP, the difference is
that the EOI will close the in-progress part files, which the checkpoint
normally does not. The sink currently can keep on writing to the same
in-progress file even after the checkpoint. In this case, it checkpoints the
offset up to which the data is valid. In the case of EOI, we close the
in-progress file because there is no need to keep it open as there will be no
future data. For non-2PC sinks, we may not even have checkpoints, so the
logic of the EOI will not include checkpointing.
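
To make the difference concrete, here is a minimal sketch. The class
PartFileSketch and its methods are hypothetical and only stand in for an
in-progress part file; this is not the StreamingFileSink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an in-progress part file; not the StreamingFileSink API. */
class PartFileSketch {
    private final List<String> records = new ArrayList<>();
    private boolean open = true;

    void write(String record) {
        if (!open) {
            throw new IllegalStateException("part file already closed");
        }
        records.add(record);
    }

    /** Checkpoint: remember the offset up to which data is valid; the file stays open. */
    long onCheckpoint() {
        return records.size();
    }

    /** End of input: no future data will arrive, so the file is closed. */
    long onEndOfInput() {
        open = false;
        return records.size();
    }

    boolean isOpen() {
        return open;
    }
}
```

After onCheckpoint() the sketch still accepts writes, while after
onEndOfInput() any further write is rejected.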
In addition, when checkpointing the state, I think it should be the checkpoint
coordinator who initiates the process because:
# it has to register that the checkpoint/savepoint was successful, and
# make sure that notifyCheckpointComplete is sent after all tasks have
checkpointed their state.
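
A rough sketch of these two responsibilities, under the assumption of a
single checkpoint in flight. CoordinatorSketch is a hypothetical class and
does not mirror Flink's actual CheckpointCoordinator.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical coordinator for one checkpoint; not Flink's CheckpointCoordinator. */
class CoordinatorSketch {
    private final Set<String> expectedTasks;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean notified = false;

    CoordinatorSketch(Set<String> expectedTasks) {
        this.expectedTasks = new HashSet<>(expectedTasks);
    }

    /**
     * Called when a task reports that it has checkpointed its state.
     * Returns true once the checkpoint is registered as successful, which is
     * the point where the completion notification would go out to all tasks.
     */
    boolean acknowledge(String task) {
        if (!expectedTasks.contains(task)) {
            throw new IllegalArgumentException("unknown task: " + task);
        }
        acknowledged.add(task);
        if (!notified && acknowledged.containsAll(expectedTasks)) {
            notified = true; // stands in for sending notifyCheckpointComplete
        }
        return notified;
    }
}
```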
For the questions:
# I think that in the case of a mix of bounded and unbounded sources, the
change should aim to allow Flink to take checkpoints and send the subsequent
checkpoint notifications even if some tasks are finished. This applies not only
when we have a mix of bounded and unbounded sources but even in cases of only
bounded sources where some source tasks are faster than others, or sources
finish but intermediate tasks are CPU bound and take longer to finish. Apart
from that, there should also be a way to somehow mark in the checkpoint that
some tasks were finished, so there is no need to restore them upon recovery or
wait for
them to checkpoint state. Of course this requires a lot more design but I think
supporting close and dispose on the UDF level can be done independently.
# For this I agree that more graceful resource de-allocation may be needed, but I
think that [~trohrmann] may have more insights.
# For 3.1 I agree that the JobFinished should be received by all tasks and if
not, then restart from the last successful checkpoint. For 3.2 this is why we
go through the normal checkpoint/notifyCheckpointComplete mechanism. On
checkpoint, we take a normal checkpoint so that if this fails we restart from
the previous checkpoint, which is a consistent state, and only when the checkpoint
is successful (upon notifyCheckpointComplete), we actually "commit" the data.
If this fails, or some of the tasks fail and some succeed, then we start from
the latest checkpoint (the one we took just before the "commit"), which will
include the latest changes, i.e. that the in-progress files were closed, or in
the case of DRAIN, that the max-watermark was sent.
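
The flow described for 3.2 could be sketched roughly as follows. The class
TwoPhaseSinkSketch and its method names are hypothetical and only illustrate
the "stage on checkpoint, commit on notification" idea; they are not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical two-phase-commit sink; names are illustrative, not Flink API. */
class TwoPhaseSinkSketch {
    private List<String> inProgress = new ArrayList<>();
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void write(String record) {
        inProgress.add(record);
    }

    /** Phase 1: on checkpoint, stage the current data under the checkpoint id. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, inProgress);
        inProgress = new ArrayList<>();
    }

    /** Phase 2: commit everything staged up to and including the completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        while (!pending.isEmpty() && pending.firstKey() <= checkpointId) {
            committed.addAll(pending.pollFirstEntry().getValue());
        }
    }

    List<String> committed() {
        return committed;
    }
}
```

Nothing becomes visible before the notification, and in this sketch a repeated
notification for the same checkpoint commits nothing twice, so a retried
"commit" after a restart is harmless.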
> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of
> proper completion, and in case of canceling or error (to allow for
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API
> breaking.
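
As an illustration of the proposal quoted above, a minimal sketch.
RichFunctionSketch is a hypothetical, trimmed-down interface and not the real
RichFunction; the two implementing classes are made up for the example.

```java
/** Hypothetical, trimmed-down version of the proposal; not the real RichFunction. */
interface RichFunctionSketch {
    void close();

    /** Defaults to close(), so existing implementations keep their behavior. */
    default void closeAfterFailure() {
        close();
    }
}

/** A function that cares why it is being closed. */
class CommittingFunction implements RichFunctionSketch {
    String outcome = "open";

    @Override
    public void close() {
        outcome = "committed"; // regular completion
    }

    @Override
    public void closeAfterFailure() {
        outcome = "discarded"; // canceled or failed
    }
}

/** An existing function that only implements close(). */
class LegacyFunction implements RichFunctionSketch {
    String outcome = "open";

    @Override
    public void close() {
        outcome = "closed";
    }
}
```

A function that does not override closeAfterFailure() sees close() in both
cases, which is why the change would not break existing code.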