[
https://issues.apache.org/jira/browse/BEAM-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283044#comment-16283044
]
Solomon Duskis commented on BEAM-3311:
--------------------------------------
I definitely agree that larger bundles are important. I would need help from
the Beam team at large to figure out a general solution to this problem.
Here are some useful examples, which I got from the Dataflow team, of
controlling bundling with Beam constructs; you can use them to build a
solution that works for your specific case:
* Here is an example of using a stateful DoFn to buffer and push back data:
https://beam.apache.org/blog/2017/08/28/timely-processing.html. A stateful
DoFn lets you control exactly when data is output to BigtableIO, but it is
more complicated to write and get correct.
* Alternatively, you can add a set of steps that buffer data using a
trigger:
PubSubIO -> ... original pipeline ... -> ParDo(Choose a random key in [0, 1000))
-> Window.into(new GlobalWindows()).triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
-> GBK -> Values -> BigtableIO
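Not actual Beam code, but the random-key step in the second bullet can be sketched in plain Java (class and method names here are illustrative): each element is tagged with a random key in [0, 1000), and elements sharing a key end up in the same group, the way GBK would batch them after the trigger fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Plain-Java sketch of the random-key bucketing idea (not Beam code):
// elements are spread over a fixed key space so each key's group can be
// written to BigtableIO as one batch.
public class RandomKeyBucketing {
    static final int NUM_KEYS = 1000;

    // Assign a random bucket key, mimicking the ParDo step in the pipeline.
    static int randomKey() {
        return ThreadLocalRandom.current().nextInt(NUM_KEYS);
    }

    // Group elements by their bucket key, mimicking what GBK does after the
    // trigger fires; each value list becomes one write batch.
    static Map<Integer, List<String>> groupByRandomKey(List<String> elements) {
        Map<Integer, List<String>> buckets = new HashMap<>();
        for (String e : elements) {
            buckets.computeIfAbsent(randomKey(), k -> new ArrayList<>()).add(e);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> elements = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            elements.add("mutation-" + i);
        }
        Map<Integer, List<String>> buckets = groupByRandomKey(elements);
        System.out.println("non-empty buckets: " + buckets.size());
    }
}
```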
The logic behind the above pipeline is that you're regrouping all your data
into a fixed key space [0, 1000) in the global window and then attempting to
write to BigtableIO every 10 seconds. On average, each bundle will contain:
(data output from the original pipeline in 10 seconds) / 1000 keys.
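Plugging the 300k qps target from the issue description into that formula gives a feel for the resulting batch sizes (the rate is just an illustrative number):

```java
// Back-of-the-envelope check of the average bundle size claim: elements
// buffered during one trigger firing, spread evenly across the key space.
public class BundleSizeEstimate {
    static long averageBundleSize(long elementsPerSecond, long triggerSeconds, long numKeys) {
        return (elementsPerSecond * triggerSeconds) / numKeys;
    }

    public static void main(String[] args) {
        // 300,000 qps target, 10-second trigger, 1000 random keys
        System.out.println(averageBundleSize(300_000, 10, 1_000)); // prints 3000
    }
}
```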
The good thing is that the transform needed to write is easy, and you push
all the buffering logic to the system instead of owning it. The bad thing is
that you're rewindowing, which may not work depending on whether you're
writing windowing information to BigTable.
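The stateful-DoFn buffering from the first bullet can also be sketched in plain Java (Beam's BagState and timers are replaced here by an in-memory list; names are illustrative): elements accumulate until a count threshold is reached, then the whole buffer is flushed as one batch, mimicking a single large write to BigtableIO.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the stateful-DoFn buffering idea (not the Beam
// state/timer API): buffer elements and flush them downstream in batches.
public class ElementBuffer {
    private final int flushSize;
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> sink;

    ElementBuffer(int flushSize, Consumer<List<String>> sink) {
        this.flushSize = flushSize;
        this.sink = sink;
    }

    // Corresponds to @ProcessElement: buffer the element, flush once full.
    void add(String element) {
        buffer.add(element);
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    // Corresponds to a timer firing (or finishBundle): emit what remains.
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        ElementBuffer buf =
            new ElementBuffer(100, batch -> System.out.println("flushed " + batch.size()));
        for (int i = 0; i < 250; i++) {
            buf.add("mutation-" + i);
        }
        buf.flush(); // flush the final partial batch
    }
}
```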
> Extend BigTableIO to write Iterable of KV
> ------------------------------------------
>
> Key: BEAM-3311
> URL: https://issues.apache.org/jira/browse/BEAM-3311
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-gcp
> Affects Versions: 2.2.0
> Reporter: Anna Smith
> Assignee: Solomon Duskis
>
> The motivation is to achieve the qps advertised for BigTable in Dataflow
> streaming mode (e.g. 300k qps for a 30-node cluster). Currently we aren't
> seeing this, as the bundle size is small in streaming mode and the requests
> are dominated by the authentication header. For example, to achieve the
> advertised qps each payload is recommended to be ~1KB, but without batching
> each payload is 7KB, the majority of which is the authentication header.
> Currently BigTableIO supports DoFn<KV<ByteString, Iterable<Mutation>>, ...>,
> where batching is done per bundle on flush in finishBundle. We would like to
> be able to batch manually using a DoFn<Iterable<KV<ByteString,
> Iterable<Mutation>>>, ...> so we can work around the small bundle size in
> streaming. We have seen some improvements in qps to BigTable when running
> this approach with Dataflow.
> Initial thoughts on implementation would be to extend Write in order to have
> a BulkWrite of Iterable<KV<ByteString, Iterable<Mutation>>>.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)