[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025065#comment-17025065
 ] 

Kostas Kloudas commented on FLINK-2646:
---------------------------------------

I think that the discussion deviates a bit from the topic described in the 
JIRA. 

This JIRA is targeting to explicitly differentiate between normal and 
exceptional termination at the UDF level. I would say that that from the 
discussion so far it seems that we agree to do it because no matter how we 
choose to implement drain, stop-with-savepoint, EOI and cancel on bounded or 
unbounded streams, some cases need this differentiation. The difference with 
the FLIP is that we may be able to do it by only having one method (sth like 
{{endOfInput()}}) instead of two.

[~maguowei] For unbounded streams, we leverage the checkpointing mechanism to 
guarantee exactly-once semantics and we can have the {{CheckpointCoordinator}} 
coordinate the whole stop-with-savepoint/drain process. I agree that the 
scenario of bounded streams and 2PC can be tricky because the EOI is "injected" 
to the stream by the sources, while checkpoint barriers come from the 
checkpoint coordinator, and these two actions should go hand-in-hand. 
Essentially the EOI should somehow be communicated to the CheckpointCoordinator 
so that it can "finalise" the commit process of the output.

In addition, you raised the question of graceful resource freeing during 
stopping, which is also an open and interesting question. I think that each one 
of them needs more thinking and they deserve a discussion of their own.

What do you think?

> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and in case of canceling in case of error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is the changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API 
> breaking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to