On Thu, Aug 10, 2017 at 5:15 AM, Aljoscha Krettek <[email protected]> wrote:
> Ah, also regarding your earlier mail: I didn't know if many people were
> using Kafka with Dataflow, thanks for that clarification! :-)
>
> Also, I don't think that the TwoPhaseCommit Sink of Flink would work in
> a Beam context; I was just posting that for reference.
>

Yep. It was pretty useful for understanding how Flink checkpoints
interact with operators. Will look into it more in the future when we
want to add exactly-once support for KafkaIO on Flink.

> Best,
> Aljoscha
>
> On 10. Aug 2017, at 11:13, Aljoscha Krettek <[email protected]> wrote:
> >
> > @Raghu: Yes, exactly, that's what I thought about this morning,
> > actually. These are the methods of an operator that are relevant to
> > checkpointing:
> >
> > class FlinkOperator() {
> >   open();
> >   snapshotState();
> >   notifySnapshotComplete();
> >   initializeState();
> > }
> >
> > Input would be buffered in state, checkpointed in snapshotState(),
> > and processed when we receive a notification of a complete checkpoint
> > (which is sent out once all operators have signaled that
> > checkpointing is complete). In case of failure, we would be
> > re-initialized with the buffered elements in initializeState() and
> > could re-process them in open().
> >
> > This is somewhat expensive and leads to higher latency, so we should
> > only do it if the DoFn signals that it needs deterministic input.
> >
> > +Jingsong, who is working on something similar for the output
> > produced in finishBundle().
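To make sure I am reading the buffering idea right, here is a rough
sketch of such an operator. Treat it as pseudocode rather than a drop-in
implementation: the method names approximate Flink's operator API, the
element serializer is assumed to be supplied by whoever builds the
operator, and a complete version would emit only the elements covered by
the completed checkpoint, not ones that arrived after snapshotState():

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Buffers input until the checkpoint containing it is complete
// everywhere, so a side-effecting downstream DoFn sees replay-stable input.
public class BufferUntilCheckpointOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  private final TypeSerializer<T> serializer;  // assumed: provided by the caller
  private transient ListState<T> bufferState;  // checkpointed copy of the buffer
  private transient List<T> buffer;            // in-memory buffer

  public BufferUntilCheckpointOperator(TypeSerializer<T> serializer) {
    this.serializer = serializer;
  }

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    bufferState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer", serializer));
    buffer = new ArrayList<>();
    for (T element : bufferState.get()) {
      buffer.add(element);  // restore elements checkpointed but not yet emitted
    }
  }

  @Override
  public void processElement(StreamRecord<T> record) throws Exception {
    buffer.add(record.getValue());  // hold the element back, do not emit yet
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    bufferState.clear();
    for (T element : buffer) {
      bufferState.add(element);  // buffered input becomes part of the checkpoint
    }
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // Every operator has acknowledged the checkpoint, so the input is
    // durable and any replay will deliver it again: safe to emit now.
    for (T element : buffer) {
      output.collect(new StreamRecord<>(element));
    }
    buffer.clear();
  }
}

On the Flink runner, a transform wrapping an operator like this is what
could provide the "checkpoint between stages" behavior that the sink
currently assumes from Dataflow.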
> > On 9. Aug 2017, at 19:41, Raghu Angadi <[email protected]> wrote:
> >
> >> Yep, an option to ensure replays see identical input would be pretty
> >> useful. It might be challenging on horizontally checkpointing
> >> runners like Flink (the only way I see is to buffer all the input in
> >> state and replay it after the checkpoint).
> >>
> >> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <[email protected]> wrote:
> >>
> >>> Please see Kenn's proposal. This is a generic thing that is lacking
> >>> in the Beam model, and only works today for specific runners. We
> >>> should fix this at the Beam level, but I don't think that should
> >>> block your PR.
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <[email protected]> wrote:
> >>>
> >>>> There are quite a few customers using KafkaIO with Dataflow. All
> >>>> of them are potential users of the exactly-once sink; the Dataflow
> >>>> Pubsub sink does not support EOS yet. Even among those customers,
> >>>> I do expect the fraction of applications requiring EOS to be
> >>>> pretty small, which is why I don't think the extra shuffles are
> >>>> too expensive in overall cost yet.
> >>>>
> >>>> It is also not clear how Flink's 2-phase commit sink function
> >>>> could be used in Beam's context. Beam could add some checkpoint
> >>>> semantics to the state API so that all the runners could support
> >>>> it in a platform-specific way.
> >>>>
> >>>> Took a look at the Flink PR and commented on a few issues I see
> >>>> there: https://github.com/apache/flink/pull/4239. Maybe an extra
> >>>> shuffle, or storing all the messages in state, can get over those.
> >>>>
> >>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <[email protected]> wrote:
> >>>>
> >>>>> Yes, I think making this explicit would be good. Having a
> >>>>> transformation that makes assumptions about how the runner
> >>>>> implements certain things is not optimal.
> >>>>>
> >>>>> Also, I think that most people probably don't use Kafka with the
> >>>>> Dataflow Runner (because GCP has Pubsub, but I'm just guessing
> >>>>> here). This would mean that the intersection of "people who would
> >>>>> benefit from an exactly-once Kafka sink" and "people who use Beam
> >>>>> on Dataflow" is rather small, and therefore not many people would
> >>>>> benefit from such a Transform.
> >>>>>
> >>>>> This is all just conjecture, of course.
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> On 8. Aug 2017, at 23:34, Reuven Lax <[email protected]> wrote:
> >>>>>
> >>>>>> I think the issue we're hitting is how to write this in Beam.
> >>>>>>
> >>>>>> Dataflow historically guaranteed checkpointing at every GBK
> >>>>>> (which, due to the design of Dataflow's streaming shuffle, was
> >>>>>> reasonably efficient). In Beam we never formalized these
> >>>>>> semantics, leaving these sync points in a gray area. I believe
> >>>>>> the Spark runner currently checkpoints the RDD on every GBK, so
> >>>>>> these unwritten semantics currently work for Dataflow and for
> >>>>>> Spark.
> >>>>>>
> >>>>>> We need some way to express this operation in Beam, whether it
> >>>>>> be via an explicit Checkpoint() operation or via marking DoFns
> >>>>>> as having side effects, and having the runner automatically
> >>>>>> insert such a Checkpoint in front of them. In Flink, this
> >>>>>> operation can be implemented using what Aljoscha posted.
> >>>>>>
> >>>>>> Reuven
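If we go the route of marking DoFns, I imagine it could look something
like the sketch below. The annotation is hypothetical, just one possible
shape for Kenn's proposal rather than an existing Beam API, and the
producer field is assumed to be created in @Setup:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// A side-effecting DoFn that asks the runner for checkpointed ("stable")
// input, so that any retry replays byte-identical elements.
class KafkaExactlyOnceWriteFn extends DoFn<KV<byte[], byte[]>, Void> {

  private transient Producer<byte[], byte[]> producer;  // assumed: built in @Setup

  @RequiresStableInput  // hypothetical: runner inserts a checkpoint before this DoFn
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Safe to retry only when replays are identical: the sink's sequence
    // numbers and transaction bookkeeping depend on seeing the same elements.
    producer.send(
        new ProducerRecord<>("my-topic", c.element().getKey(), c.element().getValue()));
  }
}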
> >>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> In Flink, there is a TwoPhaseCommit SinkFunction that can be
> >>>>>>> used for such cases: [1]. The PR for a Kafka 0.11 exactly-once
> >>>>>>> producer builds on that: [2]
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> >>>>>>> [2] https://github.com/apache/flink/pull/4239
> >>>>>>>
> >>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Kafka 0.11 added support for transactions [1], which allows
> >>>>>>>> end-to-end exactly-once semantics. Beam's KafkaIO users can
> >>>>>>>> benefit from these while using runners that support
> >>>>>>>> exactly-once processing.
> >>>>>>>>
> >>>>>>>> I have an implementation of EOS support for the Kafka sink:
> >>>>>>>> https://github.com/apache/beam/pull/3612
> >>>>>>>> It has two shuffles and builds on the Beam state API and the
> >>>>>>>> checkpoint barrier between stages (as in Dataflow). The pull
> >>>>>>>> request has a longer description.
> >>>>>>>>
> >>>>>>>> - What other runners in addition to Dataflow would be
> >>>>>>>> compatible with such a strategy?
> >>>>>>>> - I think it does not quite work for Flink (as it has a global
> >>>>>>>> checkpoint, not one between stages). How would one go about
> >>>>>>>> implementing such a sink?
> >>>>>>>>
> >>>>>>>> Any comments on the pull request are also welcome.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Raghu.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
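For anyone who has not looked at the new Kafka 0.11 transaction APIs
yet, the producer side is roughly the following (broker address, topic,
and transactional.id are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");
    props.put("transactional.id", "kafkaio-eos-sink-shard-0");  // must be stable across restarts
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions();  // fences zombie producers with the same transactional.id
    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("my-topic", "key".getBytes(), "value".getBytes()));
      producer.commitTransaction();  // atomically visible to read_committed consumers
    } catch (ProducerFencedException e) {
      producer.close();  // another instance now owns this transactional.id
    } catch (KafkaException e) {
      producer.abortTransaction();  // aborted writes are never seen by read_committed consumers
      throw e;
    }
  }
}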

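And on the Flink side, the TwoPhaseCommitSinkFunction Aljoscha linked
reduces to roughly these hooks, driven by the checkpoint lifecycle. This
is a simplified paraphrase of the idea, not Flink's exact signatures:

// Simplified paraphrase of Flink's two-phase-commit sink pattern. Hook
// names mirror TwoPhaseCommitSinkFunction, but the signatures here are
// illustrative only.
interface TwoPhaseCommitSink<IN, TXN> {
  TXN beginTransaction();         // open a transaction for the next checkpoint interval
  void write(TXN txn, IN value);  // write records into the open transaction
  void preCommit(TXN txn);        // on checkpoint: flush, so commit() cannot fail later
  void commit(TXN txn);           // on checkpoint-complete notification: make writes visible
  void abort(TXN txn);            // on failure or recovery: discard uncommitted writes
}

The Kafka 0.11 producer above maps onto this directly: beginTransaction()
and send() during the checkpoint interval, a flush in preCommit(), and
commitTransaction() once the checkpoint-complete notification arrives.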