Yes, what I proposed earlier was:

I. checkpoint marker:

@AnyBeamAnnotation
@CheckpointAfter
public void someHook(SomeContext ctx);


II. pipeline.apply(ParDo.of(new MyFn()).withCheckpointAlgorithm(new CountingAlgo()))

III. (I like this one less)

// in the dofn
@CheckpointTester
public boolean shouldCheckpoint();

IV. @Checkpointer Serializable getCheckpoint(); in the DoFn, called per element
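
To make II and III a bit more concrete, here is a minimal sketch in plain Java, with no Beam dependency. Everything in it is an assumption for illustration: CheckpointAlgorithm and ChunkingDriver are hypothetical names, CountingAlgo is only the name used in proposal II, and withCheckpointAlgorithm is a proposal from this thread, not an existing Beam API. The driver only stands in for whatever a runner would do when the algorithm asks for a checkpoint.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical contract (not a Beam API): asked once per processed element.
interface CheckpointAlgorithm extends Serializable {
    // Returns true when a checkpoint should be taken after the current element.
    boolean shouldCheckpoint();
}

// Requests a checkpoint every chunkSize elements (the "CountingAlgo" of proposal II).
final class CountingAlgo implements CheckpointAlgorithm {
    private final int chunkSize;
    private int seen;

    CountingAlgo(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean shouldCheckpoint() {
        seen++;
        if (seen >= chunkSize) {
            seen = 0; // start counting the next chunk
            return true;
        }
        return false;
    }
}

// Stand-in for a runner: buffers elements and commits a chunk on each checkpoint.
final class ChunkingDriver<T> {
    private final CheckpointAlgorithm algorithm;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> committed = new ArrayList<>();

    ChunkingDriver(CheckpointAlgorithm algorithm) {
        this.algorithm = algorithm;
    }

    void process(T element) {
        buffer.add(element);
        if (algorithm.shouldCheckpoint()) {
            flush();
        }
    }

    // Commits whatever is left at the end of the input.
    void finish() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    List<List<T>> committedChunks() {
        return committed;
    }

    private void flush() {
        committed.add(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With new CountingAlgo(3) and seven elements, the driver commits two chunks of three and a last chunk of one when finish() is called. The point of the sketch is that N is chosen by the user and does not depend on how the runner sizes its bundles.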




Romain Manni-Bucau
@rmannibucau |  Blog | Old Blog | Github | LinkedIn


2017-11-17 6:06 GMT+01:00 Raghu Angadi <rang...@google.com.invalid>:
> How would you define it (rough API is fine)? Without more details, it is
> not easy to see wider applicability and feasibility in runners.
>
> On Thu, Nov 16, 2017 at 1:13 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> This is a fair summary of the current state but also where beam can have a
>> very strong added value and make big data great and smooth.
>>
>> Instead of this replay feature, isn't checkpointing desirable? In particular
>> with SDF, no?
>>
>>
>> On 16 Nov 2017 at 19:50, "Raghu Angadi" <rang...@google.com.invalid>
>> wrote:
>>
>> > The core issue here is that there is no explicit concept of 'checkpoint' in
>> > Beam (UnboundedSource has a method 'getCheckpointMark', but that refers to
>> > the checkpoint on the external source). Runners do checkpoint internally, as
>> > an implementation detail. Flink's checkpoint model is entirely different
>> > from Dataflow's and Spark's.
>> >
>> > @StableReplay helps, but by design it does not explicitly talk about a
>> > checkpoint.
>> >
>> > If you are looking to achieve some guarantees with a sink/DoFn, I think it
>> > is better to start with the requirements. I worked on an exactly-once sink
>> > for Kafka (see KafkaIO.write().withEOS()), where we essentially reshard the
>> > elements and assign sequence numbers to elements within each shard.
>> > Duplicates in replays are avoided based on these sequence numbers. The DoFn
>> > state API is used to buffer out-of-order replays. The implementation
>> > strategy works in Dataflow but not in Flink, which has a horizontal
>> > checkpoint. KafkaIO checks for compatibility.
>> >
>> > On Wed, Nov 15, 2017 at 12:38 AM, Romain Manni-Bucau <
>> > rmannibu...@gmail.com>
>> > wrote:
>> >
>> > > Hi guys,
>> > >
>> > > The subject is a bit provocative, but the topic is real and comes up
>> > > again and again when using Beam: how can a DoFn handle some
>> > > "chunking"?
>> > >
>> > > The need is to be able to commit every N records, with N not too big.
>> > >
>> > > The natural API for that in beam is the bundle one but bundles are not
>> > > reliable since they can be very small (flink) - we can say it is "ok"
>> > > even if it has some perf impacts - or too big (spark does full size /
>> > > #workers).
>> > >
>> > > The workaround is what we see in the ES I/O: a maxSize which triggers an
>> > > eager flush. The issue is that the checkpoint is then not respected,
>> > > and the same records can be processed multiple times.
>> > >
>> > > Any plan to make this API reliable and controllable from a Beam point
>> > > of view (at least by setting a maximum size)?
>> > >
>> > > Thanks,
>> > > Romain Manni-Bucau
>> > > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> > >
>> >
>>
