For 2), the race condition: I was thinking more of still injecting the barrier at the source in all cases, but having some kind of shortcut that immediately executes the RPC inside the respective TaskManager. However, that may prove hard in the case of dynamic scale-ins. Nevertheless, this race condition deserves some thought, because it effectively means that we need to support handling a barrier in a finished task anyway. Maybe a finished task could still be assigned to a TM, with the JM as a fallback?
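The race can be made concrete with a minimal Java sketch of the routing idea in the paragraph above. It assumes two things. First, the trigger RPC is always addressed to the source task. Second, the task's state is re-checked when the RPC arrives, not when the coordinator decided to trigger. All names (`FinishedTaskTriggerSketch`, `SourceTask`, `onTriggerRpc`, the `Outcome` values) are hypothetical and not part of Flink's API. The TM shortcut and the JM fallback are modelled as plain return values.

```java
/**
 * Hypothetical sketch, not Flink's API: the checkpoint trigger RPC is always
 * addressed to the source task. If the task has already finished, the
 * TaskManager that still hosts it answers on its behalf (the shortcut); if the
 * task is no longer assigned to any TaskManager, e.g. after a scale-in, the
 * JobManager handles it as a fallback.
 */
final class FinishedTaskTriggerSketch {

    enum TaskState { RUNNING, FINISHED }

    enum Outcome { TASK_INJECTS_BARRIER, TM_SHORTCUT, JM_FALLBACK }

    static final class SourceTask {
        final String name;
        volatile TaskState state = TaskState.RUNNING;
        // Assumption: null once the task is no longer assigned to any TaskManager.
        volatile String assignedTaskManager;

        SourceTask(String name, String assignedTaskManager) {
            this.name = name;
            this.assignedTaskManager = assignedTaskManager;
        }
    }

    /**
     * Decides how a trigger is handled when the RPC arrives. The state is read
     * here, not when the coordinator decided to trigger, because the source may
     * have finished in between. A real implementation would also have to make
     * this check atomic with respect to the task finishing; this sketch does not.
     */
    static Outcome onTriggerRpc(SourceTask task) {
        if (task.state == TaskState.RUNNING) {
            return Outcome.TASK_INJECTS_BARRIER;
        }
        if (task.assignedTaskManager != null) {
            return Outcome.TM_SHORTCUT;
        }
        return Outcome.JM_FALLBACK;
    }

    public static void main(String[] args) {
        SourceTask s1 = new SourceTask("S1", "tm-1");

        // The coordinator decides to trigger while S1 is still running...
        boolean decidedWhileRunning = s1.state == TaskState.RUNNING;
        // ...but S1 finishes before the RPC arrives.
        s1.state = TaskState.FINISHED;
        System.out.println(s1.name + " " + decidedWhileRunning + " " + onTriggerRpc(s1)); // prints "S1 true TM_SHORTCUT"

        // After a scale-in, the finished task no longer has a TaskManager.
        s1.assignedTaskManager = null;
        System.out.println(s1.name + " " + onTriggerRpc(s1)); // prints "S1 JM_FALLBACK"
    }
}
```

The first call shows why a finished task must handle a barrier anyway. Even though the coordinator only decided to trigger a running task, the RPC lands on a task that has finished in the meantime.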
For your question, "will there ever be intermediate operators that should be running that are not connected to at least one source?": I think there are plenty of examples if you go beyond chained operators and fully connected exchanges. Think of any fan-in: let's assume you have sources S1...S4, with S1+S2->M1 and S3+S4->M2. If S1 is finished, S2 and M1 are still running. Or maybe I didn't get your question ;).

On Tue, Jan 5, 2021 at 5:00 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Aljoscha,
>
> Many thanks for the feedback!
>
> For the second issue, I'm indeed thinking of the race condition between deciding to trigger and an operator getting finished. And for this point,
>
> > One thought here is this: will there ever be intermediate operators that should be running that are not connected to at least one source? The only case I can think of right now is async I/O. Or are there others? If we think that there will never be intermediate operators that are not connected to at least one source we might come up with a simpler solution.
>
> I think there are still cases where intermediate operators keep running after all their sources have finished, for example, source -> sink writer -> sink committer -> sink global committer. Since the sink committer needs to wait for one more checkpoint between endOfInput and close, it would continue to run after the source and sink writer are finished, until we can complete one more checkpoint. And since the four operators could also be chained in one task, we may also need to consider the case that some of the operators are already finished when taking a snapshot of the task.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2021 Jan. 5 (Tue.) 22:34
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> On 2021/01/05 10:16, Arvid Heise wrote:
> >1. I'd think that this is an orthogonal issue, which I'd solve separately. My gut feeling says that this is something we should only address for new sinks where we decouple the semantics of commits and checkpoints anyways. @Aljoscha Krettek <aljos...@apache.org> any idea on this one?
>
> I also think it's somewhat orthogonal, let's see where we land here once the other issues are hammered out.
>
> >2. I'm not sure I get it completely. Let's assume we have a source partition that is finished before the first checkpoint. Then, we would need to store the finished state of the subtask somehow. So I'm assuming, we still need to trigger some checkpointing code on finished subtasks.
>
> What he's talking about here is the race condition between a) checkpoint coordinator decides to do a checkpoint and b) a source operator shuts down.
>
> Normally, the checkpoint coordinator only needs to trigger sources, and not intermediate operators. When we allow sources to shut down, intermediate operators now can become the "head" of a pipeline and become the things that need to be triggered.
>
> One thought here is this: will there ever be intermediate operators that should be running that are not connected to at least one source? The only case I can think of right now is async I/O. Or are there others? If we think that there will never be intermediate operators that are not connected to at least one source we might come up with a simpler solution.
>
> >3. Do we really want to store the finished flag in OperatorState? I was assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we can store the flag inside managed or raw state without changing the format?
>
> I think we cannot store it in `OperatorSubtaskState` because of how operator state (the actual `ListState` that operators use) is reshuffled on restore to all operators. So normally it doesn't make sense to say that one of the subtasks is done when operator state is involved. Only when all subtasks are done can we record this operator as done, I think.
>
> Best,
> Aljoscha

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
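This thread raises the question of which tasks become the "head" of a pipeline once sources finish. A small Java sketch can illustrate it. Given each task's direct inputs and the set of finished tasks, the sketch computes which running tasks would have to be triggered directly, because no running upstream task is left to forward a barrier to them. It rests on three assumptions:

- A task with at least one running input still receives barriers through that input.
- Each operator is a separate task. In the chained sink example, all four operators may share one task.
- The class and method names are hypothetical and do not reflect Flink's `CheckpointCoordinator`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch, not Flink's CheckpointCoordinator: computes the running
 * tasks that must be triggered directly, i.e. running tasks none of whose
 * direct inputs are still running.
 */
final class TriggerSetSketch {

    /** upstream.get(t) lists the direct inputs of task t; sources map to an empty list. */
    static Set<String> tasksToTrigger(Map<String, List<String>> upstream, Set<String> finished) {
        Set<String> result = new TreeSet<>(); // sorted for stable output
        for (Map.Entry<String, List<String>> e : upstream.entrySet()) {
            String task = e.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are not triggered in this sketch
            }
            boolean hasRunningInput = false;
            for (String input : e.getValue()) {
                if (!finished.contains(input)) {
                    hasRunningInput = true; // a barrier can still arrive through this input
                    break;
                }
            }
            if (!hasRunningInput) {
                result.add(task); // sources, or tasks whose inputs have all finished
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Fan-in example from the thread: S1+S2 -> M1, S3+S4 -> M2.
        Map<String, List<String>> fanIn = new LinkedHashMap<>();
        fanIn.put("S1", List.of());
        fanIn.put("S2", List.of());
        fanIn.put("S3", List.of());
        fanIn.put("S4", List.of());
        fanIn.put("M1", List.of("S1", "S2"));
        fanIn.put("M2", List.of("S3", "S4"));
        System.out.println(tasksToTrigger(fanIn, Set.of("S1")));       // prints [S2, S3, S4]
        System.out.println(tasksToTrigger(fanIn, Set.of("S1", "S2"))); // prints [M1, S3, S4]

        // Sink example from the thread, modelled as four separate (unchained) tasks.
        Map<String, List<String>> sink = new LinkedHashMap<>();
        sink.put("source", List.of());
        sink.put("writer", List.of("source"));
        sink.put("committer", List.of("writer"));
        sink.put("globalCommitter", List.of("committer"));
        System.out.println(tasksToTrigger(sink, Set.of("source", "writer"))); // prints [committer]
    }
}
```

With only S1 finished, M1 still receives barriers from S2, so it does not need a direct trigger. Only once both S1 and S2 have finished does M1 itself become a head. Likewise, the sink committer becomes the head once the source and writer are done.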