On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers wrote:
> Here's an example API that would make this part of a DoFn. The idea here is
> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
> runner (and DoFnRunner) could see that it has asked for batches, so rather
> than calling a `processElement` on every input `I`, it assembles a
> `Collection<I>` and then calls the method.
>
> Possible API making this part of DoFn (with a fixed size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch(size = 50)
>   public void processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     ...
>   }
> }
>
> Possible API making this part of DoFn (with dynamic size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch
>   public boolean processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     if (batchContents.size() < 50) {
>       return false; // batch not yet processed
>     }
>
>     ...
>     return true;
>   }
> }
Or even
public class MyBatchedDoFn extends DoFn<I, O> {
  public void processElement(Iterable<I> batch) {
    // process the batch
  }
}
though I'd rather this not be baked into the DoFn API if it can be
solved separately.
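For concreteness, here's a rough sketch of the runner-side piece that any of these variants (or a separate, non-DoFn solution) would need: buffering elements as they arrive and handing them off as a batch. `BatchingInvoker`, `add` and `flush` are hypothetical names; this is plain Java with no Beam dependency, covers only the fixed-size case, and is not an actual Beam API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Beam): collects elements one at a time and
 * passes them to a batch callback once maxBatchSize elements have accumulated.
 * Whoever drives it (e.g. a runner) must call flush() at the end of a bundle so
 * a final, smaller batch is not dropped.
 */
public class BatchingInvoker<I> {
  private final int maxBatchSize;
  private final Consumer<List<I>> batchFn;
  private List<I> buffer;

  public BatchingInvoker(int maxBatchSize, Consumer<List<I>> batchFn) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    this.maxBatchSize = maxBatchSize;
    this.batchFn = batchFn;
    this.buffer = new ArrayList<>(maxBatchSize);
  }

  /** Called once per input element, where processElement would otherwise run. */
  public void add(I element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** Hands any buffered elements to the callback as a (possibly partial) batch. */
  public void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<I> batch = buffer;
    // Start a fresh buffer so an emitted batch is never mutated afterwards.
    buffer = new ArrayList<>(maxBatchSize);
    batchFn.accept(Collections.unmodifiableList(batch));
  }

  public static void main(String[] args) {
    BatchingInvoker<Integer> invoker =
        new BatchingInvoker<>(2, batch -> System.out.println("batch: " + batch));
    for (int i = 1; i <= 5; i++) {
      invoker.add(i);
    }
    invoker.flush(); // end of bundle: emits the leftover [5]
  }
}
```

Running `main` prints `batch: [1, 2]`, `batch: [3, 4]` and `batch: [5]`. Something close to this can already be done inside an ordinary DoFn by buffering in the `@ProcessElement` method and flushing in `@FinishBundle`, which is roughly what "solved separately" would mean here.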