Hi all! I am about to merge the pull request from Gyula, Paris, and Marton for streaming fault tolerance. Nice work, guys!
There are a few things we need to do as a follow-up, in my opinion:

--- State Handling ---
The state handling of operators and the triggering of checkpoints should be separate, in my opinion. We can also use state backups in the batch API (once per superstep for stateful UDFs).
The state would be something that operators can send back either independently of an acknowledged checkpoint barrier or as part of one. In either case, it is simply stored in the execution, independent of the streaming state monitor. The streaming state monitor would only trigger barriers, wait for acknowledgements, and select which version of the state is the currently committed one.

--- Checkpoint Coordination ---
- The CheckpointMonitor needs to know which tasks must confirm a checkpoint before it is committed. This always includes sources and sinks, but also the other stateful tasks.
- The checkpoint monitor should have a timeout after which it discards a checkpoint if not all tasks have confirmed it within that interval. This should safeguard us against hanging on incomplete checkpoints, which can always occur when a failure happens while a checkpoint is in progress, or when a source has not been deployed in time and misses the first checkpoint trigger.
- Once too many successive checkpoints time out, we have a hard failure.
- Can we shut down / suspend the monitor between the job graph failing and its restart? That would save us potential checkpoint timeouts caused by some tasks being deployed while others are not yet.

--- Modes ---
- Instead of putting the JobGraph in a "streaming" mode, can we have a flag (and an interval) that says "checkpointed"? There may be streaming jobs that want to run without checkpointing (when throughput matters more than state and data loss, e.g., for approximate computation). In that case, a restart would not have access to any state and would simply start consuming the stream sources again from wherever data is available.
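The "Checkpoint Coordination" points above could look roughly like the following monitor. This is a hypothetical sketch, not existing code: the class `CheckpointMonitorSketch`, its methods, and the use of string task ids are assumptions, and the current time is passed in explicitly so the logic is easy to test. The committed checkpoint id stands in for "selecting which version of the state is currently committed"; the state itself would be stored separately, as proposed under "State Handling".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names are existing Flink classes or methods.
final class CheckpointMonitorSketch {

    // A triggered checkpoint still waiting for confirmations from required tasks.
    private static final class PendingCheckpoint {
        final long triggerTimeMillis;
        final Set<String> awaiting;

        PendingCheckpoint(long triggerTimeMillis, Set<String> requiredTasks) {
            this.triggerTimeMillis = triggerTimeMillis;
            this.awaiting = new HashSet<>(requiredTasks);
        }
    }

    private final Set<String> requiredTasks;  // sources, sinks, and other stateful tasks
    private final long timeoutMillis;         // discard a checkpoint not confirmed within this time
    private final int maxSuccessiveTimeouts;  // hard failure once this many time out in a row

    private final Map<Long, PendingCheckpoint> pending = new HashMap<>();
    private long nextCheckpointId = 1;
    private long committedCheckpointId = -1;  // -1 means no checkpoint committed yet
    private int successiveTimeouts = 0;
    private boolean suspended = false;

    CheckpointMonitorSketch(Set<String> requiredTasks, long timeoutMillis, int maxSuccessiveTimeouts) {
        this.requiredTasks = new HashSet<>(requiredTasks);
        this.timeoutMillis = timeoutMillis;
        this.maxSuccessiveTimeouts = maxSuccessiveTimeouts;
    }

    // Starts a new checkpoint (the point where barriers would be injected at the sources).
    // Returns the checkpoint id, or -1 while the monitor is suspended.
    long trigger(long nowMillis) {
        if (suspended) {
            return -1;
        }
        long id = nextCheckpointId++;
        pending.put(id, new PendingCheckpoint(nowMillis, requiredTasks));
        return id;
    }

    // Records one task's confirmation; commits the checkpoint once every required task confirmed.
    void acknowledge(long checkpointId, String taskId) {
        PendingCheckpoint cp = pending.get(checkpointId);
        if (cp == null) {
            return; // unknown, already committed, or already discarded
        }
        cp.awaiting.remove(taskId);
        if (cp.awaiting.isEmpty()) {
            pending.remove(checkpointId);
            committedCheckpointId = Math.max(committedCheckpointId, checkpointId);
            successiveTimeouts = 0;
        }
    }

    // Discards checkpoints that exceeded the timeout; fails hard after too many in a row.
    void checkTimeouts(long nowMillis) {
        Iterator<PendingCheckpoint> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().triggerTimeMillis >= timeoutMillis) {
                it.remove();
                successiveTimeouts++;
            }
        }
        if (successiveTimeouts >= maxSuccessiveTimeouts) {
            throw new IllegalStateException(successiveTimeouts + " successive checkpoints timed out");
        }
    }

    // Between job failure and restart: drop in-flight checkpoints and stop triggering new ones.
    void suspend() {
        suspended = true;
        pending.clear();
    }

    void resume() {
        suspended = false;
    }

    // The checkpoint whose state version counts as the currently committed one.
    long committedCheckpoint() {
        return committedCheckpointId;
    }
}
```

In a real monitor, `checkTimeouts` would be driven by a timer rather than called by hand; passing the time in keeps the sketch deterministic.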

--- Timestamping ---
- In general, timestamps will probably have to be assigned at the sources.
- Barriers should carry timestamps and act as watermarks, allowing any operator to process its windows before that timestamp.

--- Barriers ---
- I think it would help general debuggability / fail-fast behavior if we could mark events as "require pipelined" and fail a batch execution when we encounter them. This is just a safeguard to detect misconfigured channels early.

Of these changes, I think the ones in "Checkpoint Coordination" are probably the most important.

Greetings,
Stephan