[ 
https://issues.apache.org/jira/browse/FLINK-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988933#comment-15988933
 ] 

Stephan Ewen commented on FLINK-6413:
-------------------------------------

I am not sure about this one. It seems quite complex to model a mechanism that 
Flink does not need (and that exists in Beam because some other runtimes may 
need it). This sounds to me like making the runtime more complex to match an 
interface that was designed for a different runtime.

An alternative thought: a while back we considered offering operators an 
interface to {{processBuffer}} rather than {{processElement}}.
I think that would be more natural than a buffer consumption listener.
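
To make the {{processBuffer}} idea concrete, here is a minimal sketch. It is 
not Flink API: the interface {{ElementOperator}}, the class {{BatchingSink}} 
and the use of a plain {{List}} to represent a buffer's records are all 
assumptions made for illustration. The default {{processBuffer}} falls back to 
{{processElement}}, so only operators that benefit from batching need to 
override it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical interface, not part of Flink: per-element processing with an
// optional per-buffer entry point.
interface ElementOperator<T> {
    void processElement(T element);

    // Default behaviour: a buffer is handled element by element.
    default void processBuffer(List<T> elements) {
        for (T element : elements) {
            processElement(element);
        }
    }
}

// Hypothetical operator that overrides processBuffer to issue one write per buffer.
class BatchingSink implements ElementOperator<String> {
    // Each entry stands in for one write to an external system.
    final List<List<String>> writes = new ArrayList<>();

    @Override
    public void processElement(String element) {
        writes.add(Collections.singletonList(element));
    }

    @Override
    public void processBuffer(List<String> elements) {
        if (!elements.isEmpty()) {
            writes.add(new ArrayList<>(elements));
        }
    }
}
```

With this shape the runtime would call {{processBuffer}} once per deserialized 
buffer, and operators that do not care keep their existing per-element logic.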

Also, given that the network stack needs some quite crucial other fixes around 
latency and stream alignment, I am very skeptical about introducing such added 
complexity unless we have a strong case that some users need it.

> Add stream operator callback to notify about consumed network buffer 
> ---------------------------------------------------------------------
>
>                 Key: FLINK-6413
>                 URL: https://issues.apache.org/jira/browse/FLINK-6413
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> This is originally motivated by BEAM-1612. Beam has the notion of bundles and 
> allows users to do work at the start/end of each bundle. This could be used 
> for setting up some expensive connection or for batching accesses to some 
> external system. There is also internal optimisation potential because 
> accesses/updates to state could be kept in-memory per bundle/buffer and only 
> afterwards be written to fault-tolerant state.
> The bundling induced by the Flink network stack (which depends on the network 
> buffer size and the buffer timeout) seems like a natural fit for this. I 
> propose to add an _experimental_ interface {{BufferConsumedListener}} (or 
> some such name):
> {code}
> interface BufferConsumedListener {
>   void notifyBufferConsumed();
> }
> {code}
> that is invoked in the input processor whenever a network buffer is 
> exhausted: 
> https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178
> The change is very simple; three lines of code would be added:
> {code}
> if (result.isBufferConsumed()) {
>   currentRecordDeserializer.getCurrentBuffer().recycle();
>   currentRecordDeserializer = null;
>   if (streamOperator instanceof BufferConsumedListener) {
>     ((BufferConsumedListener) streamOperator).notifyBufferConsumed();
>   }
> }
> {code}
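
The batching use case from the description could look roughly like the sketch 
below. {{BufferConsumedListener}} is the interface proposed in the issue and 
does not exist in Flink; {{SketchOperator}}, {{BundlingOperator}} and 
{{InputLoopSketch}} are hypothetical stand-ins for the stream operator and the 
input processor, and each inner list plays the role of one network buffer.

```java
import java.util.ArrayList;
import java.util.List;

// The interface as proposed in the issue; it is not an existing Flink API.
interface BufferConsumedListener {
    void notifyBufferConsumed();
}

// Hypothetical minimal operator contract, standing in for Flink's stream operator.
interface SketchOperator {
    void processElement(String element);
}

// Hypothetical operator that treats the records of one network buffer as one bundle.
class BundlingOperator implements SketchOperator, BufferConsumedListener {
    private final List<String> pending = new ArrayList<>();
    // Each entry stands in for one batched access to an external system.
    final List<List<String>> flushedBatches = new ArrayList<>();

    @Override
    public void processElement(String element) {
        pending.add(element); // kept in memory until the buffer is exhausted
    }

    @Override
    public void notifyBufferConsumed() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
    }
}

// Stand-in for the input processor loop.
class InputLoopSketch {
    static void run(SketchOperator operator, List<List<String>> buffers) {
        for (List<String> buffer : buffers) {
            for (String element : buffer) {
                operator.processElement(element);
            }
            // Same instanceof check as in the proposed change.
            if (operator instanceof BufferConsumedListener) {
                ((BufferConsumedListener) operator).notifyBufferConsumed();
            }
        }
    }
}
```

Operators that do not implement the listener are unaffected, which is why the 
proposed change stays limited to the single {{instanceof}} check.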



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)