On 2021/01/06 11:30, Arvid Heise wrote:
> I'm assuming that this is the normal case. In an A->B graph, as soon as A
> finishes, B still has a couple of input buffers to process. If you add
> backpressure or longer pipelines into the mix, it's quite likely that a
> checkpoint may occur with B being the head.
Ahh, I think I know what you mean. This can happen when the checkpoint
coordinator issues concurrent checkpoints without waiting for older ones
to finish. My head is mostly operating under the premise that there is
at most one checkpoint in flight at a time.
In the current code base the race conditions that Yun and I are talking
about cannot occur. Checkpoints can only be triggered at sources and
they will then travel through the graph. Intermediate operators are
never directly triggered from the JobManager/CheckpointCoordinator.
When sources start to shut down, the JM has to directly inject/trigger
checkpoints at the now new "sources" of the graph, which have previously
been intermediate operators.
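
To make the notion of new "sources" concrete, here is a minimal sketch of
how the trigger targets could be derived. It assumes a toy model in which a
job is a list of (upstream, downstream) edges plus a set of running tasks;
trigger_targets and the task names are hypothetical and are not part of
Flink's CheckpointCoordinator.

```python
from collections import defaultdict

def trigger_targets(edges, running):
    """Return the running tasks that have no running direct upstream task.

    In this toy model these are the current "heads" of the graph, i.e. the
    tasks a coordinator would have to trigger directly.
    """
    upstream = defaultdict(set)
    for src, dst in edges:
        upstream[dst].add(src)
    return sorted(
        task for task in running
        if not any(up in running for up in upstream[task])
    )

# Hypothetical three-task pipeline.
edges = [("Source", "Map"), ("Map", "Sink")]

print(trigger_targets(edges, {"Source", "Map", "Sink"}))  # ['Source']
print(trigger_targets(edges, {"Map", "Sink"}))            # ['Map']
```

While Source is running it is the only trigger target; once it has
finished, Map takes over that role in this model.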
I want to repeat my suspicion that this may be a degenerate case and
that we never want to allow operators to take checkpoints when they are
not connected to at least one running source. That would mean we have to
find a solution for declined checkpoints and missing sources.
I'll first show an example where I think we will never have intermediate
operators running while their sources are no longer running:
Source -> Map -> Sink
Here, when the Source does its final checkpoint and then shuts down,
that same final checkpoint would travel downstream ahead of the EOF,
which would in turn cause Map and Sink to also shut down. *We can't have
the case that Map is still running when we want to take a checkpoint and
Source is not running*.
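
The ordering argument can be illustrated with a small simulation. This is
only a sketch under simplifying assumptions: each channel is a FIFO queue,
every stage forwards events unchanged, and "BARRIER" and "EOF" are made-up
marker values rather than Flink's actual event types.

```python
from collections import deque

def run_pipeline(stages, source_events):
    """Push the source's output through each stage in FIFO order.

    Returns a log of (stage, action) tuples recording when each stage
    takes its snapshot and when it finishes.
    """
    log = []
    channel = deque(source_events)
    for stage in stages:
        out = deque()
        while channel:
            event = channel.popleft()
            if event == "BARRIER":
                log.append((stage, "snapshot"))
            elif event == "EOF":
                log.append((stage, "finish"))
            out.append(event)  # forward every event downstream unchanged
        channel = out
    return log

# The source emits its final barrier ahead of the EOF.
log = run_pipeline(["Map", "Sink"], ["rec1", "rec2", "BARRIER", "EOF"])
for entry in log:
    print(entry)
```

Because the barrier sits ahead of the EOF in every channel, each stage in
this model takes its snapshot before it finishes.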
A similar case is this one:
Source1 --+
          |-> Map -> Sink
Source2 --+
Here, if Source1 is finished but Source2 is not, Map is still connected
to at least one upstream source that is still running. Again, Map would
never be running and taking checkpoints if neither Source1 nor Source2
is online.
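
The invariant "connected to at least one running source" can be written
down as a reachability check. The sketch below assumes the same kind of
toy model (edges, a set of sources, a set of running tasks);
connected_to_running_source is a hypothetical helper, not an existing
Flink method.

```python
def connected_to_running_source(task, edges, sources, running):
    """True if `task` is running and can reach a running source by
    walking upstream through running tasks only."""
    if task not in running:
        return False
    stack, seen = [task], set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if current in sources:
            return True  # only running tasks are ever pushed on the stack
        stack.extend(
            src for src, dst in edges if dst == current and src in running
        )
    return False

edges = [("Source1", "Map"), ("Source2", "Map"), ("Map", "Sink")]
sources = {"Source1", "Source2"}

# Source1 has finished, Source2 is still running.
print(connected_to_running_source("Map", edges, sources, {"Source2", "Map", "Sink"}))  # True
# Both sources have finished.
print(connected_to_running_source("Map", edges, sources, {"Map", "Sink"}))             # False
```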
The cases I see where intermediate operators would keep running despite
not being connected to any running upstream operators are when we
purposefully keep an operator online despite all inputs having seen EOF.
One example
is async I/O, another is what Yun mentioned where a sink might want to
wait for another checkpoint to confirm some data. Example:
Source -> Async I/O -> Sink
Here, Async I/O will stay online as long as there are some scheduled
requests outstanding, even when the Source has shut down. In those
cases, the checkpoint coordinator would have to trigger new checkpoints
at Async I/O and not Source, because it has become the new "head" of the
graph.
For Async I/O at least, we could say that the operator will wait for all
outstanding requests to finish before it allows the final checkpoint and
passes the barrier forward.
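
One way to picture this "drain first, then forward the barrier" behaviour
is the sketch below. AsyncOperatorSketch and its methods are hypothetical
and only model the ordering; this is not how Flink's async I/O operator is
implemented.

```python
class AsyncOperatorSketch:
    """Holds back the final barrier until no requests are outstanding."""

    def __init__(self):
        self.pending = set()              # ids of outstanding requests
        self.final_barrier_waiting = False
        self.emitted = []                 # everything sent downstream, in order

    def submit(self, request_id):
        self.pending.add(request_id)

    def on_final_barrier(self):
        # Remember the barrier; forward it only once the operator is drained.
        self.final_barrier_waiting = True
        self._maybe_forward()

    def complete(self, request_id):
        self.pending.discard(request_id)
        self.emitted.append(("result", request_id))
        self._maybe_forward()

    def _maybe_forward(self):
        if self.final_barrier_waiting and not self.pending:
            self.emitted.append(("final_barrier",))
            self.final_barrier_waiting = False

op = AsyncOperatorSketch()
op.submit(1)
op.submit(2)
op.on_final_barrier()   # held back: two requests are still outstanding
op.complete(1)
op.complete(2)          # last request done, the barrier is forwarded now
print(op.emitted)       # [('result', 1), ('result', 2), ('final_barrier',)]
```

With this ordering the final checkpoint only passes Async I/O after all
results have been emitted, so in this model the operator never has to
become a trigger target on its own.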
Best,
Aljoscha