Hi Hong,

In addition, I guess the inconsistency could also be handled in the
reader.start() method, by re-sending the operator event during restore.
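To illustrate the idea, here is a minimal sketch that models the two components as plain Python objects. It is not the Flink API: `IdempotentEnumerator`, `ResendingReader` and their methods are hypothetical. It assumes that the reader records every event it sends in its checkpointed state, and that the enumerator handles events idempotently, so replaying them from start() after a restore does no harm if the enumerator had already applied them. In the real API the replay would presumably happen in SourceReader#start() through SourceReaderContext#sendSourceEventToCoordinator, but that mapping is an assumption.

```python
class IdempotentEnumerator:
    """Hypothetical enumerator model; `applied` stands in for its checkpointed state."""

    def __init__(self, restored_applied=()):
        self.applied = set(restored_applied)

    def handle_reader_event(self, event_id):
        # Idempotent: re-applying an event that is already applied changes nothing.
        self.applied.add(event_id)


class ResendingReader:
    """Hypothetical reader model that records every sent event in its state."""

    def __init__(self, enumerator, restored_sent=()):
        self.enumerator = enumerator
        self.sent = set(restored_sent)

    def start(self):
        # On (re)start, replay every event recorded in the restored reader state,
        # so the enumerator catches up on events its older snapshot did not include.
        for event_id in sorted(self.sent):
            self.enumerator.handle_reader_event(event_id)

    def send(self, event_id):
        self.sent.add(event_id)
        self.enumerator.handle_reader_event(event_id)


# Restore from the inconsistent checkpoint 123: the enumerator snapshot is empty,
# while the reader snapshot already contains OperatorEvent 1.
enumerator = IdempotentEnumerator(restored_applied=set())
reader = ResendingReader(enumerator, restored_sent={1})
reader.start()
print(enumerator.applied == reader.sent)  # True
```

The price of this approach is that the reader state keeps every sent event unless it is pruned somehow, and every enumerator-side handler has to tolerate duplicates.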

Best,
Mason

On Tue, May 23, 2023 at 7:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I vaguely remember someone implementing a mechanism to deal with this. I
> think that, at least at some point (it might have changed since I looked at
> it), it solved the problem by cancelling the checkpoint in the scenario
> that you described. However, I can't remember off the top of my head
> either the ticket number or where the code for that is. I might also be
> completely wrong. If I don't forget, I can try to find it tomorrow.
>
> Best,
> Piotrek
>
> On Wed, 17 May 2023 at 17:39, Teoh, Hong <lian...@amazon.co.uk.invalid>
> wrote:
>
> > Hi all,
> >
> > I’m writing a new source based on the FLIP-27 Source API, and I have some
> > questions about the checkpointing mechanisms and the associated
> > guarantees. I would appreciate it if someone more familiar with the API
> > could provide some insights here!
> >
> > In a FLIP-27 source, we now have a SplitEnumerator (running on the JM) and
> > a SourceReader (running on the TM). However, the SourceReader can also send
> > events to the SplitEnumerator. This introduces a “loopback” communication
> > path from TM to JM, and I wonder if and how we handle it during
> > checkpoints.
> >
> >
> > Example of how data might be lost:
> > 1. Checkpoint 123 triggered
> > 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> > 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> > 4. SourceReader takes checkpoint of state for checkpoint 123
> > …
> > 5. Checkpoint 123 completes
> >
> > Let’s assume OperatorEvent 1 would mutate the SplitEnumerator state once
> > processed. The two snapshots taken for checkpoint 123 are now
> > inconsistent: the SourceReader state assumes OperatorEvent 1 has been
> > processed, whereas the SplitEnumerator state does not include it.
> >
> > Do we have any mechanisms for mitigating this issue? For example, does the
> > SplitEnumerator re-take the snapshot of its state for a checkpoint if an
> > OperatorEvent is sent before the checkpoint is complete?
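One possible mitigation, cancelling the affected checkpoint, is sketched below under stated assumptions. `CheckpointAwareEnumerator` is hypothetical, and each reader event is assumed to carry the id of the last checkpoint the sending reader had already snapshotted (`reader_last_checkpoint`). A pending checkpoint is aborted only when an event that the reader's snapshot will include reaches the enumerator after the enumerator's own snapshot; events sent after the reader's snapshot leave the checkpoint intact.

```python
class CheckpointAwareEnumerator:
    """Hypothetical enumerator model that aborts a checkpoint when a reader event
    arrives between the enumerator's snapshot and the checkpoint's completion."""

    def __init__(self):
        self.applied = set()   # ids of reader events applied to enumerator state
        self.pending = {}      # checkpoint id -> enumerator snapshot awaiting completion
        self.aborted = set()   # checkpoint ids invalidated by a late event

    def snapshot(self, checkpoint_id):
        self.pending[checkpoint_id] = frozenset(self.applied)

    def handle_reader_event(self, event_id, reader_last_checkpoint):
        # reader_last_checkpoint (assumed tag): highest checkpoint id the sending
        # reader had already snapshotted when it sent this event.
        self.applied.add(event_id)
        for checkpoint_id in list(self.pending):
            if reader_last_checkpoint < checkpoint_id:
                # The reader's snapshot for this checkpoint will include the event,
                # but the enumerator's snapshot does not: abort instead of committing.
                del self.pending[checkpoint_id]
                self.aborted.add(checkpoint_id)

    def complete(self, checkpoint_id):
        """Return the enumerator snapshot to commit, or None if it was aborted."""
        if checkpoint_id in self.aborted:
            return None
        return self.pending.pop(checkpoint_id)


enumerator = CheckpointAwareEnumerator()
enumerator.snapshot(123)                                       # step 2
enumerator.handle_reader_event(1, reader_last_checkpoint=122)  # step 3: before the reader's snapshot 123
print(enumerator.complete(123))                                # None: checkpoint 123 is aborted

enumerator.snapshot(124)
enumerator.handle_reader_event(2, reader_last_checkpoint=124)  # sent after the reader's snapshot 124
print(enumerator.complete(124))                                # frozenset({1})
```

Re-taking the enumerator snapshot instead of aborting would amount to adding the late event to the pending snapshot under the same condition; this model does not show which of the two is preferable in practice.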
> >
> > Regards,
> > Hong
>
