[
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894699#comment-15894699
]
Jingsong Lee commented on BEAM-1612:
------------------------------------
https://issues.apache.org/jira/browse/FLINK-2846
Does this issue mean that we cannot emit data while snapshotting? If we do, we
will lose some of the emitted data when the job restarts.
But we must invoke finishBundle when snapshotting; otherwise we will lose
buffered data that has not been flushed yet.
I think we can install a fake collector in the OutputManager while
snapshotting and save the collected data to {{FlinkSplitStateInternals}} or
{{FlinkKeyGroupStateInternals}}. The next processElement call then sends it
out, so no data is lost.
This may be a bit complicated, but it may work.
This is similar to Flink's AsyncFunction, except that it stores the input data
while we store the output data.
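
A minimal sketch of the fake-collector idea, in plain Java with no Beam or
Flink dependency. The class {{BufferingOutputManager}} and its methods are
hypothetical names for illustration, and the returned list only stands in for
whatever would be written to {{FlinkSplitStateInternals}} or
{{FlinkKeyGroupStateInternals}}:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the runner's output manager; not a Beam or Flink class. */
class BufferingOutputManager<T> {
    private final Consumer<T> downstream;            // the real collector
    private final List<T> pending = new ArrayList<>(); // output captured during a snapshot
    private boolean snapshotting = false;

    BufferingOutputManager(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Called for every output element; diverted into the buffer while snapshotting. */
    void output(T value) {
        if (snapshotting) {
            pending.add(value);
        } else {
            downstream.accept(value);
        }
    }

    /**
     * Runs finishBundle with the fake collector active and returns a copy of
     * the captured output, which the caller would persist as operator state.
     */
    List<T> snapshot(Runnable finishBundle) {
        snapshotting = true;
        try {
            finishBundle.run();
        } finally {
            snapshotting = false;
        }
        return new ArrayList<>(pending);
    }

    /** Restores captured output after a restart. */
    void restore(List<T> state) {
        pending.clear();
        pending.addAll(state);
    }

    /** Called at the start of the next processElement: emit what was held back. */
    void flushPending() {
        for (T value : pending) {
            downstream.accept(value);
        }
        pending.clear();
    }
}
```

Since the captured output is part of the snapshot, a restart from that
checkpoint can restore it and send it out on the first processElement call.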
> Support real Bundle in Flink runner
> -----------------------------------
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use a bundle to
> flush buffers and to reuse heavyweight resources across the elements of a
> bundle. Most IO plugins rely on the bundle to flush.
> Moreover, FlinkRunner can also use bundles to reduce access to Flink state:
> for example, state can be kept on the Java heap first and flushed into RocksDB
> state when finishBundle is invoked, which reduces the number of serializations.
> But FlinkRunner currently calls finishBundle after every processElement. We
> need to support real bundles.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But
> sometimes this "Bundle" may be too big. This depends on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. A partial bundle is closed as a
> full bundle by count, event time, processing time, or {{snapshot}}. We do not
> need to wait; we just call startBundle and finishBundle at the right time.
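
Option 2 in the quoted description could look roughly like the sketch below. It
is plain Java with no Beam or Flink dependency; {{BundleTracker}}, its method
names, and the thresholds are hypothetical, and only count, processing time,
and snapshot triggers are shown. Closing a bundle in the snapshot path still
raises the question from the comment above about emitting during a snapshot,
which this sketch does not address.

```java
/** Hypothetical helper; names and thresholds are illustrative, not Beam or Flink API. */
class BundleTracker {
    private final long maxElements;   // close the bundle after this many elements
    private final long maxMillis;     // or after it has been open this long
    private final Runnable startBundle;
    private final Runnable finishBundle;
    private long count = 0;
    private long bundleStartMillis = 0;
    private boolean open = false;

    BundleTracker(long maxElements, long maxMillis,
                  Runnable startBundle, Runnable finishBundle) {
        this.maxElements = maxElements;
        this.maxMillis = maxMillis;
        this.startBundle = startBundle;
        this.finishBundle = finishBundle;
    }

    /** Wraps one processElement call, opening and closing bundles as needed. */
    void onElement(long nowMillis, Runnable processElement) {
        if (!open) {
            startBundle.run();
            open = true;
            count = 0;
            bundleStartMillis = nowMillis;
        }
        processElement.run();
        count++;
        if (count >= maxElements || nowMillis - bundleStartMillis >= maxMillis) {
            close();
        }
    }

    /** Processing-time trigger, so a quiet stream does not hold a bundle open. */
    void onTimer(long nowMillis) {
        if (open && nowMillis - bundleStartMillis >= maxMillis) {
            close();
        }
    }

    /** Snapshot trigger: a partial bundle is always closed before state is saved. */
    void onSnapshot() {
        if (open) {
            close();
        }
    }

    private void close() {
        finishBundle.run();
        open = false;
    }
}
```

The next bundle is opened lazily by the first element that arrives after a
close, so nothing waits for a bundle to fill up.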