BTW, as a follow-up: there is a cost to having a Flink-specific override
for the Kafka sink. Part of that is test coverage - users who write
DirectRunner tests for their pipeline will now be using a different version
of the code than is used on the actual Flink runner. It also makes the code
less obvious: people who read the KafkaIO code will tend not to realize
that Flink is running something a bit different, and this can lead to
confusion.

Reuven

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:

> RE: Kenn's suggestion. I think Raghu looked into something like that, and
> something about it didn't work. I don't remember all the details, but I
> think there might have been some subtle problem with it that wasn't
> obvious. That doesn't mean there isn't another way to solve the issue.
>
> Hopefully we can make that work. Another possibility if we can't is to do
> something special for Flink. Beam allows runners to splice out well-known
> transforms with their own implementation. Dataflow already does that for
> Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
> Kafka sink with one that uses Flink-specific functionality.  Ideally this
> would reuse most of the existing Kafka code (maybe we could refactor just
> the EOS part into something that could be subbed out).
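>
> To make that concrete, a rough sketch of such a splice. Pipeline#replaceAll,
> PTransformOverride, and PTransformMatchers#classEqualTo are the existing
> override hooks; the Flink-specific factory named below is hypothetical:
>
>   // Inside the Flink runner, before translating the pipeline:
>   pipeline.replaceAll(Collections.singletonList(
>       PTransformOverride.of(
>           // Match the Kafka EOS sink wherever it appears in the pipeline.
>           PTransformMatchers.classEqualTo(KafkaExactlyOnceSink.class),
>           // Hypothetical factory that returns a replacement transform
>           // backed by Flink's native transactional Kafka producer.
>           new FlinkKafkaExactlyOnceOverrideFactory())));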
>
> Reuven
>
> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
>
>> > It would be interesting to see if there's something we could add to the
>> > Beam model that would create a better story for Kafka's EOS writes.
>>
>> There would have to be a checkpoint-completed callback the DoFn can
>> register with the Runner. That does not seem applicable for most Runners
>> though.
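>>
>> Roughly something like this (a purely hypothetical Beam-side hook, sketched
>> only to make the idea concrete; Flink's CheckpointListener with its
>> notifyCheckpointComplete(long) is the closest existing equivalent):
>>
>>   // Hypothetical interface a DoFn could register with the Runner.
>>   public interface CheckpointCompletedCallback {
>>     // Called once the runner has durably finalized the given checkpoint,
>>     // i.e. all input consumed before it can be considered stable.
>>     void onCheckpointCompleted(long checkpointId);
>>   }
>>
>>   // KafkaIO's ExactlyOnceWriter would then defer committing its Kafka
>>   // transaction until the callback fires for the covering checkpoint.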
>>
>> > This is true, however isn't it already true for such uses of Flink?
>>
>> Yes, that's correct. In the case of Kafka, Flink can offload the
>> buffering but for the general case, idempotent writes are only possible
>> if we buffer data until the checkpoint is completed.
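>>
>> A minimal sketch of that buffering in Flink terms. CheckpointListener and
>> notifyCheckpointComplete(long) are real Flink hooks; the rest is simplified
>> (a real operator would keep the buffer in checkpointed state and track
>> which elements belong to which checkpoint):
>>
>>   class BufferingWriter<T> extends ProcessFunction<T, Void>
>>       implements CheckpointListener {
>>
>>     private final List<T> buffer = new ArrayList<>();
>>
>>     @Override
>>     public void processElement(T element, Context ctx, Collector<Void> out) {
>>       buffer.add(element); // hold back all writes for now
>>     }
>>
>>     @Override
>>     public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>       // Everything buffered before this checkpoint is now stable, so
>>       // writing it is safe even if we later restore and replay.
>>       for (T element : buffer) {
>>         write(element);
>>       }
>>       buffer.clear();
>>     }
>>
>>     private void write(T element) { /* idempotent write to the sink */ }
>>   }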
>>
>> On 04.03.19 17:45, Reuven Lax wrote:
>> >
>> >
>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
>> >
>> >      > Can we do 2? I seem to remember that we had trouble in some
>> >      > cases (e.g. in the BigQuery case, there was no obvious way to
>> >      > create a deterministic id, which is why we went for a random
>> >      > number followed by a reshuffle). Also remember that the user
>> >      > ParDo that is producing data to the sink is not guaranteed to be
>> >      > deterministic; the Beam model allows for non-deterministic
>> >      > transforms.
>> >      >
>> >     I believe we could use something like the worker id to make it
>> >     deterministic, though the worker id can change after a restart. We
>> >     could persist it in Flink's operator state. I do not know if we can
>> >     come up with a Runner-independent solution.
>> >
>> >
>> > If we did this, we would break it on runners that don't have a concept
>> > of a stable worker id :( The Dataflow runner can load balance work at
>> > any time (including moving work around between workers).
>> >
>> >
>> >      > I'm not quite sure I understand. If a ParDo is marked with
>> >      > RequiresStableInput, can't the Flink runner buffer the input
>> >      > message until after the checkpoint is complete and only then
>> >      > deliver it to the ParDo?
>> >
>> >     You're correct. I thought that it could suffice to only buffer
>> >     during a checkpoint and otherwise rely on the deterministic
>> >     execution of the pipeline and KafkaIO's de-duplication code.
>> >
>> >
>> > Yes, I want to distinguish the KafkaIO case from the general case. It
>> > would be interesting to see if there's something we could add to the
>> > Beam model that would create a better story for Kafka's EOS writes.
>> >
>> >
>> >     In any case, emitting only after finalization of checkpoints gives
>> >     us guaranteed stable input. It also means that the processing is
>> >     tied to the checkpoint interval, the checkpoint duration, and the
>> >     available memory.
>> >
>> >
>> > This is true, however isn't it already true for such uses of Flink?
>> >
>> >
>> >     On 01.03.19 19:41, Reuven Lax wrote:
>> >      >
>> >      >
>> >      > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
>> >      > <m...@apache.org> wrote:
>> >      >
>> >      >     Fully agree. I think we can improve the situation
>> >      >     drastically. For KafkaIO EOS with Flink we need to make these
>> >      >     two changes:
>> >      >
>> >      >     1) Introduce buffering while the checkpoint is being taken
>> >      >     2) Replace the random shard id assignment with something
>> >      >     deterministic
>> >      >
>> >      >
>> >      > Can we do 2? I seem to remember that we had trouble in some
>> >      > cases (e.g. in the BigQuery case, there was no obvious way to
>> >      > create a deterministic id, which is why we went for a random
>> >      > number followed by a reshuffle). Also remember that the user
>> >      > ParDo that is producing data to the sink is not guaranteed to be
>> >      > deterministic; the Beam model allows for non-deterministic
>> >      > transforms.
>> >      >
>> >      >
>> >      >     However, we won't be able to provide full compatibility with
>> >      >     RequiresStableInput because Flink only guarantees stable
>> >      >     input after a checkpoint. RequiresStableInput requires input
>> >      >     at any point in time to be stable.
>> >      >
>> >      >
>> >      > I'm not quite sure I understand. If a ParDo is marked with
>> >      > RequiresStableInput, can't the Flink runner buffer the input
>> >      > message until after the checkpoint is complete and only then
>> >      > deliver it to the ParDo? This adds latency of course, but I'm not
>> >      > sure how else to do things correctly with the Beam model.
>> >      >
>> >      >     IMHO the only way to achieve that is materializing output,
>> >      >     which Flink does not currently support.
>> >      >
>> >      >     KafkaIO does not need all the power of RequiresStableInput to
>> >      >     achieve EOS with Flink, but for the general case I don't see
>> >      >     a good solution at the moment.
>> >      >
>> >      >     -Max
>> >      >
>> >      >     On 01.03.19 16:45, Reuven Lax wrote:
>> >      >      > Yeah, the person who was working on it originally stopped
>> >      >      > working on Beam, and nobody else ever finished it. I think
>> >      >      > it is important to finish though. Many of the existing
>> >      >      > Sinks are only fully correct for Dataflow today, because
>> >      >      > they generate either Reshuffle or GroupByKey to ensure
>> >      >      > input stability before outputting (in many cases this code
>> >      >      > was inherited from before Beam existed). On Flink today,
>> >      >      > these sinks might occasionally produce duplicate output in
>> >      >      > the case of failures.
>> >      >      >
>> >      >      > Reuven
>> >      >      >
>> >      >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels
>> >      >      > <m...@apache.org> wrote:
>> >      >      >
>> >      >      >     Circling back to the RequiresStableInput
>> >      >      >     annotation[1]. I've done some prototyping to see how
>> >      >      >     this could be integrated into Flink. I'm currently
>> >      >      >     writing a test based on RequiresStableInput.
>> >      >      >
>> >      >      >     I found out there are already checks in place at the
>> >      >      >     Runners to throw in case transforms use
>> >      >      >     RequiresStableInput and it's not supported. However,
>> >      >      >     not a single transform actually uses the annotation.
>> >      >      >
>> >      >      >     It seems that the effort stopped at some point? Would
>> >      >      >     it make sense to start annotating KafkaExactlyOnceSink
>> >      >      >     with @RequiresStableInput? We could then get rid of the
>> >      >      >     whitelist.
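>> >      >      >
>> >      >      >     Roughly like this (@RequiresStableInput is the real SDK
>> >      >      >     annotation; the DoFn below is a simplified stand-in for
>> >      >      >     the sink's commit step):
>> >      >      >
>> >      >      >       class ExactlyOnceWriterFn extends DoFn<KV<Integer, String>, Void> {
>> >      >      >         @ProcessElement
>> >      >      >         @RequiresStableInput // runner must replay identical input on retry
>> >      >      >         public void processElement(ProcessContext ctx) {
>> >      >      >           // write ctx.element() to Kafka and commit the transaction
>> >      >      >         }
>> >      >      >       }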
>> >      >      >
>> >      >      >     -Max
>> >      >      >
>> >      >      >     [1]
>> >      >      >     https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>> >      >      >
>> >      >      >
>> >      >      >
>> >      >      >     On 01.03.19 14:28, Maximilian Michels wrote:
>> >      >      >      > Just realized that transactions do not span
>> >      >      >      > multiple elements in KafkaExactlyOnceSink. So the
>> >      >      >      > proposed solution to stop processing elements while
>> >      >      >      > a snapshot is pending would work.
>> >      >      >      >
>> >      >      >      > It is certainly not optimal in terms of performance
>> >      >      >      > for Flink and poses problems when checkpoints take
>> >      >      >      > long to complete, but it would be worthwhile to
>> >      >      >      > implement this to make use of the EOS feature.
>> >      >      >      >
>> >      >      >      > Thanks,
>> >      >      >      > Max
>> >      >      >      >
>> >      >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
>> >      >      >      >> Thank you for the prompt replies. It's great to
>> >      >      >      >> see that there is good understanding of how EOS in
>> >      >      >      >> Flink works.
>> >      >      >      >>
>> >      >      >      >>> This is exactly what RequiresStableInput is
>> >      >      >      >>> supposed to do. On the Flink runner, this would be
>> >      >      >      >>> implemented by delaying processing until the
>> >      >      >      >>> current checkpoint is done.
>> >      >      >      >>
>> >      >      >      >> I don't think that works because we have no control
>> >      >      >      >> over the Kafka transactions. Imagine:
>> >      >      >      >>
>> >      >      >      >> 1) ExactlyOnceWriter writes records to Kafka and
>> >      >      >      >> commits, then starts a new transaction.
>> >      >      >      >> 2) Flink checkpoints, delaying the processing of
>> >      >      >      >> elements, the checkpoint fails.
>> >      >      >      >> 3) We restore from an old checkpoint and will start
>> >      >      >      >> writing duplicate data to Kafka. The de-duplication
>> >      >      >      >> that the sink performs does not help, especially
>> >      >      >      >> because the random shard ids might be assigned
>> >      >      >      >> differently.
>> >      >      >      >>
>> >      >      >      >> IMHO we have to have control over commit to be able
>> >      >      >      >> to provide EOS.
>> >      >      >      >>
>> >      >      >      >>> When we discussed this in Aug 2017, the
>> >      >      >      >>> understanding was that the 2 Phase commit utility
>> >      >      >      >>> in Flink used to implement Flink's Kafka EOS could
>> >      >      >      >>> not be implemented in Beam's context.
>> >      >      >      >>
>> >      >      >      >> That's also my understanding, unless we change the
>> >      >      >      >> interface.
>> >      >      >      >>
>> >      >      >      >>> I don't see how SDF solves this problem..
>> >      >      >      >>
>> >      >      >      >> SDF has a checkpoint method which the Runner can
>> >      >      >      >> call, but I think that you are right, that the
>> >      >      >      >> above problem would be the same.
>> >      >      >      >>
>> >      >      >      >>> Absolutely. I would love to support EOS in KafkaIO
>> >      >      >      >>> for Flink. I think that will help many future
>> >      >      >      >>> exactly-once sinks.. and address the fundamental
>> >      >      >      >>> incompatibility between the Beam model and Flink's
>> >      >      >      >>> horizontal checkpointing for such applications.
>> >      >      >      >>
>> >      >      >      >> Great :)
>> >      >      >      >>
>> >      >      >      >>> The FlinkRunner would need to insert the "wait
>> >      >      >      >>> until checkpoint finalization" logic wherever it
>> >      >      >      >>> sees @RequiresStableInput, which is already what it
>> >      >      >      >>> would have to do.
>> >      >      >      >>
>> >      >      >      >> I don't think that fixes the problem. See above
>> >      >      >      >> example.
>> >      >      >      >>
>> >      >      >      >> Thanks,
>> >      >      >      >> Max
>> >      >      >      >>
>> >      >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi
>> >      >      >      >>> <ang...@gmail.com> wrote:
>> >      >      >      >>>
>> >      >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles
>> >      >      >      >>>     <k...@apache.org> wrote:
>> >      >      >      >>>
>> >      >      >      >>>         I'm not sure what a hard fail is. I probably have a
>> >      >      >      >>>         shallow understanding, but doesn't @RequiresStableInput
>> >      >      >      >>>         work for 2PC? The preCommit() phase should establish
>> >      >      >      >>>         the transaction and commit() is not called until after
>> >      >      >      >>>         checkpoint finalization. Can you describe the way that
>> >      >      >      >>>         it does not work a little bit more?
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>     - preCommit() is called before checkpoint. Kafka EOS in
>> >      >      >      >>>     Flink starts the transaction before this and makes sure it
>> >      >      >      >>>     flushes all records in preCommit(). So far so good.
>> >      >      >      >>>     - commit is called after checkpoint is persisted. Now,
>> >      >      >      >>>     imagine commit() fails for some reason. There is no option
>> >      >      >      >>>     to rerun the 1st phase to write the records again in a new
>> >      >      >      >>>     transaction. This is a hard failure for the job. In
>> >      >      >      >>>     practice Flink might attempt to commit again (not sure how
>> >      >      >      >>>     many times), which is likely to fail and eventually results
>> >      >      >      >>>     in job failure.
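>> >      >      >      >>>
>> >      >      >      >>>     For reference, the Flink contract being described here,
>> >      >      >      >>>     i.e. the abstract methods of Flink's
>> >      >      >      >>>     TwoPhaseCommitSinkFunction (signatures as in Flink;
>> >      >      >      >>>     surrounding class details trimmed):
>> >      >      >      >>>
>> >      >      >      >>>       abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> {
>> >      >      >      >>>         protected abstract TXN beginTransaction() throws Exception;
>> >      >      >      >>>         // write records into the currently open transaction
>> >      >      >      >>>         protected abstract void invoke(TXN txn, IN value, Context ctx)
>> >      >      >      >>>             throws Exception;
>> >      >      >      >>>         // flush; called before the checkpoint barrier is acknowledged
>> >      >      >      >>>         protected abstract void preCommit(TXN txn) throws Exception;
>> >      >      >      >>>         // called after the checkpoint completes; must not hard-fail
>> >      >      >      >>>         protected abstract void commit(TXN txn);
>> >      >      >      >>>         protected abstract void abort(TXN txn);
>> >      >      >      >>>       }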
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>> In Apache Beam, the records could be stored in state, and can
>> >      >      >      >>> be written inside commit() to work around this issue. It could
>> >      >      >      >>> have scalability issues if checkpoints are not frequent enough
>> >      >      >      >>> in the Flink runner.
>> >      >      >      >>>
>> >      >      >      >>> Raghu.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>         Kenn
>> >      >      >      >>>
>> >      >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi
>> >      >      >      >>>         <ang...@gmail.com> wrote:
>> >      >      >      >>>
>> >      >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
>> >      >      >      >>>             <k...@apache.org> wrote:
>> >      >      >      >>>
>> >      >      >      >>>                 I believe the way you would implement the
>> >      >      >      >>>                 logic behind Flink's KafkaProducer would be to
>> >      >      >      >>>                 have two steps:
>> >      >      >      >>>
>> >      >      >      >>>                 1. Start transaction
>> >      >      >      >>>                 2. @RequiresStableInput Close transaction
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>             I see. What happens if closing the transaction
>> >      >      >      >>>             fails in (2)? Flink's 2PC requires that commit()
>> >      >      >      >>>             should never hard fail once preCommit() succeeds.
>> >      >      >      >>>             I think that is the cost of not having an extra
>> >      >      >      >>>             shuffle. It is alright since this policy has worked
>> >      >      >      >>>             well for Flink so far.
>> >      >      >      >>>
>> >      >      >      >>>             Overall, it will be great to have
>> >      >      >      >>>             @RequiresStableInput support in the Flink runner.
>> >      >      >      >>>
>> >      >      >      >>>             Raghu.
>> >      >      >      >>>
>> >      >      >      >>>                 The FlinkRunner would need to insert the "wait
>> >      >      >      >>>                 until checkpoint finalization" logic wherever
>> >      >      >      >>>                 it sees @RequiresStableInput, which is already
>> >      >      >      >>>                 what it would have to do.
>> >      >      >      >>>
>> >      >      >      >>>                 This matches the KafkaProducer's logic - delay
>> >      >      >      >>>                 closing the transaction until checkpoint
>> >      >      >      >>>                 finalization. This answers my main question,
>> >      >      >      >>>                 which is "is @RequiresStableInput expressive
>> >      >      >      >>>                 enough to allow Beam-on-Flink to have
>> >      >      >      >>>                 exactly-once behavior with the same performance
>> >      >      >      >>>                 characteristics as native Flink checkpoint
>> >      >      >      >>>                 finalization?"
>> >      >      >      >>>
>> >      >      >      >>>                 Kenn
>> >      >      >      >>>
>> >      >      >      >>>                 [1] https://github.com/apache/beam/pull/7955
>> >      >      >      >>>
>> >      >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
>> >      >      >      >>>                 <re...@google.com> wrote:
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu
>> >      >      >      >>>                     Angadi <ang...@gmail.com> wrote:
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                             Now why does the Flink Runner not
>> >      >      >      >>>                             support KafkaIO EOS? Flink's native
>> >      >      >      >>>                             KafkaProducer supports exactly-once.
>> >      >      >      >>>                             It simply commits the pending
>> >      >      >      >>>                             transaction once it has completed a
>> >      >      >      >>>                             checkpoint.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM
>> >      >      >      >>>                         Maximilian Michels <m...@apache.org>
>> >      >      >      >>>                         wrote:
>> >      >      >      >>>
>> >      >      >      >>>                             Hi,
>> >      >      >      >>>
>> >      >      >      >>>                             I came across KafkaIO's Runner
>> >      >      >      >>>                             whitelist [1] for enabling
>> >      >      >      >>>                             exactly-once semantics (EOS). I
>> >      >      >      >>>                             think it is questionable to exclude
>> >      >      >      >>>                             Runners from inside a transform, but
>> >      >      >      >>>                             I see that the intention was to save
>> >      >      >      >>>                             users from surprises.
>> >      >      >      >>>
>> >      >      >      >>>                             Now why does the Flink Runner not
>> >      >      >      >>>                             support KafkaIO EOS? Flink's native
>> >      >      >      >>>                             KafkaProducer supports exactly-once.
>> >      >      >      >>>                             It simply commits the pending
>> >      >      >      >>>                             transaction once it has completed a
>> >      >      >      >>>                             checkpoint.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                         When we discussed this in Aug 2017, the
>> >      >      >      >>>                         understanding was that the 2 Phase
>> >      >      >      >>>                         commit utility in Flink used to
>> >      >      >      >>>                         implement Flink's Kafka EOS could not be
>> >      >      >      >>>                         implemented in Beam's context. See this
>> >      >      >      >>>                         message in that dev thread:
>> >      >      >      >>>                         https://www.mail-archive.com/dev@beam.apache.org/msg02664.html
>> >      >      >      >>>                         Has anything changed in this regard? The
>> >      >      >      >>>                         whole thread is relevant to this topic
>> >      >      >      >>>                         and worth going through.
>> >      >      >      >>>
>> >      >      >      >>>                     I think that TwoPhaseCommit utility class
>> >      >      >      >>>                     wouldn't work. The Flink runner would
>> >      >      >      >>>                     probably want to directly use
>> >      >      >      >>>                     notifySnapshotComplete in order to
>> >      >      >      >>>                     implement @RequiresStableInput.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                             A checkpoint is realized by sending
>> >      >      >      >>>                             barriers through all channels,
>> >      >      >      >>>                             starting from the source until
>> >      >      >      >>>                             reaching all sinks. Every operator
>> >      >      >      >>>                             persists its state once it has
>> >      >      >      >>>                             received a barrier on all its input
>> >      >      >      >>>                             channels; it then forwards it to the
>> >      >      >      >>>                             downstream operators.
>> >      >      >      >>>
>> >      >      >      >>>                             The architecture of Beam's
>> >      >      >      >>>                             KafkaExactlyOnceSink is as
>> >      >      >      >>>                             follows[2]:
>> >      >      >      >>>
>> >      >      >      >>>                             Input -> AssignRandomShardIds ->
>> >      >      >      >>>                             GroupByKey -> AssignSequenceIds ->
>> >      >      >      >>>                             GroupByKey -> ExactlyOnceWriter
>> >      >      >      >>>
>> >      >      >      >>>                             As I understood, Spark or Dataflow
>> >      >      >      >>>                             use the GroupByKey stages to persist
>> >      >      >      >>>                             the input. That is not required in
>> >      >      >      >>>                             Flink to be able to take a
>> >      >      >      >>>                             consistent snapshot of the pipeline.
>> >      >      >      >>>
>> >      >      >      >>>                             Basically, for Flink we don't need
>> >      >      >      >>>                             any of that magic that KafkaIO does.
>> >      >      >      >>>                             What we would need to support EOS is
>> >      >      >      >>>                             a way to tell the ExactlyOnceWriter
>> >      >      >      >>>                             (a DoFn) to commit once a checkpoint
>> >      >      >      >>>                             has completed.
>> >      >      >      >>>
>> >      >      >      >>>                             I know that the new version of SDF
>> >      >      >      >>>                             supports checkpointing, which should
>> >      >      >      >>>                             solve this issue. But there is still
>> >      >      >      >>>                             a lot of work to do to make this a
>> >      >      >      >>>                             reality.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                         I don't see how SDF solves this
>> >      >      >      >>>                         problem.. Maybe pseudo code would make
>> >      >      >      >>>                         it more clear. But if it helps, that is
>> >      >      >      >>>                         great!
>> >      >      >      >>>
>> >      >      >      >>>                             So I think it would make sense to
>> >      >      >      >>>                             think about a way to make KafkaIO's
>> >      >      >      >>>                             EOS more accessible to Runners which
>> >      >      >      >>>                             support a different way of
>> >      >      >      >>>                             checkpointing.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                         Absolutely. I would love to support
>> >      >      >      >>>                         EOS in KafkaIO for Flink. I think that
>> >      >      >      >>>                         will help many future exactly-once
>> >      >      >      >>>                         sinks.. and address the fundamental
>> >      >      >      >>>                         incompatibility between the Beam model
>> >      >      >      >>>                         and Flink's horizontal checkpointing
>> >      >      >      >>>                         for such applications.
>> >      >      >      >>>
>> >      >      >      >>>                         Raghu.
>> >      >      >      >>>
>> >      >      >      >>>                             Cheers,
>> >      >      >      >>>                             Max
>> >      >      >      >>>
>> >      >      >      >>>                             PS: I found this document about
>> >      >      >      >>>                             RequiresStableInput [3], but IMHO
>> >      >      >      >>>                             defining an annotation only
>> >      >      >      >>>                             manifests the conceptual difference
>> >      >      >      >>>                             between the Runners.
>> >      >      >      >>>
>> >      >      >      >>>
>> >      >      >      >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> >      >      >      >>>
>> >      >      >      >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> >      >      >      >>>
>> >      >      >      >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM