[ https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955163#comment-15955163 ]

Aljoscha Krettek commented on BEAM-1612:
----------------------------------------

I did some more thinking and came up with a somewhat more involved solution
(more involved because it requires changes to Flink code, but still quite easy,
since it could be implemented in about 10 lines of code). This part of the
code is responsible for reading input data from network buffers:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L182.
At that position we know that we have completely consumed one buffer and are
ready to accept the next one. The number of records in a buffer depends on the
configured size of the buffers/memory segments and on the flush timeout. It
can vary from 1 (if you enable flushing to the network after every record) to
a few hundred or a few thousand elements.
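The effect of buffer size and flushing on the number of records per buffer can be illustrated with a small, self-contained toy model. This is not Flink code: it assumes fixed-size records that never span two buffers (real Flink serializers can split a record across buffers), and it models only the two extremes, a buffer shipped when full versus a flush after every record, rather than a real flush timeout. The class and method names and the 32 KiB / 100-byte sizes are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: fixed-size records are packed into fixed-size
// network buffers and never span two buffers (an assumption of this sketch).
class BufferFillSimulation {

    /** Returns how many records each shipped buffer carries. */
    static List<Integer> recordsPerBuffer(int bufferBytes, int recordBytes,
                                          int totalRecords, boolean flushAfterEveryRecord) {
        if (recordBytes <= 0 || recordBytes > bufferBytes) {
            throw new IllegalArgumentException("a record must fit into one buffer");
        }
        List<Integer> counts = new ArrayList<>();
        int usedBytes = 0;
        int recordsInBuffer = 0;
        for (int i = 0; i < totalRecords; i++) {
            if (usedBytes + recordBytes > bufferBytes) {
                // The next record does not fit: ship the full buffer.
                counts.add(recordsInBuffer);
                usedBytes = 0;
                recordsInBuffer = 0;
            }
            usedBytes += recordBytes;
            recordsInBuffer++;
            if (flushAfterEveryRecord) {
                // Analogue of flushing to the network after every record.
                counts.add(recordsInBuffer);
                usedBytes = 0;
                recordsInBuffer = 0;
            }
        }
        if (recordsInBuffer > 0) {
            counts.add(recordsInBuffer); // final, partially filled buffer
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative sizes: 32 KiB buffers, 100-byte records, 1000 records.
        List<Integer> batched = recordsPerBuffer(32 * 1024, 100, 1000, false);
        List<Integer> flushed = recordsPerBuffer(32 * 1024, 100, 1000, true);
        System.out.println("batched: " + batched); // batched: [327, 327, 327, 19]
        System.out.println("flushed buffers: " + flushed.size()); // flushed buffers: 1000
    }
}
```

With these assumed sizes the batched case ships buffers of 327 records (plus a final partial buffer of 19), while flushing after every record yields 1000 single-record buffers, i.e. 1000 one-element bundles if bundles followed buffer boundaries.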

[~kenn] and [~lzljs3620320], do you think this is an appropriate number of
records for a bundle? If so, I would propose an addition to Flink that allows
operators to register callbacks for buffer consumption. This callback could
then be used for {{@StartBundle}}/{{@FinishBundle}} and maybe also for
optimisations around state accesses.
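A minimal sketch of the proposed callback, assuming a hypothetical BufferConsumedListener interface (no such interface exists in Flink; it only illustrates the proposal). A toy input loop invokes onBufferConsumed() after the last record of each buffer; the operator starts a bundle lazily on the first element and finishes it in the callback. To illustrate the state-access optimisation, per-key counts are cached on the heap and written to a HashMap that stands in for a state backend such as RocksDB only when the bundle finishes. All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// HYPOTHETICAL callback for the proposal; Flink has no such interface.
interface BufferConsumedListener {
    void onBufferConsumed();
}

// Toy operator whose bundles end at network-buffer boundaries.
class BundlingOperator implements BufferConsumedListener {
    final Map<String, Integer> backend = new HashMap<>(); // stand-in for e.g. RocksDB
    final List<Integer> bundleSizes = new ArrayList<>();
    int backendWrites = 0;
    private final Map<String, Integer> cache = new HashMap<>(); // heap-side writes
    private boolean bundleOpen = false;
    private int elementsInBundle = 0;

    void processElement(String key) {
        if (!bundleOpen) {
            startBundle(); // lazily start a bundle on the first element
        }
        // Per-key count; on a cache miss, read the value from the backend.
        Integer cached = cache.get(key);
        int current = (cached != null) ? cached : backend.getOrDefault(key, 0);
        cache.put(key, current + 1);
        elementsInBundle++;
    }

    private void startBundle() {
        bundleOpen = true;
        elementsInBundle = 0;
    }

    private void finishBundle() {
        backendWrites += cache.size(); // one write per dirty key, not per element
        backend.putAll(cache);
        cache.clear();
        bundleSizes.add(elementsInBundle);
        bundleOpen = false;
    }

    @Override
    public void onBufferConsumed() {
        if (bundleOpen) {
            finishBundle();
        }
    }
}

class BufferCallbackDemo {
    // Toy input loop: fires the callback after the last record of each buffer.
    static void run(List<List<String>> buffers, BundlingOperator op) {
        for (List<String> buffer : buffers) {
            for (String record : buffer) {
                op.processElement(record);
            }
            op.onBufferConsumed();
        }
    }

    public static void main(String[] args) {
        BundlingOperator op = new BundlingOperator();
        run(List.of(List.of("a", "b", "a"), List.of("a")), op);
        System.out.println(op.bundleSizes);      // [3, 1]
        System.out.println(op.backend.get("a")); // 3
        System.out.println(op.backendWrites);    // 3
    }
}
```

In this toy run the three elements of the first buffer cause only two backend writes (one per distinct key) instead of three, which is the kind of saving the state-access optimisation would aim for.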

> Support real Bundle in Flink runner
> -----------------------------------
>
>                 Key: BEAM-1612
>                 URL: https://issues.apache.org/jira/browse/BEAM-1612
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>
> Bundles are very important in the Beam model. Users can use a bundle to
> flush buffers and to reuse heavyweight resources. Most IO plugins use the
> bundle to flush.
> Moreover, the FlinkRunner can also use bundles to reduce accesses to Flink
> state: for example, state could first be kept on the Java heap and flushed
> into RocksDB state when finishBundle is invoked, which reduces the number of
> serializations.
> But currently the FlinkRunner calls finishBundle after every
> processElement. We need to support real bundles.
> I think there are two possible implementations:
> 1. Invoke finishBundle and the next startBundle in Flink's {{snapshot}}. But
> this "bundle" may sometimes be too big, depending on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. An in-progress bundle is
> finished (flushed) when triggered by element count, event time, processing
> time, or {{snapshot}}. We do not need to wait; we just call startBundle and
> finishBundle at the right time.
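Option 2 could look roughly like the following sketch. All names are hypothetical and no Beam or Flink API is used: a bundle is started lazily on the first element and finished when it reaches a maximum element count, when it has been open longer than a processing-time limit, when the watermark advances, or before a checkpoint snapshot. Treating every watermark advance as the event-time trigger is an assumption of this sketch; the clock is injected so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// HYPOTHETICAL bundle controller for option 2; no Beam or Flink API is used.
class BundleController {
    private final int maxBundleSize;
    private final long maxBundleTimeMillis;
    private final LongSupplier clock; // injected so runs are deterministic
    private final List<String> finished = new ArrayList<>(); // "trigger:size"
    private boolean bundleOpen = false;
    private long bundleStartMillis = 0;
    private int elementsInBundle = 0;

    BundleController(int maxBundleSize, long maxBundleTimeMillis, LongSupplier clock) {
        this.maxBundleSize = maxBundleSize;
        this.maxBundleTimeMillis = maxBundleTimeMillis;
        this.clock = clock;
    }

    void processElement(Object element) {
        if (!bundleOpen) {
            bundleOpen = true; // startBundle()
            bundleStartMillis = clock.getAsLong();
        }
        elementsInBundle++; // a real operator would process the element here
        if (elementsInBundle >= maxBundleSize) {
            finishBundle("count");
        }
    }

    // Would be driven by a periodic processing-time timer.
    void onProcessingTime() {
        if (bundleOpen && clock.getAsLong() - bundleStartMillis >= maxBundleTimeMillis) {
            finishBundle("processingTime");
        }
    }

    // Assumption: every watermark advance closes the open bundle.
    void onWatermarkAdvanced() {
        finishBundle("eventTime");
    }

    // Called before operator state is snapshotted for a checkpoint.
    void prepareSnapshot() {
        finishBundle("snapshot");
    }

    private void finishBundle(String trigger) {
        if (!bundleOpen) {
            return; // no open bundle, nothing to finish
        }
        finished.add(trigger + ":" + elementsInBundle); // finishBundle()
        bundleOpen = false;
        elementsInBundle = 0;
    }

    List<String> finishedBundles() {
        return finished;
    }
}

class BundleControllerDemo {
    public static void main(String[] args) {
        long[] now = {0};
        BundleController c = new BundleController(3, 1000, () -> now[0]);
        for (int i = 0; i < 4; i++) {
            c.processElement(i); // the third element hits the count limit
        }
        now[0] = 1500;
        c.onProcessingTime(); // the fourth element's bundle is older than 1000 ms
        c.processElement("x");
        c.onWatermarkAdvanced();
        c.processElement("y");
        c.prepareSnapshot();
        c.prepareSnapshot(); // no open bundle: no-op
        System.out.println(c.finishedBundles());
        // [count:3, processingTime:1, eventTime:1, snapshot:1]
    }
}
```

In this sketch the snapshot trigger always finishes the open bundle, so no bundle straddles a checkpoint, while the count and time limits keep bundles from growing as large as the checkpoint interval (the concern raised for option 1).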



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
