[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905798#comment-15905798
 ] 

Kenneth Knowles edited comment on BEAM-1612 at 3/13/17 7:24 PM:
----------------------------------------------------------------

I just noticed this, and I agree strongly that it needs to be solved. It is a 
major concern for all runners to get this right in order to have realistic 
performance.

There are some open questions about when and how to output data from 
{{@FinishBundle}}, since it generally doesn't work with windowing. Bundles are 
not tied to a window and can contain data from many windows. I filed 
BEAM-1283 because I think the spec is very bad, and [~tgroh] filed BEAM-1316 
with an even stronger viewpoint that there should be no output, only flush-like 
operations. But sometimes a flush may return data that you need to output. 
That output should be mostly deterministic and independent of bundling.

For the relationship with snapshotting, I do want to make sure the ordering is 
clear: the runner is required to call {{FinishBundle}} before durably 
committing; otherwise the work might not actually be committed. The commit can 
still fail, though, so the runner is not required to commit right away. So it 
would be fine to call {{FinishBundle}} every once in a while and take a 
snapshot even less often. The important thing is that you can't have other 
method calls in between the {{FinishBundle}} and the commit, because they 
might set up new transient state that needs to be flushed.
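
To make the ordering constraint concrete, here is a minimal sketch in plain 
Java. It does not use Beam or Flink APIs: {{BufferingFn}}, {{Runner}}, and all 
of their methods are hypothetical names invented for illustration. The only 
point is that a commit is rejected if any call happened since the last 
{{finishBundle}}, while calling {{finishBundle}} without committing is fine.

```java
import java.util.ArrayList;
import java.util.List;

final class BundleCommitOrdering {

    /** Hypothetical user function that keeps transient, bundle-scoped state. */
    static final class BufferingFn {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> flushed = new ArrayList<>();

        void processElement(String element) {
            buffer.add(element); // transient state, not yet durable
        }

        void finishBundle() {
            flushed.addAll(buffer); // stands in for a flush to an external system
            buffer.clear();
        }

        List<String> flushed() {
            return flushed;
        }
    }

    /** Hypothetical runner-side guard; not part of any real runner. */
    static final class Runner {
        private final BufferingFn fn;
        private boolean callsSinceFinish = false;
        private int commits = 0;

        Runner(BufferingFn fn) {
            this.fn = fn;
        }

        void process(String element) {
            fn.processElement(element);
            callsSinceFinish = true; // new transient state may exist now
        }

        void finishBundle() {
            fn.finishBundle();
            callsSinceFinish = false;
        }

        /** Stands in for a durable commit or snapshot. */
        void commit() {
            if (callsSinceFinish) {
                throw new IllegalStateException(
                    "finishBundle must be the last call before a commit");
            }
            commits++;
        }

        int commits() {
            return commits;
        }
    }

    public static void main(String[] args) {
        BufferingFn fn = new BufferingFn();
        Runner runner = new Runner(fn);
        runner.process("a");
        runner.finishBundle(); // flush without committing: allowed
        runner.process("b");
        runner.finishBundle(); // last call before the commit
        runner.commit();
        System.out.println(fn.flushed());
    }
}
```

A real runner would not throw here. It would simply schedule 
{{finishBundle}} immediately before taking the snapshot. The exception only 
makes the invariant visible in the sketch.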



was (Author: kenn):
I just noticed this, and I agree strongly that it needs to be solved. It is a 
major concern for all runners to get this right in order to have realistic 
performance.

There are some questions surrounding when and how to output data from 
{{@FinishBundle}} since it generally doesn't work with windowing. Bundles are 
not related to a window and can have data from lots of windows. I filed 
BEAM-1283 because I think the spec is very bad, and [~tgroh] filed BEAM-1312 
with an even stronger viewpoint that there should be no output, only flush-like 
operations. But sometimes there may be data that comes back from a flush that 
you need to output - it should be mostly deterministic and independent of 
bundling.

For the relationship with snapshotting, I do want to make sure the ordering is 
clear: The runner is required to call {{FinishBundle}} before durably 
committing otherwise it might not be actually committed, but the commit can 
still fail so the runner is not required to commit right away. So it would be 
fine to just call it every once in a while, and even less often take a 
snapshot, but the important thing is that you can't have other method calls in 
between the {{FinishBundle}} and the commit because they might set up new 
transient state that needs to be flushed.


> Support real Bundle in Flink runner
> -----------------------------------
>
>                 Key: BEAM-1612
>                 URL: https://issues.apache.org/jira/browse/BEAM-1612
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>
> Bundles are very important in the Beam model. Users can use a bundle to 
> flush buffers and to reuse heavyweight resources across the elements of a 
> bundle. Most IO plugins flush at bundle boundaries. 
> Moreover, FlinkRunner could also use bundles to reduce access to the 
> FlinkState, for example by keeping values on the Java heap first and 
> flushing them into RocksDbState when finishBundle is invoked. This can 
> reduce the number of serializations.
> But FlinkRunner currently calls finishBundle after every processElement. We 
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. 
> But sometimes this "bundle" may be too big; its size depends on the user's 
> checkpoint configuration.
> 2. Manually control the size of the bundle. A partial bundle is completed 
> by count, event time, processing time, or {{snapshot}}. We do not need to 
> wait; we just call startBundle and finishBundle at the right time.
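
The second option in the description above (closing a bundle by count, time, 
or snapshot) could be sketched roughly as follows. This is plain Java under 
stated assumptions, not FlinkRunner code: {{BundleManager}}, {{Hooks}}, the 
thresholds, and the explicit {{nowMillis}} parameter are all hypothetical, and 
only processing time is modeled.

```java
final class BundleManager {

    /** Hypothetical callbacks into the user function's bundle lifecycle. */
    interface Hooks {
        void startBundle();
        void finishBundle();
    }

    private final Hooks hooks;
    private final int maxElements;
    private final long maxMillis;
    private boolean open = false;
    private int count = 0;
    private long openedAt = 0L;

    BundleManager(Hooks hooks, int maxElements, long maxMillis) {
        this.hooks = hooks;
        this.maxElements = maxElements;
        this.maxMillis = maxMillis;
    }

    /** Runs one element inside a bundle, opening and closing it as needed. */
    void process(long nowMillis, Runnable elementWork) {
        if (!open) {
            hooks.startBundle();
            open = true;
            count = 0;
            openedAt = nowMillis;
        }
        elementWork.run();
        count++;
        if (count >= maxElements || nowMillis - openedAt >= maxMillis) {
            close();
        }
    }

    /** Timer callback so that an idle bundle does not stay open forever. */
    void onTimer(long nowMillis) {
        if (open && nowMillis - openedAt >= maxMillis) {
            close();
        }
    }

    /** Call immediately before a snapshot so no bundle spans a checkpoint. */
    void onSnapshot() {
        if (open) {
            close();
        }
    }

    private void close() {
        hooks.finishBundle();
        open = false;
    }

    public static void main(String[] args) {
        Hooks printing = new Hooks() {
            public void startBundle() { System.out.println("startBundle"); }
            public void finishBundle() { System.out.println("finishBundle"); }
        };
        BundleManager manager = new BundleManager(printing, 2, 1000L);
        manager.process(0L, () -> System.out.println("element 1"));
        manager.process(1L, () -> System.out.println("element 2")); // count limit closes the bundle
        manager.process(2L, () -> System.out.println("element 3"));
        manager.onSnapshot(); // snapshot closes the partial bundle
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic. A real 
implementation would take the time from the runner's timer service instead.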



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
