On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Hi,
>>
>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>> semantics (EOS). I think it is questionable to exclude Runners from
>> inside a transform, but I see that the intention was to save users from
>> surprises.
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>>
>
> When we discussed this in Aug 2017, the understanding was that the
> two-phase commit utility in Flink used to implement Flink's Kafka EOS could
> not be implemented in Beam's context.
> See this message
> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that
> dev thread. Has anything changed in this regard? The whole thread is
> relevant to this topic and worth going through.
>

I think that the TwoPhaseCommit utility class wouldn't work. The Flink runner
would probably want to use notifyCheckpointComplete directly in order to
implement @RequiresStableInput.

>
>>
>> A checkpoint is realized by sending barriers through all channels,
>> starting from the sources until they reach all sinks. Every operator
>> persists its state once it has received a barrier on all of its input
>> channels; it then forwards the barrier to the downstream operators.
>>
>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>
>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>> GroupByKey -> ExactlyOnceWriter
>>
>> As I understand it, Spark or Dataflow use the GroupByKey stages to persist
>> the input. That is not required in Flink to take a consistent
>> snapshot of the pipeline.
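For readers wondering why the sink assigns sequence ids at all, here is a rough, hypothetical model of per-shard deduplication, assuming the writer can durably record the highest sequence id it has already written. The names `SequenceDedupSketch`, `ShardWriter` and `Sequenced` are made up for illustration; this is not KafkaIO's code, and the in-memory counter stands in for whatever durable metadata the real sink keeps.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of per-shard sequence-id deduplication.
 * Not KafkaIO's implementation: all names here are invented for illustration.
 */
public class SequenceDedupSketch {

    /** A record tagged with the sequence id assigned upstream (the AssignSequenceIds step). */
    record Sequenced(long seqId, String value) {}

    /** Stand-in for one sink shard; lastCommittedSeq models durable sink metadata. */
    static final class ShardWriter {
        private long lastCommittedSeq = -1;            // highest sequence id already written
        private final List<String> output = new ArrayList<>();

        /** Writes a batch, skipping records that an earlier (retried) attempt already wrote. */
        void write(List<Sequenced> batch) {
            for (Sequenced r : batch) {
                if (r.seqId() <= lastCommittedSeq) {
                    continue;                          // duplicate from a replay: drop it
                }
                output.add(r.value());
                // A real sink would persist this atomically with the data it wrote.
                lastCommittedSeq = r.seqId();
            }
        }

        List<String> output() {
            return output;
        }
    }

    public static void main(String[] args) {
        ShardWriter writer = new ShardWriter();
        List<Sequenced> batch = List.of(new Sequenced(0, "a"), new Sequenced(1, "b"));
        writer.write(batch);
        // Replaying the same stable input (e.g. after a worker failure) writes nothing new.
        writer.write(batch);
        writer.write(List.of(new Sequenced(1, "b"), new Sequenced(2, "c")));
        System.out.println(writer.output());           // prints [a, b, c]
    }
}
```

The deduplication only holds if a replay assigns the same sequence ids to the same records, which is the role of persisting the input in the GroupByKey stages on the runners mentioned above.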
>>
>> Basically, for Flink we don't need any of the magic that KafkaIO does.
>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>> (a DoFn) to commit once a checkpoint has completed.
>>
>> I know that the new version of SDF supports checkpointing, which should
>> solve this issue. But there is still a lot of work to do to make this a
>> reality.
>>
>
> I don't see how SDF solves this problem. Maybe pseudocode would make it
> clearer. But if it helps, that is great!
>
>> So I think it would make sense to think about a way to make KafkaIO's
>> EOS more accessible to Runners which support a different way of
>> checkpointing.
>>
>
> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that
> will help many future exactly-once sinks, and address the fundamental
> incompatibility between the Beam model and Flink's horizontal checkpointing
> for such applications.
>
> Raghu.
>
>
>> Cheers,
>> Max
>>
>> PS: I found this document about RequiresStableInput [3], but IMHO
>> defining an annotation only manifests the conceptual difference between
>> the Runners.
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> [2]
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> [3]
>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>
>
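A minimal, dependency-free sketch of the idea discussed in this thread: a writer that buffers output per checkpoint and commits it only in a callback shaped like Flink's `CheckpointListener.notifyCheckpointComplete(long checkpointId)`. Everything else here (`CommitOnCheckpointSketch`, `CheckpointCommittingWriter`, `process`, `snapshotState`) is hypothetical, the "transaction" is an in-memory list rather than a Kafka producer, and failure recovery (restoring pending transactions from state) is omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: output becomes visible only after the checkpoint covering it completes.
 * No Flink or Kafka classes are used; the callback name only mirrors Flink's.
 */
public class CommitOnCheckpointSketch {

    static final class CheckpointCommittingWriter {
        private List<String> currentTxn = new ArrayList<>();                      // open "transaction"
        private final TreeMap<Long, List<String>> pendingTxns = new TreeMap<>(); // pre-committed, by checkpoint id
        private final List<String> committed = new ArrayList<>();                // visible downstream

        void process(String element) {
            currentTxn.add(element);
        }

        /** Barrier reached this operator: pre-commit the open txn and start a new one. */
        void snapshotState(long checkpointId) {
            pendingTxns.put(checkpointId, currentTxn);
            currentTxn = new ArrayList<>();
        }

        /** Checkpoint is durable everywhere: commit all pending txns up to and including it. */
        void notifyCheckpointComplete(long checkpointId) {
            Iterator<Map.Entry<Long, List<String>>> it =
                pendingTxns.headMap(checkpointId, true).entrySet().iterator();
            while (it.hasNext()) {
                committed.addAll(it.next().getValue());
                it.remove(); // removes the entry from pendingTxns through the view
            }
        }

        List<String> committed() {
            return committed;
        }
    }

    public static void main(String[] args) {
        CheckpointCommittingWriter w = new CheckpointCommittingWriter();
        w.process("a");
        w.process("b");
        w.snapshotState(1);
        w.process("c");
        System.out.println(w.committed()); // prints [] because checkpoint 1 is not confirmed yet
        w.notifyCheckpointComplete(1);
        System.out.println(w.committed()); // prints [a, b]; "c" waits for checkpoint 2
    }
}
```

Committing everything up to and including the notified id, rather than exactly that id, is a defensive choice in case a notification for an earlier checkpoint never arrives.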