Hi Hong,

In addition, I guess the inconsistency could also be handled by the reader.start() method, such that the operator event is re-sent during restore.
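
A minimal sketch of the re-send idea, as a toy model rather than Flink code. The `Enumerator` and `Reader` classes below are hypothetical stand-ins for the SplitEnumerator and SourceReader. In a real source the corresponding hooks would presumably be `SourceReader#start()`, `SourceReaderContext#sendSourceEventToCoordinator(...)` and `SplitEnumerator#handleSourceEvent(...)`. The sketch assumes the reader keeps the events it has sent in its checkpointed state and that the enumerator handles events idempotently. It replays the steps from the quoted example and then restores from checkpoint 123.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model only: these classes are hypothetical stand-ins, not Flink's API. */
public class ResendOnRestoreSketch {

    /** Stand-in for the SplitEnumerator; its state is the set of processed event ids. */
    static final class Enumerator {
        final Set<Integer> processed = new HashSet<>();

        // Idempotent on purpose: a re-sent event must not be applied twice.
        void handleEvent(int eventId) {
            processed.add(eventId);
        }

        Set<Integer> snapshot() {
            return new HashSet<>(processed);
        }

        static Enumerator restore(Set<Integer> snapshot) {
            Enumerator e = new Enumerator();
            e.processed.addAll(snapshot);
            return e;
        }
    }

    /** Stand-in for the SourceReader; it checkpoints the ids of the events it has sent. */
    static final class Reader {
        final List<Integer> sentEvents = new ArrayList<>();

        void send(int eventId, Enumerator enumerator) {
            sentEvents.add(eventId);          // reader state now reflects the event
            enumerator.handleEvent(eventId);  // delivered to the live enumerator
        }

        List<Integer> snapshot() {
            return new ArrayList<>(sentEvents);
        }

        static Reader restore(List<Integer> snapshot) {
            Reader r = new Reader();
            r.sentEvents.addAll(snapshot);
            return r;
        }

        // Called once after restore: replay every event the reader believes it sent.
        void start(Enumerator enumerator) {
            for (int eventId : sentEvents) {
                enumerator.handleEvent(eventId);
            }
        }
    }

    /** Replays the checkpoint race, restores both sides, and returns the enumerator state. */
    public static Set<Integer> run(boolean resendOnStart) {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader();

        Set<Integer> enumeratorSnapshot = enumerator.snapshot(); // step 2: before the event
        reader.send(1, enumerator);                              // step 3: OperatorEvent 1
        List<Integer> readerSnapshot = reader.snapshot();        // step 4: after the event

        // Checkpoint completes, the job fails, and both sides restore from it.
        Enumerator restoredEnumerator = Enumerator.restore(enumeratorSnapshot);
        Reader restoredReader = Reader.restore(readerSnapshot);
        if (resendOnStart) {
            restoredReader.start(restoredEnumerator);
        }
        return restoredEnumerator.processed;
    }

    public static void main(String[] args) {
        System.out.println("without resend: " + run(false)); // prints "without resend: []"
        System.out.println("with resend:    " + run(true));  // prints "with resend:    [1]"
    }
}
```

Without the re-send, the restored enumerator has lost event 1 while the reader still believes it was processed. With the re-send in `start()`, the two sides converge again. A real reader would likely also want to drop events from its state once the enumerator has acknowledged them, which this sketch leaves out.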
Best,
Mason

On Tue, May 23, 2023 at 7:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I vaguely remember someone implementing a mechanism to deal with it. I
> think at least at some point (it might have changed since I looked at it),
> it was solving the problem by canceling the checkpoint in the scenario
> that you described. However, off the top of my head I can remember
> neither the ticket number nor where the code for that is. Also, I might be
> completely wrong. If I don't forget, I can try to find it tomorrow.
>
> Best,
> Piotrek
>
> On Wed, May 17, 2023 at 17:39, Teoh, Hong <lian...@amazon.co.uk.invalid>
> wrote:
>
> > Hi all,
> >
> > I’m writing a new source based on the FLIP-27 Source API, and I have some
> > questions about the checkpointing mechanisms and associated guarantees.
> > I would appreciate it if someone more familiar with the API could provide
> > insights here!
> >
> > In the FLIP-27 Source, we now have a SplitEnumerator (running on the JM)
> > and a SourceReader (running on the TM). However, the SourceReader can send
> > events to the SplitEnumerator. Given this, we have introduced a “loopback”
> > communication mechanism from TM to JM, and I wonder if/how we handle this
> > during checkpoints.
> >
> > Example of how data might be lost:
> > 1. Checkpoint 123 triggered
> > 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> > 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> > 4. SourceReader takes checkpoint of state for checkpoint 123
> > …
> > 5. Checkpoint 123 completes
> >
> > Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> > processed. There is now inconsistent state between the SourceReader and
> > the SplitEnumerator. (The SourceReader assumes OperatorEvent 1 is
> > processed, whereas the SplitEnumerator has not processed OperatorEvent 1.)
> >
> > Do we have any mechanisms for mitigating this issue? For example, does
> > the SplitEnumerator re-take the snapshot of state for a checkpoint if an
> > OperatorEvent is sent before the checkpoint is complete?
> >
> > Regards,
> > Hong