Here's an example API that would make this part of a DoFn. The idea is that it
would still be run as `ParDo.of(new MyBatchedDoFn())`, but the runner (and
DoFnRunner) could see that the DoFn has asked for batches, so rather than
calling `processElement` on every input `I`, it assembles a `Collection<I>`
and calls the batch method.

Possible API making this part of DoFn (with a fixed size):

public class MyBatchedDoFn extends DoFn<I, O> {
  @ProcessBatch(size = 50)
  public void processBatch(ProcessContext c) {
    // The runner hands this method an assembled batch rather than a single element.
    Collection<I> batchContents = c.element();
    ...
  }
}

Possible API making this part of DoFn (with a dynamic size):

public class MyBatchedDoFn extends DoFn<I, O> {
  @ProcessBatch
  public boolean processBatch(ProcessContext c) {
    Collection<I> batchContents = c.element();
    if (batchContents.size() < 50) {
      return false; // batch not yet processed; keep accumulating
    }

    ...
    return true; // batch consumed
  }
}
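
For illustration, here's a rough sketch of what the runner-side loop might
look like once it notices the @ProcessBatch annotation. This is purely
hypothetical, not real DoFnRunner code, and the invoker call at the end is a
placeholder:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of runner-side batching, not actual DoFnRunner code.
// Assumes the runner has discovered @ProcessBatch(size = 50) on the DoFn.
class BatchingElementHandler<I> {
  private final int targetBatchSize = 50;
  private final List<I> buffer = new ArrayList<>();

  // Called once per input element instead of invoking processElement directly.
  void onElement(I element) {
    buffer.add(element);
    if (buffer.size() >= targetBatchSize) {
      flush();
    }
  }

  // Called when the bundle finishes so a partial batch isn't dropped.
  void onFinishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    invokeProcessBatch(new ArrayList<>(buffer));
    buffer.clear();
  }

  // Placeholder for handing the assembled Collection<I> to the user's
  // @ProcessBatch method through whatever invoker the runner generates.
  private void invokeProcessBatch(Collection<I> batch) {}
}

For the dynamic-size variant, the runner would presumably keep offering the
growing buffer until processBatch returns true, and would still have to flush
whatever remains when the bundle finishes.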

On Thu, Jan 26, 2017 at 4:16 PM Robert Bradshaw <rober...@google.com.invalid>
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> > I agree that wrapping the DoFn is probably not the way to go, because the
> > DoFn may be quite tricky due to all the reflective features: e.g. how do
> > you automatically "batch" a DoFn that uses state and timers? What about a
> > DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
> > What about future reflective features? The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
> > I'd rather leave the intricacies of invoking DoFn's to runners, and say
> > that you can't wrap DoFn's, period - "adapter", "decorator" and other
> > design patterns just don't apply to DoFn's.
>
> As a simple example, given a DoFn<T, O> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
> windows, etc. would just be passed through.
>
> The fact that this is complicated, with reflection and flexible
> signatures and bytecode generation, is a property of the SDK (to
> provide a flexible DoFn API). I agree that it's nice to hide this
> complexity from the user, but it discourages this kind of composability.
>
> I would say that it's nice to let the "batching fn" have side inputs,
> setup/teardown, etc. Pretty much everything the current DoFn has,
> though of course using certain properties (e.g. state and timers, or
> windows) would restrict bundles to be contained within a single
> key/window/whatever.
>
> > The two options for batching are:
> > - A transform that takes elements and produces batches, like Robert said
> > - A simple Beam-agnostic library that takes Java objects and produces
> > batches of Java objects, with an API that makes it convenient to use in a
> > typical batching DoFn
>
> I don't think a Beam-agnostic library could correctly handle details
> like windowing and timestamps.
>
>
> On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
> <bchamb...@google.com.invalid> wrote:
> > The third option for batching:
> >
> > - Functionality within the DoFn and DoFnRunner built as part of the SDK.
> >
> > I haven't thought through Batching, but at least for the
> > IntraBundleParallelization use case this actually does make sense to
> > expose as a part of the model. Knowing that a DoFn supports
> > parallelization, a runner may want to control how much parallelization
> > is allowed, and the DoFn also needs to make sure to wait on all those
> > threads (and make sure they're properly set up for logging/metrics/etc.
> > associated with the current step).
> >
> > There may be good reasons to make this a property of a DoFn that the
> > runner can inspect and support. For instance, if a DoFn wants to process
> > batches of 50, it may be possible to factor that into how input is
> > split/bundled.
>
> That's an interesting idea. I think this could also be done via the Fn
> API by recognizing the URN of "batching DoFn."
>
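
As a side note, the pass-through wrapping Robert describes is easy when the
wrapped logic is a plain per-element function. Here's a rough sketch (the
class and names are made up); wrapping an arbitrary DoFn<T, O> is exactly the
hard part, since its processElement is found reflectively via DoFnInvokers:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

// Sketch only: wraps per-element logic as a DoFn<KV<K, T>, KV<K, O>>.
// State, timers, windows, side inputs, etc. are not addressed here; they are
// what makes wrapping a full DoFn<T, O> hard in general.
class KeyedWrapperFn<K, T, O> extends DoFn<KV<K, T>, KV<K, O>> {
  private final SerializableFunction<T, O> fn;

  KeyedWrapperFn(SerializableFunction<T, O> fn) {
    this.fn = fn;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<K, T> kv = c.element();
    // The key passes through unchanged; the wrapped logic applies to the value.
    c.output(KV.of(kv.getKey(), fn.apply(kv.getValue())));
  }
}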
