For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race condition, we should still take some time to think about it as it
effectively means we need to support handling a barrier in a finished task
anyways. Maybe a finished task is still assigned to a TM with JM as a
fallback?

For your question: will there ever be intermediate operators that should be
running that are not connected to at least once source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in, let's assume you have
source S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and M1
is still running. Or I didn't get your question ;).

On Tue, Jan 5, 2021 at 5:00 PM Yun Gao <yungao...@aliyun.com> wrote:

>      Hi Aljoscha,
>
>          Very thanks for the feedbacks!
>
>          For the second issue, I'm indeed thinking the race condition
> between deciding to trigger and operator get finished. And for this point,
>
>  >
> One thought here is this: will there ever be intermediate operators that
> > should be running that are not connected to at least once source? The
> > only case I can think of right now is async I/O. Or are there others? If
> > we think that there will never be intermediate operators that are not
> > connected to at least once source we might come up with a simpler
> > solution.
>
>      I think there are still cases that the intermediate operators runs
> with all its sources have finished, for example, source -> sink writer ->
> sink committer -> sink global committer,  since sink committer need to wait
> for one more checkpoint between endOfInput and close,
> it would continue to run after the source and sink writer are finished,
> until we could finish one checkpoint. And since the four operators could
> also be chained in one task, we may also need to consider the case that
> part of operators are finished when taking snapshot in
> of the tasks.
>
>    Best,
>     Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2021 Jan. 5 (Tue.) 22:34
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> On 2021/01/05 10:16, Arvid Heise wrote:
> >1. I'd think that this is an orthogonal issue, which I'd solve separately.
> >My gut feeling says that this is something we should only address for new
> >sinks where we decouple the semantics of commits and checkpoints
> >anyways. @Aljoscha
> >Krettek <aljos...@apache.org> any idea on this one?
>
> I also think it's somewhat orthogonal, let's see where we land here once
> the other issues are hammered out.
>
> >2. I'm not sure I get it completely. Let's assume we have a source
>
> >partition that is finished before the first checkpoint. Then, we would need
> >to store the finished state of the subtask somehow. So I'm assuming, we
> >still need to trigger some checkpointing code on finished subtasks.
>
> What he's talking about here is the race condition between a) checkpoint
> coordinator decides to do a checkpoint and b) a source operator shuts
> down.
>
> Normally, the checkpoint coordinator only needs to trigger sources, and
> not intermediate operators. When we allow sources to shut down,
> intermediate operators now can become the "head" of a pipeline and
> become the things that need to be triggered.
>
> One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least once source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least once source we might come up with a simpler
> solution.
>
> >3. Do we really want to store the finished flag in OperatorState? I was
> >assuming we want to have it more fine-grained on OperatorSubtaskState.
> >Maybe we can store the flag inside managed or raw state without changing
> >the format?
>
> I think we cannot store it in `OperatorSubtaskState` because of how
> operator state (the actual `ListState` that operators use) is reshuffled
> on restore to all operators. So normally it doesn't make sense to say
> that one of the subtasks is done when operator state is involved. Only
> when all subtasks are done can we record this operator as done, I think.
>
> Best,
> Aljoscha
>
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to