Hi, Jiangjie,

Thanks for your reply and suggestion.

In fact, we don't want to change how the JM triggers checkpoints. Rather, we
hope to give OperatorCoordinator a mechanism similar to
ExternallyInducedSourceReader for coordinating when the checkpoint barrier
is sent (that is, moving this control point up from the Source to the
OperatorCoordinator). We want the produced data and checkpoints to have a
one-to-one mapping. With such a mechanism, programming and design would be
greatly simplified.
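
To make the idea more concrete, here is a minimal sketch of what such a
mechanism might look like. It is plain Java with no Flink dependencies, and
the class and its method names are hypothetical, not an existing Flink API.
Only the shape of shouldTriggerCheckpoint() is loosely modeled on
ExternallyInducedSourceReader. The JM still triggers the checkpoint; the
coordinator only decides when it is released.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical sketch, NOT a Flink API: a coordinator that holds a
// JM-triggered checkpoint until the data belonging to it has been produced.
class InducedCoordinatorSketch {
    // Checkpoint ids requested by the JM but not yet released.
    private final Queue<Long> pendingCheckpoints = new ArrayDeque<>();
    // True once the current batch of data (splits) has been fully produced.
    private boolean batchProduced = false;

    // The JM still decides that a checkpoint should happen.
    void onCheckpointTriggered(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    // Assumed hook: called when all splits of the current batch are assigned.
    void onBatchProduced() {
        batchProduced = true;
    }

    // Returns a checkpoint id only when a checkpoint is pending AND the
    // batch it should cover is complete; otherwise the barrier is held back.
    Optional<Long> shouldTriggerCheckpoint() {
        if (!batchProduced || pendingCheckpoints.isEmpty()) {
            return Optional.empty();
        }
        batchProduced = false; // the next checkpoint needs a new batch
        return Optional.of(pendingCheckpoints.poll());
    }
}
```

In this sketch each released checkpoint covers exactly one produced batch,
which is the one-to-one mapping we are after.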

In addition, I am not sure whether other OperatorCoordinators have the same
need, because at present we always take a snapshot of the
OperatorCoordinator immediately.

Thanks,
Ming Li


Becket Qin <becket....@gmail.com> wrote on Tue, Feb 28, 2023 at 08:31:

> Hi Ming,
>
> I am not sure if I fully understand what you want. It seems what you are
> looking for is to have a checkpoint triggered at a customized timing which
> aligns with some semantic. This is not what the current checkpoint in Flink
> was designed for. I think the basic idea of checkpoint is to just take a
> snapshot of the current state, so we can restore to that state in case of
> failure. This is completely orthogonal to the data semantic.
>
> Even with the ExternallyInducedSourceReader, the checkpoint is still
> triggered by the JM. It is just that the effective checkpoint barrier message (a
> custom message in this case) will not be sent by the JM, but by the
> external source storage. This helps when the external source storage needs
> its own internal state to be aligned with the state of the Flink
> SourceReader. For example, if the external source storage can only seek at
> some bulk boundary, then it might wait for the current bulk to finish
> before it sends the custom checkpoint barrier to the SourceReader.
>
> > Considering this scenario, if the data we want has not been produced yet,
> > but the *SourceCoordinator* receives the *checkpoint* message, it will
> > directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> > make a *checkpoint* immediately after receiving the *checkpoint*, but
> > continues to wait for a new split. Even if a new split is generated, due to
> > the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
> > assigned to the *Source*, resulting in a deadlock (or forced to wait for
> > checkpoint to time out).
>
>
> In this case, the source reader should not "wait" for the splits that are
> not included in this checkpoint. These splits should be a part of the next
> checkpoint. It would be the Sink's responsibility to ensure the output is
> committed in a way that aligns with the user semantic.
>
> That said, I agree it might be useful in some cases if users can decide
> the checkpoint triggering timing. But that will be a new feature which
> needs some careful design.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
