On Thu, Jan 26, 2017 at 4:20 PM, Ben Chambers
<bchamb...@google.com.invalid> wrote:
> Here's an example API that would make this part of a DoFn. The idea here is
> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
> runner (and DoFnRunner) could see that it has asked for batches, so rather
> than calling a `processElement` on every input `I`, it assembles a
> `Collection<I>` and then calls the method.
>
> Possible API making this part of DoFn (with a fixed size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch(size = 50)
>   public void processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     ...
>   }
> }
>
> Possible API making this part of DoFn (with dynamic size):
>
> public class MyBatchedDoFn extends DoFn<I, O> {
>   @ProcessBatch
>   public boolean processBatch(ProcessContext c) {
>     Collection<I> batchContents = c.element();
>     if (batchContents.size() < 50) {
>       return false; // batch not yet processed
>     }
>
>     ...
>     return true;
>   }
> }
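In the quoted proposal, the runner, not the user, turns per-element input into batch calls. The following is a minimal plain-Java sketch of how a DoFnRunner might do that for both quoted variants. It does not use the Beam SDK. `BatchingInvoker`, `runFixed` and `runDynamic` are hypothetical names, and flushing a partial batch at the end of a bundle is an assumption, since the proposal leaves that open.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical runner-side helper (not a Beam API): shows how a DoFnRunner
// could turn per-element input into calls on a batch method.
public class BatchingInvoker {

  // Fixed size, as in @ProcessBatch(size = 50): call the batch method every
  // `size` elements. Flushing a partial batch at bundle end is an assumption.
  static <I> void runFixed(Iterable<I> bundle, int size, Consumer<Collection<I>> processBatch) {
    List<I> buffer = new ArrayList<>();
    for (I element : bundle) {
      buffer.add(element);
      if (buffer.size() == size) {
        processBatch.accept(Collections.unmodifiableList(buffer));
        buffer = new ArrayList<>(); // start a fresh batch
      }
    }
    if (!buffer.isEmpty()) {
      processBatch.accept(Collections.unmodifiableList(buffer));
    }
  }

  // Dynamic size: offer the growing batch after every element. A `true`
  // return means "processed", so the runner starts a new batch; `false`
  // means keep accumulating. The unprocessed remainder is returned because
  // the proposal does not say how it is handled at bundle end.
  static <I> List<I> runDynamic(Iterable<I> bundle, Predicate<Collection<I>> processBatch) {
    List<I> buffer = new ArrayList<>();
    for (I element : bundle) {
      buffer.add(element);
      if (processBatch.test(Collections.unmodifiableList(buffer))) {
        buffer = new ArrayList<>();
      }
    }
    return buffer;
  }

  public static void main(String[] args) {
    List<Integer> bundle = new ArrayList<>();
    for (int i = 0; i < 120; i++) {
      bundle.add(i);
    }

    List<Integer> sizes = new ArrayList<>();
    runFixed(bundle, 50, batch -> sizes.add(batch.size()));
    System.out.println(sizes); // [50, 50, 20]

    List<Integer> remainder = runDynamic(bundle, batch -> batch.size() >= 50);
    System.out.println(remainder.size()); // 20
  }
}
```

One consequence worth noting: the dynamic variant calls the batch method after every element, so each `false` return costs one call, whereas the fixed-size variant lets the runner do the counting.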

Or even

public class MyBatchedDoFn extends DoFn<I, O> {
  @ProcessElement
  public void processElement(Iterable<I> batch) {
    // process the batch
  }
}

though I'd rather this not be baked into the DoFn API if it can be
solved separately.
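Solving this separately could look like a batching step that runs before an ordinary DoFn, so that each element the DoFn sees is already an `Iterable<I>`, matching the `processElement(Iterable<I> batch)` signature above. This is a plain-Java sketch under that assumption, not Beam code; `SeparateBatching`, `batch` and the `String`-typed `processElement` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not a Beam API): batching as its own step, so the
// downstream per-element function simply receives an Iterable<I>.
public class SeparateBatching {

  // Upstream step: split the input into consecutive batches of at most `size`.
  static <I> List<List<I>> batch(Iterable<I> input, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<I>> batches = new ArrayList<>();
    List<I> current = new ArrayList<>();
    for (I element : input) {
      current.add(element);
      if (current.size() == size) {
        batches.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // trailing partial batch
    }
    return batches;
  }

  // Downstream step: an ordinary per-element method whose element is a batch.
  static void processElement(Iterable<String> batch) {
    int count = 0;
    for (String ignored : batch) {
      count++;
    }
    System.out.println("processing batch of " + count);
  }

  public static void main(String[] args) {
    List<List<String>> batches = batch(Arrays.asList("a", "b", "c", "d", "e"), 2);
    System.out.println(batches); // [[a, b], [c, d], [e]]
    for (List<String> b : batches) {
      processElement(b);
    }
  }
}
```

In this arrangement the per-element contract is unchanged: batching is just another step whose output elements happen to be collections, which is one way to keep it out of the DoFn API.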
