On Thu, Aug 10, 2017 at 5:15 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ah, also regarding your earlier mail: I didn't know if many people were
> using Kafka with Dataflow, thanks for that clarification! :-)
>
> Also, I don't think that the TwoPhaseCommit Sink of Flink would work in a
> Beam context; I was just posting that for reference.
>

Yep. It was pretty useful for understanding Flink checkpoint interactions
with operators. Will look into it more in the future when we want to add
exactly-once support for KafkaIO on Flink.


> Best,
> Aljoscha
> > On 10. Aug 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > @Raghu: Yes, exactly, that's what I thought about this morning,
> > actually. These are the methods of an operator that are relevant to
> > checkpointing:
> >
> > class FlinkOperator() {
> >  open();
> >  snapshotState();
> >  notifySnapshotComplete();
> >  initializeState();
> > }
> >
> > Input would be buffered in state, checkpointed in snapshotState(), and
> > processed when we receive a notification of a complete checkpoint (which
> > is sent out once all operators have signaled that checkpointing is
> > complete). In case of failure, we would be re-initialized with the
> > buffered elements in initializeState() and could re-process them in
> > open().
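
A rough sketch of what that buffering could look like as a Flink sink, for
reference. CheckpointedFunction and CheckpointListener are Flink's public
checkpoint hooks; the buffering logic itself is illustrative, not actual
Flink or Beam code:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String>
    implements CheckpointedFunction, CheckpointListener {

  private transient List<String> buffer;   // received since the last snapshot
  private transient List<String> pending;  // snapshotted, awaiting notification
  private transient ListState<String> checkpointedState;

  @Override
  public void invoke(String value) {
    buffer.add(value);  // only buffer; no side effects yet
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Persist the buffered input as part of the checkpoint.
    checkpointedState.clear();
    for (String s : buffer) {
      checkpointedState.add(s);
    }
    pending = buffer;
    buffer = new ArrayList<>();
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // All operators have acknowledged the checkpoint, so it is now safe to
    // apply side effects. If we fail before the next checkpoint, restore
    // re-buffers these elements, so the processing itself must tolerate
    // (or transactionally fence) replays.
    process(pending);
    pending = new ArrayList<>();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    checkpointedState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffered", String.class));
    buffer = new ArrayList<>();
    pending = new ArrayList<>();
    if (context.isRestored()) {
      // Re-initialized with the buffered elements after a failure; they
      // will be re-processed with the next completed checkpoint.
      for (String s : checkpointedState.get()) {
        buffer.add(s);
      }
    }
  }

  private void process(List<String> elements) {
    // Placeholder for the actual side-effecting work.
  }
}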
> >
> > This is somewhat expensive and leads to higher latency, so we should
> > only do it if the DoFn signals that it needs deterministic input.
> >
> > +Jingsong, who is working on something similar for the output produced
> > in finishBundle().
> >
> >> On 9. Aug 2017, at 19:41, Raghu Angadi <rang...@google.com.INVALID> wrote:
> >>
> >> Yep, an option to ensure replays see identical input would be pretty
> >> useful. It might be challenging on horizontally checkpointing runners
> >> like Flink (the only way I see is to buffer all the input in state and
> >> replay it after the checkpoint).
> >>
> >> On Wed, Aug 9, 2017 at 10:21 AM, Reuven Lax <re...@google.com.invalid>
> >> wrote:
> >>
> >>> Please see Kenn's proposal. This is a generic thing that is lacking in
> >>> the Beam model, and only works today for specific runners. We should
> >>> fix this at the Beam level, but I don't think that should block your
> >>> PR.
> >>>
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Raghu Angadi <rang...@google.com.invalid>
> >>> wrote:
> >>>
> >>>> There are quite a few customers using KafkaIO with Dataflow. All of
> >>>> them are potential users of an exactly-once sink. The Dataflow Pubsub
> >>>> sink does not support EOS yet. Even among those customers, I expect the
> >>>> fraction of applications requiring EOS to be pretty small; that's why I
> >>>> don't think the extra shuffles are too expensive in overall cost yet.
> >>>>
> >>>> It is also not clear how Flink's two-phase commit sink function could
> >>>> be used in Beam's context. Beam could add some checkpoint semantics to
> >>>> the state API so that all the runners could support them in a
> >>>> platform-specific way.
> >>>>
> >>>> Took a look at the Flink PR and commented on a few issues I see there:
> >>>> https://github.com/apache/flink/pull/4239. Maybe an extra shuffle or
> >>>> storing all the messages in state can get over those.
> >>>>
> >>>> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Yes, I think making this explicit would be good. Having a
> >>>>> transformation that makes assumptions about how the runner implements
> >>>>> certain things is not optimal. Also, I think that most people probably
> >>>>> don't use Kafka with the Dataflow Runner (because GCP has Pubsub, but
> >>>>> I'm just guessing here). This would mean that the intersection of
> >>>>> "people who would benefit from an exactly-once Kafka sink" and "people
> >>>>> who use Beam on Dataflow" is rather small, and therefore not many
> >>>>> people would benefit from such a Transform.
> >>>>>
> >>>>> This is all just conjecture, of course.
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>>> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> >>>>>>
> >>>>>> I think the issue we're hitting is how to write this in Beam.
> >>>>>>
> >>>>>> Dataflow historically guaranteed checkpointing at every GBK (which,
> >>>>>> due to the design of Dataflow's streaming shuffle, was reasonably
> >>>>>> efficient). In Beam we never formalized these semantics, leaving them
> >>>>>> in a gray area. I believe the Spark runner currently checkpoints the
> >>>>>> RDD on every GBK, so these unwritten semantics currently work for
> >>>>>> Dataflow and for Spark.
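
In code, those unwritten semantics are what you lean on today by inserting
a Reshuffle (assuming Reshuffle.of() from org.apache.beam.sdk.transforms;
whether the shuffle actually acts as a checkpoint is runner-specific, per
the above):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ShuffleAsCheckpoint {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<Integer, String>> keyed =
        p.apply(Create.of(KV.of(1, "a"), KV.of(2, "b"))
            .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())));

    // The GBK inside Reshuffle is where Dataflow checkpoints (and where the
    // Spark runner checkpoints the RDD). On those runners, a DoFn applied
    // to `stable` sees the same elements again on replay.
    PCollection<KV<Integer, String>> stable = keyed.apply(Reshuffle.of());

    p.run();
  }
}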
> >>>>>>
> >>>>>> We need some way to express this operation in Beam, whether via an
> >>>>>> explicit Checkpoint() operation or via marking DoFns as having side
> >>>>>> effects and having the runner automatically insert such a Checkpoint
> >>>>>> in front of them. In Flink, this operation can be implemented using
> >>>>>> what Aljoscha posted.
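
As a sketch of the second option: a made-up marker annotation (nothing
named RequiresDeterministicInput exists in Beam; the name and mechanism are
hypothetical) that a runner could detect and respond to by inserting such a
Checkpoint in front of the annotated DoFn:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.beam.sdk.transforms.DoFn;

public class DeterministicInputSketch {

  // Hypothetical marker: a runner that understands it would insert a
  // checkpoint (a shuffle on Dataflow, buffered state on Flink) in front
  // of any DoFn carrying it.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface RequiresDeterministicInput {}

  @RequiresDeterministicInput
  static class ExactlyOnceWriteFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // Side-effecting write; correctness relies on replays seeing exactly
      // the input that was checkpointed.
    }
  }
}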
> >>>>>>
> >>>>>> Reuven
> >>>>>>
> >>>>>> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
> >>>>>>> for such cases: [1]. The PR for a Kafka 0.11 exactly-once producer
> >>>>>>> builds on that: [2]
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> >>>>>>> [2] https://github.com/apache/flink/pull/4239
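
For readers who don't want to click through: the following is NOT the real
Flink class, just a simplified paraphrase of the hooks that
TwoPhaseCommitSinkFunction [1] asks subclasses to implement. Flink drives
them from its checkpoint lifecycle: preCommit() during snapshotState(),
commit() on notifyCheckpointComplete().

interface TwoPhaseCommitProtocol<IN, TXN> {
  TXN beginTransaction() throws Exception;                  // start a fresh transaction
  void invoke(TXN transaction, IN value) throws Exception;  // write the value into it
  void preCommit(TXN transaction) throws Exception;         // flush; part of the snapshot
  void commit(TXN transaction);                             // checkpoint complete: make visible
  void abort(TXN transaction);                              // failure: discard
}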
> >>>>>>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
> >>>>>>>>
> >>>>>>>> Kafka 0.11 added support for transactions [1], which allows
> >>>>>>>> end-to-end exactly-once semantics. Beam's KafkaIO users can benefit
> >>>>>>>> from these while using runners that support exactly-once processing.
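
For context, a minimal sketch of the Kafka 0.11 transactional producer API
that [1] below describes (broker address, topic, and transactional.id are
placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProduceSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("transactional.id", "my-sink-shard-0");  // must be stable across restarts
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();  // fences off older producers with the same id
    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("my-topic", "key", "value"));
      producer.commitTransaction();  // the writes become visible atomically
    } catch (Exception e) {
      // A real sink would treat fatal producer errors differently.
      producer.abortTransaction();  // read_committed consumers never see these
    } finally {
      producer.close();
    }
  }
}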
> >>>>>>>>
> >>>>>>>> I have an implementation of EOS support for the Kafka sink:
> >>>>>>>> https://github.com/apache/beam/pull/3612
> >>>>>>>> It has two shuffles and builds on the Beam state API and the
> >>>>>>>> checkpoint barrier between stages (as in Dataflow). The pull request
> >>>>>>>> has a longer description.
> >>>>>>>>
> >>>>>>>> - What other runners in addition to Dataflow would be compatible
> >>>>>>>> with such a strategy?
> >>>>>>>> - I think it does not quite work for Flink (as it has a global
> >>>>>>>> checkpoint, not one between the stages). How would one go about
> >>>>>>>> implementing such a sink?
> >>>>>>>>
> >>>>>>>> Any comments on the pull request are also welcome.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Raghu.
> >>>>>>>>
> >>>>>>>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >
>
>
