Hi all!

I am about to merge the Pull Request from Gyula, Paris, Marton about the
streaming Fault Tolerance. Nice work, guys!

There are a few things we need to do as a followup in my opinion:


--- State Handling ---

The state handling of operators and the triggering of checkpoints should be
separate, in my opinion. We can also use state backups in the Batch API
(once per superstep for stateful UDFs).

The state would be something that operators can send back independent of an
acknowledged checkpoint barrier or as part of one. In any case, it is
something simply stored in the execution independent of the streaming state
monitor.

The streaming state monitor would simply trigger barriers and wait for
acknowledgements and select which version of the state is the currently
committed one.


--- Checkpoint Coordination ---

  - The CheckpointMonitor needs to know from which tasks it needs a
confirmation of the checkpoint before a checkpoint is committed. This
includes always sources and sinks, but also the other stateful tasks.

  - The checkpoint monitor should have a timeout after which it discards a
checkpoint when not all tasks have confirmed the checkpoint in this
interval. This should safeguard us against hanging on incomplete
checkpoints (which can always occur in case of failures while a checkpoint
is in progress, or in case where a source has not been deployed in time and
misses the first checkpoint trigger)

  - Once too many successive checkpoints time out, we have a hard failure.

  - Can we shut down / suspend the monitor for the times between job graph
failing and restart? Will safe us potential checkpoint timeouts due to some
tasks being deployed and others not yet.


--- Modes ---

  - Instead of having the JobGraph is a mode "streaming", can we have a
flag (and interval) that say checkpointed ? There may be streaming jobs
that want to run without the checkpointing (when throughput matters more
than state and data loss, for approximate computation).

In that case, restart would not have access to any state and would simply
start consuming stream sources again whereever something is available.


--- Timestamping ---

  - In general, timestamps will probably have to be assigned at the sources

  - Barriers should have timestamps and act as watermarks for the
timestamps to allow any
    operators to process its windows before this timestamp.


--- Barriers ---

  - I think it would help general debuggability / fail fast behavior if we
could mark events as "require pipelined" and fail a batch execution when we
encounter them. Just a safeguard to detect misconfigurations in the
Channels early.



I think out of these changes the ones in "Checkpoint Coordination" are
probably the most important ones.


Greetings,
Stephan

Reply via email to