can be, here are the options I have in mind:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo())) (see the sketch below)

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();
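
To make option II a bit more concrete, here is a rough sketch of what the
contract could look like. CheckpointAlgorithm, CountingAlgo and
withCheckpointAlgorithm are only illustrative names (loosely inspired by
the jbatch CheckpointAlgorithm linked further down the thread), not an
existing Beam API:

// CheckpointAlgorithm.java
import java.io.Serializable;

// Hypothetical contract (not an existing Beam API): decides when the
// runner should checkpoint/flush while a ParDo processes elements.
public interface CheckpointAlgorithm extends Serializable {

    // called when a new chunk starts, i.e. right after a checkpoint
    void beginCheckpoint();

    // called after each element; returning true means "checkpoint now"
    boolean isReadyToCheckpoint();
}

// CountingAlgo.java
// Size-based policy: checkpoint every N elements, e.g. the "each 100
// elements max" requirement discussed in this thread.
public class CountingAlgo implements CheckpointAlgorithm {

    private final int interval;
    private transient int seen;

    public CountingAlgo(int interval) {
        this.interval = interval;
    }

    @Override
    public void beginCheckpoint() {
        seen = 0;
    }

    @Override
    public boolean isReadyToCheckpoint() {
        return ++seen >= interval;
    }
}

// usage, matching option II (withCheckpointAlgorithm is hypothetical):
// pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo(100)));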




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-15 11:04 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
> And the control is given to the DoFn developer via annotations, right?
>
> So, the bundle would be "hidden" and internal to the runner (which makes
> sense, I think), and we would introduce "control" points for the DoFn
> developer that the runner will deal with.
>
> Correct?
>
> Regards
> JB
>
>
> On 11/15/2017 10:58 AM, Romain Manni-Bucau wrote:
>>
>> The overall goal is to ensure that, every 100 elements at most, a
>> "backend" (e.g. a datastore) flush/commit/push is done and is aligned
>> with Beam checkpoints. You can see it as bringing the "general"
>> commit-interval notion to Beam and kind of getting rid of the bundle
>> notion, which is almost impossible to use today.
>>
>> Romain Manni-Bucau
>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>
>>
>> 2017-11-15 10:27 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>
>>> It's in the dev list archives, not sure if there's a doc yet.
>>>
>>> I'm not quite sure I understand what you mean by a "flush". Can you
>>> describe the problem you're trying to solve?
>>>
>>> Reuven
>>>
>>> On Wed, Nov 15, 2017 at 5:25 PM, Romain Manni-Bucau
>>> <rmannibu...@gmail.com>
>>> wrote:
>>>
>>>> Hmm, I didn't find the doc - if you have the link handy it would be
>>>> appreciated - but "before" doesn't sound sufficient, it should be
>>>> "after" in case there was a "flush", no?
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>
>>>>
>>>> 2017-11-15 10:10 GMT+01:00 Reuven Lax <re...@google.com.invalid>:
>>>>>
>>>>> If you set @StableReplay before a ParDo, it forces a checkpoint
>>>>> before that ParDo.
>>>>>
>>>>> On Wed, Nov 15, 2017 at 5:07 PM, Romain Manni-Bucau
>>>>> <rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> It sounds like a good start. I'm not sure how a group by key (and
>>>>>> not by size) can help control the checkpointing interval. I wonder
>>>>>> if we shouldn't be able to have a CheckpointPolicy { boolean
>>>>>> shouldCheckpoint() } used in the processing event loop. The default
>>>>>> could be up to the runner, but if set on the transform (or dofn) it
>>>>>> would be used to control when the checkpoint is done. Thinking out
>>>>>> loud, it sounds close to the jbatch checkpoint algorithm
>>>>>> (https://docs.oracle.com/javaee/7/api/javax/batch/api/chunk/CheckpointAlgorithm.html)
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>
>>>>>>
>>>>>> 2017-11-15 9:55 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>
>>>>>>> Yes, @StableReplay, that's the annotation. Thanks.
>>>>>>>
>>>>>>>
>>>>>>> On 11/15/2017 09:52 AM, Reuven Lax wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Romain,
>>>>>>>>
>>>>>>>> I think the @StableReplay semantic that Kenn proposed a month or
>>>>>>>> so ago is what is needed here.
>>>>>>>>
>>>>>>>> Essentially it will ensure that the GroupByKey iterable is stable
>>>>>>>> and checkpointed. So on replay, the GroupByKey is guaranteed to
>>>>>>>> receive the exact same iterable as it did before. The annotation
>>>>>>>> can be put on a ParDo as well, in which case it ensures stability
>>>>>>>> (and checkpointing) of the individual ParDo elements.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Wed, Nov 15, 2017 at 4:49 PM, Romain Manni-Bucau
>>>>>>>> <rmannibu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> 2017-11-15 9:46 GMT+01:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi Romain,
>>>>>>>>>>
>>>>>>>>>> You are right: currently, the chunking is related to bundles.
>>>>>>>>>> Today, the bundle size is under the runner's responsibility.
>>>>>>>>>>
>>>>>>>>>> I think it's fine because only the runner knows an efficient
>>>>>>>>>> bundle size. I'm afraid giving the "control" of the bundle size
>>>>>>>>>> to the end user (via the pipeline) can result in huge
>>>>>>>>>> performance issues depending on the runner.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It doesn't mean that we can't use an uber layer: it's what we
>>>>>>>>>> do in ParDoWithBatch or in a DoFn in an IO sink where we have a
>>>>>>>>>> batch size.
>>>>>>>>>>
>>>>>>>>>> Anyway, the core problem is about the checkpoint: why is a
>>>>>>>>>> checkpoint not "respected" by an IO or runner?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Take the example of a runner deciding the bundle size is 4 and
>>>>>>>>> the IO deciding the commit-interval (batch semantic) is 2: what
>>>>>>>>> happens if the 3rd record fails? You have pushed 2 records to
>>>>>>>>> the store which can be reprocessed by a restart of the bundle,
>>>>>>>>> and you can get duplicates.
>>>>>>>>>
>>>>>>>>> Rephrased: I think we need, as a framework, a batch/chunk
>>>>>>>>> solution which is reliable. I understand bundles are mapped on
>>>>>>>>> the runner and not really controlled, but can we get something
>>>>>>>>> more reliable for the user?
>>>>>>>>>
>>>>>>>>> Maybe we need a @BeforeBatch or something like that.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 11/15/2017 09:38 AM, Romain Manni-Bucau wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi guys,
>>>>>>>>>>>
>>>>>>>>>>> The subject is a bit provocative but the topic is real and
>>>>>>>>>>> comes up again and again with Beam usage: how can a dofn
>>>>>>>>>>> handle some "chunking"?
>>>>>>>>>>>
>>>>>>>>>>> The need is to be able to commit every N records, but with N
>>>>>>>>>>> not too big.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The natural API for that in Beam is the bundle one, but
>>>>>>>>>>> bundles are not reliable since they can be very small (Flink)
>>>>>>>>>>> - we can say it is "ok" even if it has some perf impacts - or
>>>>>>>>>>> too big (Spark does full size / #workers).
>>>>>>>>>>>
>>>>>>>>>>> The workaround is what we see in the ES I/O: a maxSize which
>>>>>>>>>>> does an eager flush. The issue is that then the checkpoint is
>>>>>>>>>>> not respected and you can process the same records multiple
>>>>>>>>>>> times.
>>>>>>>>>>>
>>>>>>>>>>> Any plan to make this API reliable and controllable from a
>>>>>>>>>>> Beam point of view (at least in a max manner)?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> jbono...@apache.org
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>
>>>>>>
>>>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
