Ah, also regarding your earlier mail: I didn't know if many people were using Kafka with Dataflow, thanks for that clarification! :-)

Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a
Beam context; I was just posting that for reference.

Best,
Aljoscha

> On 10. Aug 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> @Raghu: Yes, exactly, that's what I thought about this morning, actually.
> These are the methods of an operator that are relevant to checkpointing:
>
> class FlinkOperator {
>   open();
>   snapshotState();
>   notifySnapshotComplete();
>   initializeState();
> }
>
> Input would be buffered in state, would be checkpointed in snapshotState()
> and processed when we receive a notification of a complete checkpoint
> (which is sent out once all operators have signaled that checkpointing is
> complete). In case of failure, we would be re-initialized with the
> buffered elements in initializeState() and could re-process them in
> open().
>
> This is somewhat expensive and leads to higher latency, so we should only
> do it if the DoFn signals that it needs deterministic input.
>
> +Jingsong, who is working on something similar for the output produced in
> finishBundle().
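A minimal sketch of the buffering approach described above, assuming
Flink 1.3-era CheckpointedFunction/CheckpointListener APIs. BufferingSink,
process(), and the use of String elements are illustrative names for this
sketch, not anything from the actual Beam or Flink code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class BufferingSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

      // Elements received since the last snapshot; no side effects yet.
      private final List<String> buffer = new ArrayList<>();
      // Snapshotted batches waiting for their checkpoint to complete.
      private final TreeMap<Long, List<String>> pending = new TreeMap<>();
      private transient ListState<String> checkpointedState;

      @Override
      public void invoke(String value) throws Exception {
        buffer.add(value); // only buffer; do not touch the outside world
      }

      @Override
      public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Freeze the current buffer under this checkpoint's id and persist
        // every batch that has not been confirmed yet.
        pending.put(ctx.getCheckpointId(), new ArrayList<>(buffer));
        buffer.clear();
        checkpointedState.clear();
        for (List<String> batch : pending.values()) {
          for (String v : batch) {
            checkpointedState.add(v);
          }
        }
      }

      @Override
      public void initializeState(FunctionInitializationContext ctx)
          throws Exception {
        checkpointedState = ctx.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("buffered-input", String.class));
        if (ctx.isRestored()) {
          // Re-buffer everything that was checkpointed but not yet
          // confirmed; it is re-processed after the next checkpoint.
          for (String v : checkpointedState.get()) {
            buffer.add(v);
          }
        }
      }

      @Override
      public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // These elements are now part of a completed checkpoint, so any
        // replay sees identical input: safe to run non-idempotent logic.
        for (List<String> batch : pending.headMap(checkpointId, true).values()) {
          for (String v : batch) {
            process(v);
          }
        }
        pending.headMap(checkpointId, true).clear();
      }

      private void process(String value) {
        // Illustrative placeholder for the side-effecting user code
        // (the "DoFn" in Beam terms).
      }
    }

Note that a failure between notifyCheckpointComplete() and the next snapshot
could still re-run process() on the last confirmed batch, which is why
end-to-end exactly-once additionally needs a transactional or idempotent
commit, as discussed further down in the thread.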
>
>> On 9. Aug 2017, at 19:41, Raghu Angadi <rang...@google.com.INVALID> wrote:
>>
>> Yep, an option to ensure replays see identical input would be pretty
>> useful. It might be challenging on horizontally checkpointing runners
>> like Flink (the only way I see is to buffer all the input in state and
>> replay it after a checkpoint).
>>
>> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>>
>>> Please see Kenn's proposal. This is a generic thing that is lacking in
>>> the Beam model, and it only works today for specific runners. We should
>>> fix this at the Beam level, but I don't think that should block your PR.
>>>
>>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
>>> wrote:
>>>
>>>> There are quite a few customers using KafkaIO with Dataflow. All of
>>>> them are potential users of the exactly-once sink. The Dataflow Pubsub
>>>> sink does not support EOS yet. Even among those customers, I expect
>>>> the fraction of applications requiring EOS to be pretty small; that's
>>>> why I don't think the extra shuffles are too expensive in overall cost
>>>> yet.
>>>>
>>>> It is also not clear how Flink's 2-phase commit sink function could be
>>>> used in Beam's context. Beam could add some checkpoint semantics to the
>>>> state API so that all the runners could support it in a
>>>> platform-specific way.
>>>>
>>>> Took a look at the Flink PR and commented on a few issues I see there:
>>>> https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
>>>> storing all the messages in state can get around those.
>>>>
>>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Yes, I think making this explicit would be good. Having a
>>>>> transformation that makes assumptions about how the runner implements
>>>>> certain things is not optimal. Also, I think that most people probably
>>>>> don't use Kafka with the Dataflow Runner (because GCP has Pubsub, but
>>>>> I'm just guessing here). This would mean that the intersection of
>>>>> "people who would benefit from an exactly-once Kafka sink" and "people
>>>>> who use Beam on Dataflow" is rather small, and therefore not many
>>>>> people would benefit from such a Transform.
>>>>>
>>>>> This is all just conjecture, of course.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
>>>>>>
>>>>>> I think the issue we're hitting is how to write this in Beam.
>>>>>>
>>>>>> Dataflow historically guaranteed checkpointing at every GBK (which,
>>>>>> due to the design of Dataflow's streaming shuffle, was reasonably
>>>>>> efficient). In Beam we never formalized these semantics, leaving
>>>>>> these sinks in a gray area. I believe the Spark runner currently
>>>>>> checkpoints the RDD on every GBK, so these unwritten semantics
>>>>>> currently work for Dataflow and for Spark.
>>>>>>
>>>>>> We need some way to express this operation in Beam, whether it be via
>>>>>> an explicit Checkpoint() operation or via marking DoFns as having
>>>>>> side effects and having the runner automatically insert such a
>>>>>> Checkpoint in front of them. In Flink, this operation can be
>>>>>> implemented using what Aljoscha posted.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
>>>>>>> for such cases: [1]. The PR for a Kafka 0.11 exactly-once producer
>>>>>>> builds on that: [2]
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>>>>>>> [2] https://github.com/apache/flink/pull/4239
>>>>>>>
>>>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Kafka 0.11 added support for transactions [1], which allows
>>>>>>>> end-to-end exactly-once semantics. Beam's KafkaIO users can benefit
>>>>>>>> from these while using runners that support exactly-once
>>>>>>>> processing.
>>>>>>>>
>>>>>>>> I have an implementation of EOS support for the Kafka sink:
>>>>>>>> https://github.com/apache/beam/pull/3612
>>>>>>>> It has two shuffles and builds on the Beam state API and the
>>>>>>>> checkpoint barrier between stages (as in Dataflow). The pull
>>>>>>>> request has a longer description.
>>>>>>>>
>>>>>>>> - What other runners in addition to Dataflow would be compatible
>>>>>>>> with such a strategy?
>>>>>>>> - I think it does not quite work for Flink (as it has a global
>>>>>>>> checkpoint, not one between the stages). How would one go about
>>>>>>>> implementing such a sink?
>>>>>>>>
>>>>>>>> Any comments on the pull request are also welcome.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
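For reference, a minimal sketch of the Kafka 0.11 transactional producer API
that such an exactly-once sink builds on; the topic, record contents, and
transactional.id below are illustrative, not taken from the Beam PR:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TxnProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // A stable transactional.id (e.g. per sink shard) lets the broker
        // fence zombie producers left over from failed workers.
        props.put("transactional.id", "eos-sink-shard-0");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the id, aborts stale txns

        try {
          producer.beginTransaction();
          producer.send(new ProducerRecord<>("my-topic", "key", "value"));
          producer.commitTransaction();
        } catch (Exception e) {
          // Simplified: real code must treat ProducerFencedException
          // specially rather than attempting an abort.
          producer.abortTransaction(); // a replayed bundle can retry cleanly
          throw new RuntimeException(e);
        } finally {
          producer.close();
        }
      }
    }

A runner would call commitTransaction() only once the input that produced
these records is durably checkpointed; consumers reading with
isolation.level=read_committed never see records from aborted transactions.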