On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> Here's an example API that would make this part of a DoFn. The idea here is
> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
> runner (and DoFnRunner) could see that it has asked for batches, so rather
> than calling a `processElement` on every input `I`, it assembles a
> `Collection<I>` and then calls the method.
>
> Possible API making this part of DoFn (with a fixed size):
>
> public MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch(size = 50)
>   public void processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     ...
>   }
> }
>
> Possible API making this part of DoFn (with dynamic size):
>
> public MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch
>   public boolean processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     if (batchContents.size() < 50) {
>       return false; // batch not yet processed
>     }
>
>     ...
>     return true;
>   }
> }
Or even:

public class MyBatchedDoFn extends DoFn<I, O> {
  public void processElement(Iterable<I> batch) {
    [process the batch]
  }
}

though I'd rather this not be baked into the DoFn API if it can be solved
separately.
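
To make "solved separately" a bit more concrete, here is a minimal sketch of batching done outside the DoFn API. It is plain Java with no Beam dependency, and `Batcher`, `add`, and `flush` are hypothetical names used only for illustration, not anything in the SDK. It assumes a fixed batch size and that something calls `flush()` once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Beam SDK: buffers elements and hands
// each full batch to a callback.
class Batcher<I> {
    private final int batchSize;
    private final Consumer<List<I>> processBatch;
    private List<I> buffer = new ArrayList<>();

    Batcher(int batchSize, Consumer<List<I>> processBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.processBatch = processBatch;
    }

    // Buffers one element; emits a batch once batchSize elements are held.
    void add(I element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Emits whatever is buffered (possibly a short batch); no-op when empty.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<I> batch = buffer;
        buffer = new ArrayList<>(); // fresh buffer so the callback owns the batch
        processBatch.accept(batch);
    }
}
```

An ordinary DoFn could presumably own one of these, calling `add` for each element and `flush` when the bundle finishes, but that wiring is an assumption on my part and not something sketched above.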