[
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996435#comment-15996435
]
Aljoscha Krettek commented on BEAM-1612:
----------------------------------------
My notion is actually both. I thought about it this morning in the park, and I
think I came up with a solution. 😉
Both Flink network buffers and checkpoint barriers (the aforementioned
persistence boundaries in Flink) are candidates for "bundles" in Beam. Network
buffers are quite small and would result in a lot of perceived bundles. Also,
there is the problem that user code is not aware of network buffers being
consumed in Flink. Checkpoint barriers, as the proper persistence boundary, are
a good candidate, but they are a lot more coarse-grained, i.e. they arrive
within several minutes of each other, depending on configuration, because
checkpointing is more heavyweight than the per-bundle commit in the Dataflow
service (I assume).
The main reason why I didn't want to use checkpoint barriers for bundles is
that the {{@FinishBundle}} method allows emitting data while a Flink operator
cannot output data in {{StreamOperator.snapshotState()}}. The reason is that
checkpoint barriers are sent downstream immediately and any elements emitted as
part of checkpointing would have to be "before" the checkpoint barrier in the
stream. However, we can work around that by making any elements emitted by the
{{@FinishBundle}} method part of the checkpoint, emitting them at some later
point, and making sure that we re-emit them when restoring from the checkpoint.
(Or we don't allow emitting data in Beam, which would be the easier solution. 😃)
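
A rough sketch of that workaround, as a toy model in plain Java rather than real runner code. The {{Operator}} class, its method names, and the "bundle-size" output are all hypothetical stand-ins and not the Flink {{StreamOperator}} or Beam {{DoFn}} APIs. The idea is only that output produced while snapshotting is held back, stored with the snapshot, and emitted later or again after a restore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: Operator and its methods are hypothetical stand-ins,
// not the Flink StreamOperator or Beam DoFn APIs.
public class BufferedFinishBundleSketch {

    static class Operator {
        private final Consumer<String> downstream;
        // Output produced by the finish-bundle step, held back until it may be emitted.
        private final List<String> pendingOutput = new ArrayList<>();
        private int elementsInBundle = 0;

        Operator(Consumer<String> downstream) {
            this.downstream = downstream;
        }

        void processElement(String element) {
            // "Some later point": emit held-back output before handling new input.
            flushPending();
            elementsInBundle++;
        }

        // Called when a checkpoint barrier arrives; returns the state to persist.
        List<String> snapshotState() {
            // The finish-bundle step produces output, but nothing is emitted here.
            pendingOutput.add("bundle-size:" + elementsInBundle);
            elementsInBundle = 0;
            // The held-back output becomes part of the checkpoint.
            return new ArrayList<>(pendingOutput);
        }

        // Called on recovery; the held-back output will be emitted again.
        void restoreState(List<String> snapshot) {
            pendingOutput.clear();
            pendingOutput.addAll(snapshot);
            elementsInBundle = 0;
        }

        void flushPending() {
            pendingOutput.forEach(downstream);
            pendingOutput.clear();
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Operator op = new Operator(out::add);
        op.processElement("a");
        op.processElement("b");
        List<String> snapshot = op.snapshotState();
        System.out.println("emitted during snapshot: " + out);
        op.processElement("c");
        System.out.println("emitted after snapshot: " + out);

        // Simulate a failure right after the snapshot and a restore from it.
        List<String> outAfterRestore = new ArrayList<>();
        Operator restored = new Operator(outAfterRestore::add);
        restored.restoreState(snapshot);
        restored.processElement("c");
        System.out.println("emitted after restore: " + outAfterRestore);
    }
}
```

In this model the held-back elements end up "after" the barrier downstream, which is acceptable only because they are also stored in the snapshot and re-emitted on restore.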
> Support real Bundle in Flink runner
> -----------------------------------
>
> Key: BEAM-1612
> URL: https://issues.apache.org/jira/browse/BEAM-1612
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Jingsong Lee
>
> The Bundle is very important in the Beam model. Users can use the bundle to
> flush buffers and to reuse many heavyweight resources within a bundle. Most IO
> plugins use the bundle to flush.
> Moreover, FlinkRunner can also use Bundle to reduce access to the FlinkState,
> for example by first placing values in JavaHeap and flushing them into
> RocksDbState when finishBundle is invoked; this can reduce the number of
> serializations.
> But right now FlinkRunner calls finishBundle after every processElement. We
> need to support real Bundle.
> I think we can have the following implementations:
> 1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But
> sometimes this "Bundle" may be too big. This depends on the user's checkpoint
> configuration.
> 2. Manually control the size of the bundle. The half-bundle will be flushed to
> a full-bundle by count or eventTime or processTime or {{snapshot}}. We do not
> need to wait, just call the startBundle and finishBundle at the right time.
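
Option 2 in the description above could look roughly like the following toy model in plain Java. {{BundleTracker}}, its thresholds, and its callback names are hypothetical and not part of the Beam or Flink APIs. It only illustrates closing a bundle on whichever comes first: an element count, elapsed processing time, or a snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: BundleTracker is a hypothetical helper, not a Beam or Flink class.
public class BundleTriggerSketch {

    static class BundleTracker {
        private final int maxElements;
        private final long maxMillis;
        // Records start/finish calls so the behavior can be inspected.
        final List<String> log = new ArrayList<>();
        private boolean open = false;
        private int count = 0;
        private long openedAt = 0L;

        BundleTracker(int maxElements, long maxMillis) {
            this.maxElements = maxElements;
            this.maxMillis = maxMillis;
        }

        // Called for every element; opens a bundle lazily and closes it by count.
        void onElement(long nowMillis) {
            if (!open) {
                open = true;
                count = 0;
                openedAt = nowMillis;
                log.add("start");
            }
            count++;
            if (count >= maxElements) {
                finish("count");
            }
        }

        // Called by a processing-time timer; closes a bundle that has been open too long.
        void onTimer(long nowMillis) {
            if (open && nowMillis - openedAt >= maxMillis) {
                finish("time");
            }
        }

        // Called when a snapshot is taken; a half-full bundle is closed as well.
        void onSnapshot() {
            if (open) {
                finish("snapshot");
            }
        }

        private void finish(String reason) {
            open = false;
            log.add("finish:" + reason);
        }
    }

    public static void main(String[] args) {
        BundleTracker tracker = new BundleTracker(2, 1000L);
        tracker.onElement(0L);
        tracker.onElement(10L);   // second element reaches the count limit
        tracker.onElement(20L);
        tracker.onTimer(1020L);   // bundle has been open for 1000 ms
        tracker.onElement(2000L);
        tracker.onSnapshot();     // snapshot closes the half-full bundle
        System.out.println(tracker.log);
    }
}
```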