2017-11-15 15:23 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> Hi Romain,
>
> 1. you always have the GlobalWindow at least. It's more related to trigger.

Yep, but having an implicit window and being able to rely on windowing
features are two different things. I think some runners don't support
them yet, so it may not be reliable enough - this was my main concern.
If that is not true, then it is OK.

> 2. How would you define this ? With annotation (on what in that case) or
> using checkpoint method ?

Both work: an annotation on a new type of method in the DoFn, or an
external checkpoint algorithm. I tend to think the first is easier.
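
To make the "external checkpoint algorithm" option a bit more concrete, here
is a minimal sketch of the CheckpointPolicy { boolean shouldCheckpoint() }
idea quoted further down in this thread. Everything in it is an assumption
for illustration: CheckpointPolicy, CountCheckpointPolicy and the run() loop
are hypothetical names, none of them exist in Beam, and the loop only stands
in for whatever a runner would actually do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface taken from the thread; not part of Beam.
interface CheckpointPolicy {
    // Called by the (imaginary) processing loop after each element.
    boolean shouldCheckpoint();
}

// Hypothetical policy: ask for a checkpoint every `interval` elements.
final class CountCheckpointPolicy implements CheckpointPolicy {
    private final int interval;
    private int seen;

    CountCheckpointPolicy(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    @Override
    public boolean shouldCheckpoint() {
        seen++;
        if (seen >= interval) {
            seen = 0;
            return true;
        }
        return false;
    }
}

public class CheckpointPolicyDemo {
    // Stand-in for a runner's event loop. Returns the 1-based element
    // indices after which a checkpoint was requested.
    static List<Integer> run(int elements, CheckpointPolicy policy) {
        List<Integer> checkpoints = new ArrayList<>();
        for (int i = 1; i <= elements; i++) {
            // ... process element i and buffer it for the backend ...
            if (policy.shouldCheckpoint()) {
                // ... flush the buffer to the backend, then checkpoint ...
                checkpoints.add(i);
            }
        }
        return checkpoints;
    }

    public static void main(String[] args) {
        System.out.println(run(7, new CountCheckpointPolicy(3))); // prints [3, 6]
    }
}
```

The point of the sketch is only that the flush and the checkpoint happen at
the same place in the loop, which is what a maxSize-style eager flush cannot
guarantee today.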

> 3. Agree to have a core PTransform for that.
>
> Regards
> JB
>
>
> On 11/15/2017 02:16 PM, Romain Manni-Bucau wrote:
>>
>> @Reuven: it looks like a good workaround
>> @Ken: thks a lot for the link!
>>
>> @all:
>>
>> 1. do you think it is doable without windowing usage (to have
>> something more reliable in terms of runners, since it will depend on
>> fewer primitives)?
>> 2. what about allowing the user to define when to checkpoint?
>> 3. can we get this kind of "composite" pattern in the beam core?
>>
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 14:12 GMT+01:00 Kenneth Knowles <k...@google.com.invalid>:
>>>
>>> In case the connection is not clear to folks on this thread, I pinged the
>>> thread on @StableReplay / @RequiresStableInput / etc and opened a draft PR
>>> at https://github.com/apache/beam/pull/4135.
>>>
>>> On Wed, Nov 15, 2017 at 3:24 AM, Reuven Lax <re...@google.com.invalid>
>>> wrote:
>>>
>>>> so I think the following will do exactly that and can be easily factored
>>>> into a reusable transform (modulo Java type boilerplate):
>>>>
>>>> pCollection
>>>>     // Spread elements over N random keys so batches can form in parallel.
>>>>     .apply(WithKeys.of((Element e) -> ThreadLocalRandom.current().nextInt(N)))
>>>>     .apply(Window.into(new GlobalWindows())
>>>>         // Fire an early pane once at least 100 elements are buffered.
>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>             .withEarlyFirings(AfterPane.elementCountAtLeast(100))))
>>>>     .apply(GroupByKey.create())
>>>>     .apply(ParDo.of(new DoFn<>() {
>>>>       @ProcessElement
>>>>       @StableReplay
>>>>       public void processElement(ProcessContext c) {
>>>>         // Insert c.element().getValue() into backend.
>>>>       }
>>>>     }));
>>>>
>>>> On Wed, Nov 15, 2017 at 7:00 PM, Romain Manni-Bucau
>>>> <rmannibu...@gmail.com> wrote:
>>>>
>>>>> 2017-11-15 11:42 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>
>>>>>> Can we describe this at a higher level?
>>>>>>
>>>>>> I think what you want is the following. Please correct if I'm
>>>>>> misunderstanding.
>>>>>>
>>>>>> Batches of 100 elements (is this a hard requirement, or do they have to
>>>>>> be "approximately" 100 elements?)
>>>>>
>>>>>
>>>>> Approximately is fine as long as it is documented (what is not fine is
>>>>> 1000000 instead of 10, for instance)
>>>>>
>>>>>>
>>>>>> Once you see a batch, you're guaranteed to see the same batch on
>>>>>> retries.
>>>>>
>>>>>
>>>>> +1
>>>>>
>>>>>>
>>>>>> You want to then idempotently insert this batch into some backend.
>>>>>> Things may fail, workers may crash, but in that case you want to get the
>>>>>> exact same batch back so you can insert it again.
>>>>>
>>>>>
>>>>> +1
>>>>>
>>>>>>
>>>>>> Do you care about ordering? On failure do you have to see the same
>>>>>> batches in the same order as before, or is it sufficient to see the same
>>>>>> batches?
>>>>>
>>>>>
>>>>> Beam doesn't guarantee ordering everywhere, so I guess it is not
>>>>> important - at least for my cases this statement is true.
>>>>>
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Wed, Nov 15, 2017 at 5:58 PM, Romain Manni-Bucau
>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> The overall goal is to ensure that, every 100 elements at most, a
>>>>>>> "backend" (such as a datastore) flush/commit/push is done and is
>>>>>>> aligned with Beam checkpoints. You can see it as bringing the
>>>>>>> "general" commit-interval notion to Beam and kind of getting rid of
>>>>>>> the bundle notion, which is almost impossible to use today.
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>>
>>>>>>>
>>>>>>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>>
>>>>>>>> It's in the dev list archives, not sure if there's a doc yet.
>>>>>>>>
>>>>>>>> I'm not quite sure I understand what you mean by a "flush". Can you
>>>>>>>> describe the problem you're trying to solve?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau
>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, I didn't find the doc - if you have the link handy it would be
>>>>>>>>> appreciated - but "before" does not sound like enough; it should be
>>>>>>>>> "after" in case there was a "flush", no?
>>>>>>>>>
>>>>>>>>> Romain Manni-Bucau
>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>>>>>>
>>>>>>>>>> If you set @StableReplay before a ParDo, it forces a checkpoint
>>>>>>>>>> before that ParDo.
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau
>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It sounds like a good start. I'm not sure how a group by key (and not
>>>>>>>>>>> by size) can help control the checkpointing interval. I wonder if we
>>>>>>>>>>> shouldn't be able to have a CheckpointPolicy { boolean
>>>>>>>>>>> shouldCheckpoint() } used in the processing event loop. The default
>>>>>>>>>>> could be up to the runner, but if set on the transform (or DoFn) it
>>>>>>>>>>> would be used to control when the checkpoint is done. Thinking out
>>>>>>>>>>> loud, it sounds close to the JBatch checkpoint algorithm
>>>>>>>>>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
>>>>>>>>>>>
>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Romain,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the @StableReplay semantic that Kenn proposed a month or so
>>>>>>>>>>>>> ago is what is needed here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Essentially it will ensure that the GroupByKey iterable is stable and
>>>>>>>>>>>>> checkpointed. So on replay, the GroupByKey is guaranteed to receive
>>>>>>>>>>>>> the exact same iterable as it did before. The annotation can be put
>>>>>>>>>>>>> on a ParDo as well, in which case it ensures stability (and
>>>>>>>>>>>>> checkpointing) of the individual ParDo elements.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>>>>>>>>>>>>> <rmannibu...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Romain,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You are right: currently, the chunking is related to bundles.
>>>>>>>>>>>>>>> Today, the bundle size is the runner's responsibility.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it's fine because only the runner knows an efficient
>>>>>>>>>>>>>>> bundle size. I'm afraid giving the "control" of the bundle size
>>>>>>>>>>>>>>> to the end user (via the pipeline) can result in huge performance
>>>>>>>>>>>>>>> issues depending on the runner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It doesn't mean that we can't use an uber layer: it's what we do
>>>>>>>>>>>>>>> in ParDoWithBatch or DoFn in IO Sink where we have a batch size.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Anyway, the core problem is about the checkpoint: why is a
>>>>>>>>>>>>>>> checkpoint not "respected" by an IO or runner?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Take the example of a runner deciding the bundle size is 4 and the
>>>>>>>>>>>>>> IO deciding the commit-interval (batch semantic) is 2, what happens
>>>>>>>>>>>>>> if the 3rd record fails? You have pushed to the store 2 records
>>>>>>>>>>>>>> which can be reprocessed by a restart of the bundle and you can get
>>>>>>>>>>>>>> duplicates.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
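
The bundle-size-4 / commit-interval-2 scenario above can be replayed with a
small simulation. This is only a sketch under stated assumptions: the class
EagerFlushDuplicates and its process() method are hypothetical, no Beam API
is involved, and the retry model (the whole bundle restarts once from its
first record) is a simplification of what a real runner does.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerFlushDuplicates {
    // Simulates one bundle whose records are flushed eagerly to a store every
    // `commitInterval` records. If record number `failAt` (1-based, 0 = never)
    // fails, the whole bundle is retried once from its first record.
    static List<Integer> process(int bundleSize, int commitInterval, int failAt) {
        List<Integer> store = new ArrayList<>(); // what the backend has received
        boolean failedOnce = false;
        while (true) {
            List<Integer> buffer = new ArrayList<>();
            boolean retry = false;
            for (int record = 1; record <= bundleSize; record++) {
                if (record == failAt && !failedOnce) {
                    failedOnce = true;
                    retry = true; // already flushed records stay in the store
                    break;
                }
                buffer.add(record);
                if (buffer.size() == commitInterval) {
                    store.addAll(buffer); // eager flush, not tied to a checkpoint
                    buffer.clear();
                }
            }
            if (!retry) {
                store.addAll(buffer); // final flush at the end of the bundle
                return store;
            }
        }
    }

    public static void main(String[] args) {
        // Bundle of 4, commit interval of 2, 3rd record fails on the first try.
        System.out.println(process(4, 2, 3)); // prints [1, 2, 1, 2, 3, 4]
    }
}
```

Records 1 and 2 end up in the store twice because the first flush happened
before the failure and nothing tells the retry that it already took place.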
>>>>>>>>>>>>>> Rephrased: I think we need, as a framework, a batch/chunk solution
>>>>>>>>>>>>>> which is reliable. I understand bundles are mapped onto the runner
>>>>>>>>>>>>>> and not really controlled, but can we get something more reliable
>>>>>>>>>>>>>> for the user? Maybe we need a @BeforeBatch or something like that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The subject is a bit provocative but the topic is real and keeps
>>>>>>>>>>>>>>>> coming up again and again with Beam usage: how can a DoFn handle
>>>>>>>>>>>>>>>> some "chunking"?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The need is to be able to commit every N records, with N not
>>>>>>>>>>>>>>>> too big.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The natural API for that in Beam is the bundle one, but bundles
>>>>>>>>>>>>>>>> are not reliable since they can be very small (Flink) - we can
>>>>>>>>>>>>>>>> say it is "ok" even if it has some perf impacts - or too big
>>>>>>>>>>>>>>>> (Spark does full size / #workers).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which does
>>>>>>>>>>>>>>>> an eager flush. The issue is that the checkpoint is then not
>>>>>>>>>>>>>>>> respected and you can process the same records multiple times.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any plan to make this API reliable and controllable from a Beam
>>>>>>>>>>>>>>>> point of view (at least in a max manner)?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>> jbono...@apache.org
>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
