On Thu, Jan 26, 2017 at 5:04 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:
> It would be nice to start with an inventory of the batching use cases we
> already have implemented manually, and see what kind of API would be
> sufficient to replace all of them.

Sounds like a good way to make forward progress.

> E.g.:
> - both fixed and dynamic batch size as specified above are insufficient for
> what PubsubIO and ElasticsearchIO do (they flush batches based on either
> #docs written, or their total byte size). PubsubIO can also add multiple
> elements to the batch in one ProcessElement call, and can correspondingly
> flush the batch multiple times.
> - JdbcIO doesn't store the input elements directly - instead it uses each
> one to do a call on a PreparedStatement, and flushes the PreparedStatement
> when necessary. This could be changed though.
> - In MongoDb, you need to make a copy of the element (Document) before
> adding it to the batch, because flushing the batch will modify it. Though
> this, too, could be changed (e.g. by copying every Document in the batch
> before flushing the batch)
>
> Note that none of these "batch DoFn's" emit output at all, so concerns
> about output timestamps do not apply to them. Maybe we should
> special-case just this no-output use case? Maybe for this case, again, a
> Beam-agnostic library will be sufficient.

The no-output case is certainly much easier to handle, and a
Beam-agnostic library would likely be sufficient (unless one wants to
batch across bundles or abstract away things like timers for bounded
latency).
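
Such a library needn't know anything about Beam at all. A minimal
sketch (the Batcher class and all names here are invented, not an
existing library): it just accumulates elements and invokes a
user-supplied flush callback, with the caller responsible for calling
flush() again from finishBundle to drain leftovers.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Batcher<T> {
  private final int maxSize;
  private final Consumer<List<T>> flushFn;
  private final List<T> buffer = new ArrayList<>();

  public Batcher(int maxSize, Consumer<List<T>> flushFn) {
    this.maxSize = maxSize;
    this.flushFn = flushFn;
  }

  // Adds one element, flushing if the batch has reached maxSize.
  public void add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxSize) {
      flush();
    }
  }

  // Flushes any buffered elements; also call this from finishBundle.
  public void flush() {
    if (!buffer.isEmpty()) {
      flushFn.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}

Note that this says nothing about windows or timestamps, which is
exactly why it only fits the no-output case.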

The most common cases that I've run into are:

- Contacting some external service that has a batch API (e.g.
databases that can do multiple point lookups in one request, or
calling a service to get the status (e.g. finished, authorized, etc.)
of multiple items at once, or other batchy APIs).
- Calling libraries that can amortize costs over multiple elements
(e.g. vector-matrix multiplications, or TensorFlow (ML) graph
application).

For each of these, the "naive" batching approach messes up the output
windows and timestamps, which are 1:1 with the inputs.
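
Concretely, the naive version looks something like this (a sketch;
callBatchApi is a stand-in for whatever batch call is being made):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class NaiveBatchingFn extends DoFn<String, String> {
  private static final int BATCH_SIZE = 50;
  private final List<String> batch = new ArrayList<>();

  // Stand-in for the external batch RPC or library call.
  private List<String> callBatchApi(List<String> elements) {
    return new ArrayList<>(elements);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    if (batch.size() >= BATCH_SIZE) {
      for (String result : callBatchApi(batch)) {
        // Every output is stamped with the timestamp and window of
        // whichever element happened to trigger the flush, not those
        // of the input that actually produced it.
        c.output(result);
      }
      batch.clear();
    }
  }
}

(It also silently drops a partial batch at the end of the bundle
unless finishBundle flushes it, at which point there is no input
element left to borrow a timestamp or window from.)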

> On Thu, Jan 26, 2017 at 4:20 PM Ben Chambers <bchamb...@google.com.invalid>
> wrote:
>
>> Here's an example API that would make this part of a DoFn. The idea here is
>> that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
>> runner (and DoFnRunner) could see that it has asked for batches, so rather
>> than calling `processElement` on every input `I`, it assembles a
>> `Collection<I>` and then calls the method.
>>
>> Possible API making this part of DoFn (with a fixed size):
>>
>> public class MyBatchedDoFn extends DoFn<I, O> {
>>   @ProcessBatch(size = 50)
>>   public void processBatch(ProcessContext c) {
>>     Collection<I> batchContents = c.element();
>>     ...
>>   }
>> }
>>
>> Possible API making this part of DoFn (with dynamic size):
>>
>> public class MyBatchedDoFn extends DoFn<I, O> {
>>   @ProcessBatch
>>   public boolean processBatch(ProcessContext c) {
>>     Collection<I> batchContents = c.element();
>>     if (batchContents.size() < 50) {
>>       return false; // batch not yet processed
>>     }
>>
>>     ...
>>     return true;
>>   }
>> }
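>>
>> In either case, applying it would remain plain `ParDo.of(...)`; e.g.
>> (types as in the sketches above):
>>
>> PCollection<O> output = input.apply(ParDo.of(new MyBatchedDoFn()));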
>>
>> On Thu, Jan 26, 2017 at 4:16 PM Robert Bradshaw
>> <rober...@google.com.invalid>
>> wrote:
>>
>> > On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>> > <kirpic...@google.com.invalid> wrote:
>> > > I agree that wrapping the DoFn is probably not the way to go, because
>> > > the DoFn may be quite tricky due to all the reflective features: e.g.
>> > > how do you automatically "batch" a DoFn that uses state and timers?
>> > > What about a DoFn that uses a BoundedWindow parameter? What about a
>> > > splittable DoFn? What about future reflective features? The class for
>> > > invoking DoFn's, DoFnInvokers, is absent from the SDK (and present in
>> > > runners-core) for a good reason.
>> > >
>> > > I'd rather leave the intricacies of invoking DoFn's to runners, and say
>> > > that you can't wrap DoFn's, period - "adapter", "decorator" and other
>> > > design patterns just don't apply to DoFn's.
>> >
>> > As a simple example, given a DoFn<T, O> it's perfectly natural to want
>> > to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
>> > windows, etc. would just be passed through.
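>> >
>> > For the simplest case - an inner fn that is just an element-wise
>> > function, side-stepping the reflective machinery - the pass-through
>> > wrapping is only a few lines (a sketch; KeyedWrapper is a name
>> > invented here):
>> >
>> > import org.apache.beam.sdk.transforms.DoFn;
>> > import org.apache.beam.sdk.transforms.SerializableFunction;
>> > import org.apache.beam.sdk.values.KV;
>> >
>> > class KeyedWrapper<K, T, O> extends DoFn<KV<K, T>, KV<K, O>> {
>> >   private final SerializableFunction<T, O> inner;
>> >
>> >   KeyedWrapper(SerializableFunction<T, O> inner) {
>> >     this.inner = inner;
>> >   }
>> >
>> >   @ProcessElement
>> >   public void processElement(ProcessContext c) {
>> >     // The key passes through untouched; only the value is mapped.
>> >     KV<K, T> kv = c.element();
>> >     c.output(KV.of(kv.getKey(), inner.apply(kv.getValue())));
>> >   }
>> > }
>> >
>> > Wrapping an arbitrary DoFn, with its state, timers, and window
>> > parameters, is exactly where the DoFnInvokers machinery mentioned
>> > above becomes necessary.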
>> >
>> > The fact that this is complicated, with reflection and flexible
>> > signatures and bytecode generation, is a property of the SDK (to
>> > provide a flexible DoFn API). I agree that it's nice to hide this
>> > complexity from the user, but it does discourage this kind of
>> > composability.
>> >
>> > I would say that it's nice to let the "batching fn" have side inputs,
>> > setup/teardown, etc. Pretty much everything the current DoFn has,
>> > though of course using certain properties (e.g. state and timers, or
>> > windows) would restrict bundles to be contained within a single
>> > key/window/whatever.
>> >
>> > > The two options for batching are:
>> > > - A transform that takes elements and produces batches, like Robert
>> > > said
>> > > - A simple Beam-agnostic library that takes Java objects and produces
>> > > batches of Java objects, with an API that makes it convenient to use
>> > > in a typical batching DoFn
>> >
>> > I don't think a Beam-agnostic library could correctly handle details
>> > like windowing and timestamps.
>> >
>> >
>> > On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
>> > <bchamb...@google.com.invalid> wrote:
>> > > The third option for batching:
>> > >
>> > > - Functionality within the DoFn and DoFnRunner built as part of the
>> > > SDK.
>> > >
>> > > I haven't thought through batching, but at least for the
>> > > IntraBundleParallelization use case this actually does make sense to
>> > > expose as a part of the model. Knowing that a DoFn supports
>> > > parallelization, a runner may want to control how much parallelization
>> > > is allowed, and the DoFn also needs to make sure to wait on all those
>> > > threads (and make sure they're properly set up for
>> > > logging/metrics/etc. associated with the current step).
>> > >
>> > > There may be good reasons to make this a property of a DoFn that the
>> > > runner can inspect and support. For instance, if a DoFn wants to
>> > > process batches of 50, it may be possible to factor that into how
>> > > input is split/bundled.
>> >
>> > That's an interesting idea. I think this could also be done via the Fn
>> > API by recognizing the URN of "batching DoFn."
>> >
>>
