On 2021/01/06 11:30, Arvid Heise wrote:
I'm assuming that this is the normal case. In an A->B graph, as soon as A
finishes, B still has a couple of input buffers to process. If you add
backpressure or longer pipelines into the mix, it's quite likely that a
checkpoint may occur with B being the head.

Ahh, I think I know what you mean. This can happen when the checkpoint coordinator issues concurrent checkpoints without waiting for older ones to finish. My head is mostly operating under the premise that there is at most one checkpoint in flight at any time.

In the current code base the race conditions that Yun and I are talking about cannot occur. Checkpoints can only be triggered at sources and they will then travel through the graph. Intermediate operators are never directly triggered from the JobManager/CheckpointCoordinator.

When sources start to shut down, the JM has to directly inject/trigger checkpoints at the now new "sources" of the graph, which have previously been intermediate operators.
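To make the "new sources" idea concrete, here is a minimal sketch in Python. It is a toy model, not Flink code: the function name trigger_targets and the dict/set representation of the graph are assumptions made purely for illustration. It picks the running tasks that no longer have a running direct upstream task, which is where the JM would have to trigger.

```python
def trigger_targets(upstream, running):
    """Return the running tasks that have no running direct upstream task.

    upstream: dict mapping each task name to the list of its direct inputs.
    running:  set of task names that have not finished yet.
    Both structures are hypothetical; they only model the discussion.
    """
    return sorted(
        task
        for task in running
        if not any(pred in running for pred in upstream.get(task, []))
    )


# The A -> B graph from the quoted mail.
graph = {"A": [], "B": ["A"]}

# While A is running, only A is triggered directly.
print(trigger_targets(graph, {"A", "B"}))  # ['A']

# Once A has finished, B is the head and would have to be triggered.
print(trigger_targets(graph, {"B"}))       # ['B']
```

In today's code base the second call never matters, because only real sources are ever triggered.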

I want to repeat my suspicion that this is a degenerate case and that we never want to allow operators to take checkpoints when they are not connected to at least one running source. That means we have to find a solution for declined checkpoints and missing sources.

I'll first show an example where I think we will never have intermediate operators running while their sources are no longer running:

Source -> Map -> Sink

Here, when the Source does its final checkpoint and then shuts down, that same final checkpoint would travel downstream ahead of the EOF, which would in turn cause Map and Sink to also shut down. *We can't have the case that Map is still running when we want to take a checkpoint and Source is not running*.

A similar case is this one:

Source1 --+
          |->Map -> Sink
Source2 --+

Here, if Source1 is finished but Source2 is not, Map is still connected to at least one upstream source that is still running. Again, Map would never be running and doing checkpoints if neither Source1 nor Source2 is online.
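The condition "connected to at least one running source" can be written down as a small graph walk. This is only a sketch under assumptions: has_running_source and the dict-based graph are hypothetical names for illustration and do not correspond to anything in the Flink code base.

```python
def has_running_source(task, upstream, running):
    """True if `task` is a running source or reaches one by walking upstream.

    A source is modelled as a task without inputs. `upstream` maps a task
    name to its direct inputs, `running` is the set of unfinished tasks.
    """
    stack, seen = [task], set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        inputs = upstream.get(current, [])
        if not inputs and current in running:
            return True  # found a source that is still online
        stack.extend(inputs)
    return False


# The two-source graph from above.
two_sources = {
    "Source1": [],
    "Source2": [],
    "Map": ["Source1", "Source2"],
    "Sink": ["Map"],
}

# Source1 finished, Source2 still running: Map may checkpoint.
print(has_running_source("Map", two_sources, {"Source2", "Map", "Sink"}))  # True

# Both sources finished: under the rule, Map should not checkpoint.
print(has_running_source("Map", two_sources, {"Map", "Sink"}))  # False
```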

The cases I see where intermediate operators would keep running despite not being connected to any running upstream operators are those where we purposefully keep an operator online despite all inputs having seen EOF. One example is async I/O, another is what Yun mentioned, where a sink might want to wait for another checkpoint to confirm some data. Example:

Source -> Async I/O -> Sink

Here, Async I/O will stay online as long as there are some scheduled requests outstanding, even when the Source has shut down. In those cases, the checkpoint coordinator would have to trigger new checkpoints at Async I/O and not Source, because it has become the new "head" of the graph.

For Async I/O at least, we could say that the operator will wait for all outstanding requests to finish before it allows the final checkpoint and passes the barrier forward.
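A rough sketch of that "drain first, then final barrier" behaviour, again as a toy model in Python. The class AsyncOperatorSketch and its methods are invented for illustration and are not the AsyncWaitOperator API; the only point is the ordering of events.

```python
class AsyncOperatorSketch:
    """Toy operator that holds back its final barrier until it is drained."""

    def __init__(self):
        self.pending = set()      # ids of outstanding async requests
        self.input_ended = False  # True once all inputs have seen EOF
        self.finished = False
        self.emitted = []         # what is sent downstream, in order

    def submit(self, request_id):
        if self.input_ended:
            raise RuntimeError("no new requests after end of input")
        self.pending.add(request_id)

    def complete(self, request_id, result):
        self.pending.remove(request_id)
        self.emitted.append(("record", result))
        self._maybe_finish()

    def end_of_input(self):
        self.input_ended = True
        self._maybe_finish()

    def _maybe_finish(self):
        # The final barrier is forwarded only after EOF *and* an empty queue.
        if self.input_ended and not self.pending and not self.finished:
            self.finished = True
            self.emitted.append(("final_barrier", None))


op = AsyncOperatorSketch()
op.submit(1)
op.submit(2)
op.end_of_input()       # Source is done, but two requests are outstanding
op.complete(1, "a")
op.complete(2, "b")     # last request done: the barrier goes out now
print(op.emitted)
```

With this ordering the operator never has to be triggered as a new head, because it only passes the final barrier on once nothing is left in flight.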

Best,
Aljoscha
