I think the issue we're hitting is how to write this in Beam.
Dataflow historically guaranteed checkpointing at every GBK (which due to
the design of Dataflow's streaming shuffle was reasonably efficient). In
Beam we never formalized these semantics, leaving these syncs in a gray
area. I believe the Spark runner currently checkpoints the RDD on every
GBK, so these unwritten semantics currently work for Dataflow and for Spark.
We need some way to express this operation in Beam, whether it be via an
explicit Checkpoint() operation or via marking DoFns as having side
effects, and having the runner automatically insert such a Checkpoint in
front of them. In Flink, this operation can be implemented using what
Aljoscha describes below.
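To make the ordering guarantee concrete, here is a minimal plain-Java sketch (not real Beam API; `runBundle`, `durableCheckpoint`, and the crash flag are all hypothetical names for illustration) of what "insert a Checkpoint in front of a side-effecting DoFn" buys you: the elements are made durable first, so a retry after a failure replays exactly the same input to the side effect instead of recomputing possibly different elements upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a runner that durably checkpoints a bundle's
// elements *before* invoking a side-effecting DoFn, so a retry after a
// crash replays the same elements rather than recomputing them.
public class CheckpointBeforeSideEffect {
    // Stand-in for durable storage (in Dataflow, the streaming shuffle).
    static List<String> durableCheckpoint = null;

    static void runBundle(List<String> upstreamOutput,
                          Consumer<String> sideEffectFn,
                          boolean crashAfterCheckpoint) {
        // Step 1: persist the elements first (the implicit Checkpoint() op).
        durableCheckpoint = new ArrayList<>(upstreamOutput);
        if (crashAfterCheckpoint) {
            return;  // simulated failure before the side effect ran
        }
        // Step 2: only now run the side effect, reading from the checkpoint.
        durableCheckpoint.forEach(sideEffectFn);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // First attempt crashes after the checkpoint, before the side effect.
        runBundle(List.of("a", "b"), sink::add, true);
        // On retry, the runner replays from the checkpoint: even if upstream
        // is nondeterministic, the sink sees the originally checkpointed input.
        runBundle(durableCheckpoint, sink::add, false);
        System.out.println(sink);  // [a, b]
    }
}
```

Without the checkpoint, a retry would re-run the upstream stage, and a nondeterministic upstream could hand the side-effecting DoFn different elements on each attempt.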
On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for such
> cases: https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> The PR for a Kafka 0.11 exactly once producer builds on that:
> https://github.com/apache/flink/pull/4239
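For readers unfamiliar with the pattern Aljoscha references: the idea is to tie a transactional sink to the checkpoint lifecycle, so output becomes visible only after the checkpoint that covers it has succeeded. Below is a rough plain-Java sketch of that protocol under assumed names (`invoke`, `preCommit`, `commit`, `abort`, `Transaction` are illustrative, not the actual Flink API; see TwoPhaseCommitSinkFunction for the real signatures):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the two-phase-commit sink lifecycle (hypothetical names).
public class TwoPhaseCommitSketch {
    static class Transaction {
        final List<String> buffered = new ArrayList<>();
    }

    // Output that consumers are allowed to see.
    final List<String> committedOutput = new ArrayList<>();
    Transaction current = new Transaction();

    void invoke(String element) {      // called once per record
        current.buffered.add(element); // write into the open transaction
    }

    Transaction preCommit() {          // on checkpoint barrier: flush
        Transaction pending = current;
        current = new Transaction();   // open the next transaction
        return pending;                // held until the checkpoint completes
    }

    void commit(Transaction pending) { // on checkpoint-complete notification
        committedOutput.addAll(pending.buffered);
    }

    void abort(Transaction pending) {  // on failure before completion
        pending.buffered.clear();      // output never becomes visible
    }
}
```

Because commit() runs only after the checkpoint has succeeded, a crash between preCommit() and commit() leaves a pending transaction that can be committed on recovery or aborted; with a transactional sink such as Kafka 0.11, that is what yields end-to-end exactly-once.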
> > On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
> > Kafka 0.11 added support for transactions, which allows end-to-end
> > exactly-once semantics. Beam's KafkaIO users can benefit from these while
> > using runners that support exactly-once processing.
> > I have an implementation of EOS support for the Kafka sink:
> > https://github.com/apache/beam/pull/3612
> > It has two shuffles and builds on Beam state-API and checkpoint barrier
> > between stages (as in Dataflow). Pull request has a longer description.
> > - What other runners in addition to Dataflow would be compatible with
> > such a strategy?
> > - I think it does not quite work for Flink (as it has a global
> > checkpoint, not a checkpoint barrier between the stages). How would one
> > go about implementing such a sink?
> > Any comments on the pull request are also welcome.
> > Thanks,
> > Raghu.
> > 
> > https://www.confluent.io/blog/exactly-once-semantics-are-