[ https://issues.apache.org/jira/browse/FLINK-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994844#comment-15994844 ]
Aljoscha Krettek commented on FLINK-6413:
-----------------------------------------
You are right that proper support for processing buffers is the better way to
go. I just wanted to throw this quick idea out there because it is only three
lines of code. (In the snippet in the issue description, only the three lines
of the instanceof check are new code; the rest is already there.)
Bundling is not required by the other runtimes; it is a feature that reduces
the impact of setting up expensive connections and allows batching access to
external systems. Some Flink users have been "abusing" windows for this.
> Add stream operator callback to notify about consumed network buffer
> ---------------------------------------------------------------------
>
> Key: FLINK-6413
> URL: https://issues.apache.org/jira/browse/FLINK-6413
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Aljoscha Krettek
>
> This is originally motivated by BEAM-1612. Beam has the notion of bundles and
> allows users to do work at the start/end of each bundle. This could be used
> for setting up some expensive connection or for batching accesses to some
> external system. There is also internal optimisation potential because
> accesses/updates to state could be kept in-memory per bundle/buffer and only
> afterwards be written to fault-tolerant state.
> The bundling induced by the Flink network stack (which depends on the network
> buffer size and the buffer timeout) seems like a natural fit for this. I
> propose to add an _experimental_ interface {{BufferConsumedListener}} (or
> some such name):
> {code}
> interface BufferConsumedListener {
>   void notifyBufferConsumed();
> }
> {code}
> that is invoked in the input processor whenever a network buffer is
> exhausted:
> https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178
> The change is very simple; three lines of code would be added:
> {code}
> if (result.isBufferConsumed()) {
>   currentRecordDeserializer.getCurrentBuffer().recycle();
>   currentRecordDeserializer = null;
>   // new: tell the operator that the current network buffer is exhausted
>   if (streamOperator instanceof BufferConsumedListener) {
>     ((BufferConsumedListener) streamOperator).notifyBufferConsumed();
>   }
> }
> {code}
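
To illustrate how an operator might use such a callback for batching, here is a
minimal, self-contained sketch that does not depend on Flink. Only
BufferConsumedListener comes from the proposal above; RecordOperator,
BatchingOperator, BundleDemo and deliver() are hypothetical names made up for
this example, and deliver() merely imitates what the input processor would do
after each network buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Interface as proposed in the issue description (experimental, name not final).
interface BufferConsumedListener {
    void notifyBufferConsumed();
}

// Hypothetical stand-in for a stream operator that receives records one by one.
interface RecordOperator {
    void processElement(String record);
}

// Hypothetical operator: collects records and writes them out once per buffer.
class BatchingOperator implements RecordOperator, BufferConsumedListener {
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    @Override
    public void processElement(String record) {
        pending.add(record);
    }

    @Override
    public void notifyBufferConsumed() {
        if (pending.isEmpty()) {
            return; // nothing to flush for this buffer
        }
        // Stand-in for a single batched call to an external system.
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}

class BundleDemo {
    // Imitates the input processor: pass on every record of a buffer,
    // then notify the operator if it implements the listener.
    static void deliver(RecordOperator operator, List<List<String>> buffers) {
        for (List<String> buffer : buffers) {
            for (String record : buffer) {
                operator.processElement(record);
            }
            if (operator instanceof BufferConsumedListener) {
                ((BufferConsumedListener) operator).notifyBufferConsumed();
            }
        }
    }

    public static void main(String[] args) {
        BatchingOperator op = new BatchingOperator();
        deliver(op, Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(op.flushedBatches()); // prints [[a, b], [c]]
    }
}
```

In this sketch each simulated buffer becomes one batch, so the external system
would see one call per buffer instead of one call per record. Operators that do
not implement the listener are unaffected by the instanceof check.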