This is not really about barriers, those are an implementation detail.

If a transform is annotated with @RequiresStableInput, no data will be processed by this transform until a complete checkpoint has been taken. After checkpoint completion, the elements will be processed. In case of any failures, the checkpoint will be restored and the elements will be processed again. This requires idempotent writes. KafkaIO's EOS mode does that by ignoring all elements which are already part of a commit.

-Max

On 11.03.19 15:15, Thomas Weise wrote:
So all records between 2 checkpoint barriers will be buffered and on checkpoint complete notification sent in a single transaction to Kafka?

The next question then is what happens if the Kafka transaction does not complete (and checkpoint complete callback fails)? Will the callback be repeated after Flink recovers?


On Mon, Mar 11, 2019 at 3:02 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

     > But there is still the possibility that we fail to flush the
    buffer after the checkpoint is complete (data loss)?

    Since we have already checkpointed the buffered data we can retry
    flushing it in case of failures. We may emit elements multiple times
    but
    that is because the Kafka EOS sink will skip records which are already
    part of a committed transaction.

    -Max

    On 06.03.19 19:28, Thomas Weise wrote:
     > A fair amount of work for true true exactly once output was done in
     > Apex. Different from almost exactly-once :)
     >
     > The takeaway was that the mechanism to achieve it depends on the
     > external system. The implementation looks different for let's say
    a file
     > sink or JDBC or Kafka.
     >
     > Apex had an exactly-once producer before Kafka supported
    transactions.
     > That producer relied on the ability to discover what was already
    written
     > to Kafka upon recovery from failure. Why?
     >
     > Runners are not distributed transaction coordinators and no
    matter how
     > we write the code, there is always the small possibility that one
    of two
     > resources fails to commit, resulting in either data loss or
    duplicates.
     > The Kafka EOS was a hybrid of producer and consumer, the consumer
    part
     > used during recovery to find out what was already produced
    previously.
     >
     > Flink and Apex have very similar checkpointing model, that's why
    this
     > thread caught my attention. Within the topology/runner,
    exactly-once is
     > achieved by replay having the same effect. For sinks, it needs to
    rely
     > on the capabilities of the respective system (like atomic rename for
     > file sink, or transaction with metadata table for JDBC).
     >
     > The buffering until checkpoint is complete is a mechanism to get
    away
     > from sink specific implementations. It comes with the latency
    penalty
     > (memory overhead could be solved with a write ahead log). But
    there is
     > still the possibility that we fail to flush the buffer after the
     > checkpoint is complete (data loss)?
     >
     > Thanks,
     > Thomas
     >
     >
     > On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles <k...@apache.org
    <mailto:k...@apache.org>
     > <mailto:k...@apache.org <mailto:k...@apache.org>>> wrote:
     >
     >     On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi
    <ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>> wrote:
     >
     >
     >
     >         On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax
    <re...@google.com <mailto:re...@google.com>
     >         <mailto:re...@google.com <mailto:re...@google.com>>> wrote:
     >
     >             RE: Kenn's suggestion. i think Raghu looked into
    something
     >             that, and something about it didn't work. I don't
    remember
     >             all the details, but I think there might have been some
     >             subtle problem with it that wasn't obvious. Doesn't mean
     >             that there isn't another way to solve that issue.'
     >
     >
     >         Two disadvantages:
     >         - A transaction in Kafka are tied to single producer
    instance.
     >         There is no official API to start a txn in one process and
     >         access it in another process. Flink's sink uses an
    internal REST
     >         API for this.
     >
     >
     >     Can you say more about how this works?
     >
     >         - There is one failure case that I mentioned earlier: if
    closing
     >         the transaction in downstream transform fails, it is data
    loss,
     >         there is no way to replay the upstream transform that
    wrote the
     >         records to Kafka.
     >
     >
     >     With coupling of unrelated failures due to fusion, this is a
    severe
     >     problem. I think I see now how 2PC affects this. From my
    reading, I
     >     can't see the difference in how Flink works. If the checkpoint
     >     finalization callback that does the Kafka commit fails, does it
     >     invalidate the checkpoint so the start transaction + write
    elements
     >     is retried?
     >
     >     Kenn
     >
     >
     >         GBKs don't have major scalability limitations in most runner.
     >         Extra GBK is fine in practice for such a sink (at least
    no one
     >         has complained about it yet, though I don't know real usage
     >         numbers in practice). Flink's implentation in Beam
     >         using @RequiresStableInput  does have storage
    requirements and
     >         latency costs that increase with checkpoint interval. I
    think is
     >         still just as useful. Good to see @RequiresStableInput
    support
     >         added to Flink runner in Max's PR.
     >
     >
     >             Hopefully we can make that work. Another possibility
    if we
     >             can't is to do something special for Flink. Beam allows
     >             runners to splice out well-known transforms with
    their own
     >             implementation. Dataflow already does that for Google
    Cloud
     >             Pub/Sub sources/sinks. The Flink runner could splice
    out the
     >             Kafka sink with one that uses Flink-specific
    functionality.
     >             Ideally this would reuse most of the existing Kafka code
     >             (maybe we could refactor just the EOS part into something
     >             that could be subbed out).
     >
     >             Reuven
     >
     >             On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels
     >             <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
     >
     >                  > It would be interesting to see if there's
    something
     >                 we could add to the Beam model that would create a
     >                 better story for Kafka's EOS writes.
     >
     >                 There would have to be a checkpoint-completed
    callback
     >                 the DoFn can
     >                 register with the Runner. Does not seem
    applicable for
     >                 most Runners though.
     >
     >                  > This is true, however isn't it already true
    for such
     >                 uses of Flink?
     >
     >                 Yes, that's correct. In the case of Kafka, Flink can
     >                 offload the
     >                 buffering but for the general case, idempotent writes
     >                 are only possible
     >                 if we buffer data until the checkpoint is completed.
     >
     >                 On 04.03.19 17:45, Reuven Lax wrote:
     >                  >
     >                  >
     >                  > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels
     >                 <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                  > <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
     >                  >
     >                  >      > Can we do 2? I seem to remember that we had
     >                 trouble in some cases
     >                  >     (e..g in the BigQuery case, there was no
    obvious
     >                 way to create a
     >                  >     deterministic id, which is why we went for a
     >                 random number followed
     >                  >     by a reshuffle). Also remember that the user
     >                 ParDo that is producing
     >                  >     data to the sink is not guaranteed to be
     >                 deterministic; the Beam
     >                  >     model allows for non-deterministic transforms.
     >                  >
     >                  >     I believe we could use something like the
    worker
     >                 id to make it
     >                  >     deterministic, though the worker id can change
     >                 after a restart. We
     >                  >     could
     >                  >     persist it in Flink's operator state. I do not
     >                 know if we can come up
     >                  >     with a Runner-independent solution.
     >                  >
     >                  >
     >                  > If we did this, we would break it on runners that
     >                 don't have a concept
     >                  > of a stable worker id :( The Dataflow runner
    can load
     >                 balance work at
     >                  > any time (including moving work around between
    workers).
     >                  >
     >                  >
     >                  >      > I'm not quite sure I understand. If a
    ParDo is
     >                 marked with
     >                  >     RequiresStableInput, can't the flink runner
     >                 buffer the input message
     >                  >     until after the checkpoint is complete and
    only
     >                 then deliver it to
     >                  >     the ParDo?
     >                  >
     >                  >     You're correct. I thought that it could
    suffice
     >                 to only buffer during a
     >                  >     checkpoint and otherwise rely on the
     >                 deterministic execution of the
     >                  >     pipeline and KafkaIO's de-duplication code.
     >                  >
     >                  >
     >                  > Yes, I want to distinguish the KafkaIO case
    from the
     >                 general case. It
     >                  > would be interesting to see if there's
    something we
     >                 could add to the
     >                  > Beam model that would create a better story for
     >                 Kafka's EOS writes.
     >                  >
     >                  >
     >                  >     In any case, emitting only after
    finalization of
     >                 checkpoints gives us
     >                  >     guaranteed stable input. It also means
    that the
     >                 processing is tight to
     >                  >     the checkpoint interval, the checkpoint
    duration,
     >                 and the available
     >                  >     memory.
     >                  >
     >                  >
     >                  > This is true, however isn't it already true
    for such
     >                 uses of Flink?
     >                  >
     >                  >
     >                  >     On 01.03.19 19:41, Reuven Lax wrote:
     >                  >      >
     >                  >      >
     >                  >      > On Fri, Mar 1, 2019 at 10:37 AM
    Maximilian Michels
     >                  >     <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >      > <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
     >                  >      >
     >                  >      >     Fully agree. I think we can improve the
     >                 situation
     >                  >     drastically. For
     >                  >      >     KafkaIO EOS with Flink we need to make
     >                 these two changes:
     >                  >      >
     >                  >      >     1) Introduce buffering while the
     >                 checkpoint is being taken
     >                  >      >     2) Replace the random shard id
    assignment
     >                 with something
     >                  >     deterministic
     >                  >      >
     >                  >      >
     >                  >      > Can we do 2? I seem to remember that we had
     >                 trouble in some cases
     >                  >     (e..g
     >                  >      > in the BigQuery case, there was no
    obvious way
     >                 to create a
     >                  >     deterministic
     >                  >      > id, which is why we went for a random
    number
     >                 followed by a
     >                  >     reshuffle).
     >                  >      > Also remember that the user ParDo that is
     >                 producing data to the
     >                  >     sink is
     >                  >      > not guaranteed to be deterministic; the
    Beam
     >                 model allows for
     >                  >      > non-deterministic transforms.
     >                  >      >
     >                  >      >
     >                  >      >     However, we won't be able to
    provide full
     >                 compatibility with
     >                  >      >     RequiresStableInput because Flink only
     >                 guarantees stable
     >                  >     input after a
     >                  >      >     checkpoint. RequiresStableInput
    requires
     >                 input at any point
     >                  >     in time to
     >                  >      >     be stable.
     >                  >      >
     >                  >      >
     >                  >      > I'm not quite sure I understand. If a
    ParDo is
     >                 marked with
     >                  >      > RequiresStableInput, can't the flink runner
     >                 buffer the input message
     >                  >      > until after the checkpoint is complete and
     >                 only then deliver it
     >                  >     to the
     >                  >      > ParDo? This adds latency of course, but I'm
     >                 not sure how else to do
     >                  >      > things correctly with the Beam model.
     >                  >      >
     >                  >      >     IMHO the only way to achieve that is
     >                 materializing output
     >                  >      >     which Flink does not currently support.
     >                  >      >
     >                  >      >     KafkaIO does not need all the power of
     >                 RequiresStableInput to
     >                  >     achieve
     >                  >      >     EOS with Flink, but for the general
    case I
     >                 don't see a good
     >                  >     solution at
     >                  >      >     the moment.
     >                  >      >
     >                  >      >     -Max
     >                  >      >
     >                  >      >     On 01.03.19 16:45, Reuven Lax wrote:
     >                  >      >      > Yeah, the person who was working
    on it
     >                 originally stopped
     >                  >     working on
     >                  >      >      > Beam, and nobody else ever
    finished it.
     >                 I think it is
     >                  >     important to
     >                  >      >      > finish though. Many of the existing
     >                 Sinks are only fully
     >                  >     correct for
     >                  >      >      > Dataflow today, because they
    generate
     >                 either Reshuffle or
     >                  >      >     GroupByKey to
     >                  >      >      > ensure input stability before
     >                 outputting (in many cases
     >                  >     this code
     >                  >      >     was
     >                  >      >      > inherited from before Beam
    existed). On
     >                 Flink today, these
     >                  >     sinks
     >                  >      >     might
     >                  >      >      > occasionally produce duplicate
    output
     >                 in the case of failures.
     >                  >      >      >
     >                  >      >      > Reuven
     >                  >      >      >
     >                  >      >      > On Fri, Mar 1, 2019 at 7:18 AM
     >                 Maximilian Michels
     >                  >     <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >                  >      >      > <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>> wrote:
     >                  >      >      >
     >                  >      >      >     Circling back to the
     >                 RequiresStableInput
     >                  >     annotation[1]. I've
     >                  >      >     done some
     >                  >      >      >     protoyping to see how this
    could be
     >                 integrated into
     >                  >     Flink. I'm
     >                  >      >      >     currently
     >                  >      >      >     writing a test based on
     >                 RequiresStableInput.
     >                  >      >      >
     >                  >      >      >     I found out there are already
     >                 checks in place at the
     >                  >     Runners to
     >                  >      >      >     throw in
     >                  >      >      >     case transforms use
     >                 RequiresStableInput and its not
     >                  >      >     supported. However,
     >                  >      >      >     not a single transform actually
     >                 uses the annotation.
     >                  >      >      >
     >                  >      >      >     It seems that the effort
    stopped at
     >                 some point? Would
     >                  >     it make
     >                  >      >     sense to
     >                  >      >      >     start annotating
     >                 KafkaExactlyOnceSink with
     >                  >      >     @RequiresStableInput? We
     >                  >      >      >     could then get rid of the
    whitelist.
     >                  >      >      >
     >                  >      >      >     -Max
     >                  >      >      >
     >                  >      >      >     [1]
     >                  >      >      >
     >                  >      >
     >                  >
     >
    
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >                  >      >      >
     >                  >      >      >
     >                  >      >      >
     >                  >      >      >     On 01.03.19 14:28, Maximilian
     >                 Michels wrote:
     >                  >      >      >      > Just realized that
    transactions
     >                 do not spawn multiple
     >                  >      >     elements in
     >                  >      >      >      > KafkaExactlyOnceSink. So the
     >                 proposed solution to stop
     >                  >      >     processing
     >                  >      >      >      > elements while a snapshot is
     >                 pending would work.
     >                  >      >      >      >
     >                  >      >      >      > It is certainly not
    optimal in
     >                 terms of performance for
     >                  >      >     Flink and
     >                  >      >      >     poses
     >                  >      >      >      > problems when checkpoints
    take
     >                 long to complete, but it
     >                  >      >     would be
     >                  >      >      >      > worthwhile to implement
    this to
     >                 make use of the EOS
     >                  >     feature.
     >                  >      >      >      >
     >                  >      >      >      > Thanks,
     >                  >      >      >      > Max
     >                  >      >      >      >
     >                  >      >      >      > On 01.03.19 12:23, Maximilian
     >                 Michels wrote:
     >                  >      >      >      >> Thanks you for the prompt
     >                 replies. It's great to
     >                  >     see that
     >                  >      >     there is
     >                  >      >      >      >> good understanding of
    how EOS
     >                 in Flink works.
     >                  >      >      >      >>
     >                  >      >      >      >>> This is exactly what
     >                 RequiresStableInput is
     >                  >     supposed to
     >                  >      >     do. On the
     >                  >      >      >      >>> Flink runner, this would be
     >                 implemented by delaying
     >                  >      >     processing
     >                  >      >      >     until
     >                  >      >      >      >>> the current checkpoint
    is done.
     >                  >      >      >      >>
     >                  >      >      >      >> I don't think that works
     >                 because we have no
     >                  >     control over
     >                  >      >     the Kafka
     >                  >      >      >      >> transactions. Imagine:
     >                  >      >      >      >>
     >                  >      >      >      >> 1) ExactlyOnceWriter writes
     >                 records to Kafka and
     >                  >     commits,
     >                  >      >     then
     >                  >      >      >     starts
     >                  >      >      >      >> a new transaction.
     >                  >      >      >      >> 2) Flink checkpoints,
    delaying
     >                 the processing of
     >                  >      >     elements, the
     >                  >      >      >      >> checkpoint fails.
     >                  >      >      >      >> 3) We restore from an old
     >                 checkpoint and will
     >                  >     start writing
     >                  >      >      >     duplicate
     >                  >      >      >      >> data to Kafka. The
     >                 de-duplication that the sink
     >                  >     performs
     >                  >      >     does not
     >                  >      >      >      >> help, especially because the
     >                 random shards ids
     >                  >     might be
     >                  >      >     assigned
     >                  >      >      >      >> differently.
     >                  >      >      >      >>
     >                  >      >      >      >> IMHO we have to have control
     >                 over commit to be able to
     >                  >      >     provide EOS.
     >                  >      >      >      >>
     >                  >      >      >      >>> When we discussed this
    in Aug
     >                 2017, the understanding
     >                  >      >     was that 2
     >                  >      >      >      >>> Phase commit utility in
    Flink
     >                 used to implement
     >                  >     Flink's
     >                  >      >     Kafka EOS
     >                  >      >      >      >>> could not be implemented in
     >                 Beam's context.
     >                  >      >      >      >>
     >                  >      >      >      >> That's also my
    understanding,
     >                 unless we change the
     >                  >     interface.
     >                  >      >      >      >>
     >                  >      >      >      >>> I don't see how SDF solves
     >                 this problem..
     >                  >      >      >      >>
     >                  >      >      >      >> SDF has a checkpoint method
     >                 which the Runner can call,
     >                  >      >     but I think
     >                  >      >      >      >> that you are right, that the
     >                 above problem would
     >                  >     be the same.
     >                  >      >      >      >>
     >                  >      >      >      >>> Absolutely. I would love to
     >                 support EOS in KakaIO for
     >                  >      >     Flink. I
     >                  >      >      >     think
     >                  >      >      >      >>> that will help many future
     >                 exactly-once sinks..
     >                  >     and address
     >                  >      >      >      >>> fundamental incompatibility
     >                 between Beam model
     >                  >     and Flink's
     >                  >      >      >     horizontal
     >                  >      >      >      >>> checkpointing for such
     >                 applications.
     >                  >      >      >      >>
     >                  >      >      >      >> Great :)
     >                  >      >      >      >>
     >                  >      >      >      >>> The FlinkRunner would
    need to
     >                 insert the "wait until
     >                  >      >     checkpoint
     >                  >      >      >      >>> finalization" logic
    wherever
     >                 it sees
     >                  >     @RequiresStableInput,
     >                  >      >      >     which is
     >                  >      >      >      >>> already what it would
    have to do.
     >                  >      >      >      >>
     >                  >      >      >      >> I don't think that fixes the
     >                 problem. See above
     >                  >     example.
     >                  >      >      >      >>
     >                  >      >      >      >> Thanks,
     >                  >      >      >      >> Max
     >                  >      >      >      >>
     >                  >      >      >      >> On 01.03.19 00:04, Raghu
    Angadi
     >                 wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>> On Thu, Feb 28, 2019 at
    2:42
     >                 PM Raghu Angadi
     >                  >      >     <ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >                  >      >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >                  >      >      >      >>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>     On Thu, Feb 28, 2019 at
     >                 2:34 PM Kenneth Knowles
     >                  >      >      >     <k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>
     >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
>                  >      >      >      >>> <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>         I'm not sure what a
     >                 hard fail is. I probably
     >                  >      >     have a shallow
     >                  >      >      >      >>>         understanding,
    but doesn't
     >                  >     @RequiresStableInput work
     >                  >      >      >     for 2PC?
     >                  >      >      >      >>>         The preCommit()
    phase
     >                 should establish the
     >                  >      >     transaction and
     >                  >      >      >      >>>         commit() is not
    called
     >                 until after checkpoint
     >                  >      >      >     finalization. Can
     >                  >      >      >      >>>         you describe
    the way
     >                 that it does not work a
     >                  >      >     little bit
     >                  >      >      >     more?
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>     - preCommit() is called
     >                 before checkpoint.
     >                  >     Kafka EOS in
     >                  >      >      >     Flink starts
     >                  >      >      >      >>>     the transaction before
     >                 this and makes sure it
     >                  >      >     flushes all
     >                  >      >      >     records in
     >                  >      >      >      >>>     preCommit(). So far
    good.
     >                  >      >      >      >>>     - commit is called
    after
     >                 checkpoint is persisted.
     >                  >      >     Now, imagine
     >                  >      >      >      >>>     commit() fails for some
     >                 reason. There is no
     >                  >     option
     >                  >      >     to rerun
     >                  >      >      >     the 1st
     >                  >      >      >      >>>     phase to write the
    records
     >                 again in a new
     >                  >      >     transaction. This
     >                  >      >      >     is a
     >                  >      >      >      >>>     hard failure for
    the the
     >                 job. In practice
     >                  >     Flink might
     >                  >      >      >     attempt to
     >                  >      >      >      >>>     commit again (not
    sure how
     >                 many times), which is
     >                  >      >     likely to
     >                  >      >      >     fail and
     >                  >      >      >      >>>     eventually results
    in job
     >                 failure.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>> In Apache Beam, the records
     >                 could be stored in state,
     >                  >      >     and can be
     >                  >      >      >      >>> written inside commit() to
     >                 work around this issue. It
     >                  >      >     could have
     >                  >      >      >      >>> scalability issues if
     >                 checkpoints are not frequent
     >                  >      >     enough in Flink
     >                  >      >      >      >>> runner.
     >                  >      >      >      >>>
     >                  >      >      >      >>> Raghu.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>         Kenn
     >                  >      >      >      >>>
     >                  >      >      >      >>>         On Thu, Feb 28,
    2019
     >                 at 1:25 PM Raghu Angadi
     >                  >      >      >     <ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >                  >      >      >      >>>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>             On Thu, Feb 28,
     >                 2019 at 11:01 AM
     >                  >     Kenneth Knowles
>                  >      >      >      >>> <k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
     >                  >      >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>
     >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >                 <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 I
    believe the
     >                 way you would implement
     >                  >      >     the logic
     >                  >      >      >     behind
     >                  >      >      >      >>>                 Flink's
     >                 KafkaProducer would be to
     >                  >     have
     >                  >      >     two steps:
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 1. Start
     >                 transaction
     >                  >      >      >      >>>
     >                 2. @RequiresStableInput Close
     >                  >     transaction
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>             I see.  What
     >                 happens if closing the
     >                  >     transaction
     >                  >      >      >     fails in
     >                  >      >      >      >>>             (2)?
    Flink's 2PC
     >                 requires that
     >                  >     commit() should
     >                  >      >      >     never hard
     >                  >      >      >      >>>             fail once
     >                 preCommit() succeeds. I
     >                  >     think that is
     >                  >      >      >     cost of not
     >                  >      >      >      >>>             having an extra
     >                 shuffle. It is
     >                  >     alright since
     >                  >      >     this
     >                  >      >      >     policy has
     >                  >      >      >      >>>             worked well for
     >                 Flink so far.
     >                  >      >      >      >>>
     >                  >      >      >      >>>             Overall, it
    will
     >                 be great to have
     >                  >      >     @RequiresStableInput
     >                  >      >      >      >>>             support in
    Flink
     >                 runner.
     >                  >      >      >      >>>
     >                  >      >      >      >>>             Raghu.
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 The
     >                 FlinkRunner would need to
     >                  >     insert the
     >                  >      >     "wait
     >                  >      >      >     until
     >                  >      >      >      >>>                 checkpoint
     >                 finalization" logic
     >                  >     wherever it
     >                  >      >      >      >>>
     >                 sees @RequiresStableInput, which is
     >                  >      >     already what it
     >                  >      >      >      >>>                 would
    have to do.
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 This
    matches
     >                 the KafkaProducer's
     >                  >     logic -
     >                  >      >     delay
     >                  >      >      >     closing
     >                  >      >      >      >>>                 the
     >                 transaction until checkpoint
     >                  >      >     finalization. This
     >                  >      >      >      >>>                 answers my
     >                 main question, which
     >                  >     is "is
     >                  >      >      >      >>>
     >                 @RequiresStableInput expressive
     >                  >     enough
     >                  >      >     to allow
>                  >      >      >      >>> Beam-on-Flink
     >                 to have exactly
     >                  >     once behavior
     >                  >      >      >     with the
     >                  >      >      >      >>>                 same
     >                 performance characteristics as
     >                  >      >     native Flink
     >                  >      >      >      >>>                 checkpoint
     >                 finalization?"
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 Kenn
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 [1]
     >                  > https://github.com/apache/beam/pull/7955
     >                  >      >      >      >>>
     >                  >      >      >      >>>                 On Thu, Feb
     >                 28, 2019 at 10:43 AM
     >                  >     Reuven Lax
     >                  >      >      >      >>>
     >                 <re...@google.com <mailto:re...@google.com>
    <mailto:re...@google.com <mailto:re...@google.com>>
     >                  >     <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>
     >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com <mailto:re...@google.com>>>>
     >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com <mailto:re...@google.com>>>
     >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>>>>>
     >                  >      >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com <mailto:re...@google.com>>>
     >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com <mailto:re...@google.com>>>>
     >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com <mailto:re...@google.com>>>
     >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>> <mailto:re...@google.com
    <mailto:re...@google.com>
     >                 <mailto:re...@google.com
    <mailto:re...@google.com>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>                     On Thu,
     >                 Feb 28, 2019 at 10:41 AM
     >                  >      >     Raghu Angadi
     >                  >      >      >      >>>
     >                 <ang...@gmail.com <mailto:ang...@gmail.com>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >                  >      >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Now why does the Flink
     >                  >      >     Runner not
     >                  >      >      >     support
     >                  >      >      >      >>>
     >                 KafkaIO EOS? Flink's
     >                  >     native
     >                  >      >      >      >>>
     >                 KafkaProducer supports
     >                  >      >     exactly-once. It
     >                  >      >      >      >>>
     >                 simply commits the
     >                  >     pending
     >                  >      >      >      >>>
     >                 transaction once it has
     >                  >      >     completed a
     >                  >      >      >      >>> checkpoint.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>                         On
     >                 Thu, Feb 28, 2019 at
     >                  >     9:59 AM
     >                  >      >     Maximilian
     >                  >      >      >      >>>
     >                 Michels <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >                  >      >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>
     >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >                 <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>>>
>                  >      >      >      >>> wrote:
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 Hi,
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 I
     >                 came across
     >                  >     KafkaIO's Runner
     >                  >      >      >     whitelist [1]
     >                  >      >      >      >>>
     >                 for enabling exactly-once
     >                  >      >      >      >>>
     >                 semantics (EOS). I
     >                  >     think it is
     >                  >      >      >     questionable
     >                  >      >      >      >>>
                                 to
     >                 exclude Runners from
     >                  >      >      >      >>>
     >                 inside a transform, but I
     >                  >      >     see that the
     >                  >      >      >      >>>
     >                 intention was to save
     >                  >     users from
     >                  >      >      >      >>>
     >                 surprises.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Now why does the Flink
     >                  >      >     Runner not
     >                  >      >      >     support
     >                  >      >      >      >>>
     >                 KafkaIO EOS? Flink's
     >                  >     native
     >                  >      >      >      >>>
     >                 KafkaProducer supports
     >                  >      >     exactly-once. It
     >                  >      >      >      >>>
     >                 simply commits the
     >                  >     pending
     >                  >      >      >      >>>
     >                 transaction once it has
     >                  >      >     completed a
     >                  >      >      >      >>> checkpoint.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
>                  >      >      >      >>> When
     >                 we discussed this in Aug
     >                  >      >     2017, the
     >                  >      >      >      >>>
     >                 understanding was that 2
     >                  >     Phase
     >                  >      >     commit
     >                  >      >      >     utility in
>                  >      >      >      >>> Flink
     >                 used to implement
     >                  >     Flink's
     >                  >      >     Kafka
     >                  >      >      >     EOS could
>                  >      >      >      >>> not be
     >                 implemented in Beam's
     >                  >      >     context.
     >                  >      >      >      >>>                         See
     >                 this message
     >                  >      >      >      >>>
     >                  >      >
     >                  >
>  <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in >                  >      >      >      >>> that
     >                 dev thread. Has anything
     >                  >      >     changed
     >                  >      >      >     in this
     >                  >      >      >      >>>
     >                 regard? The whole thread is
     >                  >      >     relevant to
     >                  >      >      >     this
>                  >      >      >      >>> topic
     >                 and worth going
     >                  >     through.
     >                  >      >      >      >>>
     >                  >      >      >      >>>                     I think
     >                 that TwoPhaseCommit
     >                  >     utility
     >                  >      >     class
     >                  >      >      >     wouldn't
>                  >      >      >      >>> work. The
     >                 Flink runner would
     >                  >      >     probably want to
>                  >      >      >      >>> directly use
     >                  >     notifySnapshotComplete
     >                  >      >     in order to
     >                  >      >      >      >>>
     >                 implement @RequiresStableInput.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 A
     >                 checkpoint is
     >                  >     realized by
     >                  >      >     sending
     >                  >      >      >     barriers
     >                  >      >      >      >>>
     >                 through all channels
     >                  >      >      >      >>>
     >                 starting from the
     >                  >     source until
     >                  >      >      >     reaching all
     >                  >      >      >      >>>
     >                 sinks. Every operator
     >                  >      >      >      >>>
     >                 persists its state
     >                  >     once it has
     >                  >      >      >     received a
     >                  >      >      >      >>>
     >                 barrier on all its input
     >                  >      >      >      >>>
     >                 channels, it then
     >                  >     forwards
     >                  >      >     it to the
     >                  >      >      >      >>>
     >                 downstream operators.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 The architecture of
     >                  >     Beam's
     >                  >      >      >      >>>
     >                 KafkaExactlyOnceSink
     >                  >     is as
     >                  >      >     follows[2]:
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Input ->
     >                  >     AssignRandomShardIds ->
     >                  >      >      >     GroupByKey
     >                  >      >      >      >>>
                                 ->
     >                 AssignSequenceIds ->
     >                  >      >      >      >>>
     >                 GroupByKey ->
     >                  >     ExactlyOnceWriter
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 As
     >                 I understood, Spark or
     >                  >      >     Dataflow
     >                  >      >      >     use the
     >                  >      >      >      >>>
     >                 GroupByKey stages to
     >                  >     persist
     >                  >      >      >      >>>
     >                 the input. That is not
     >                  >      >     required in
     >                  >      >      >     Flink to
     >                  >      >      >      >>>
                                 be
     >                 able to take a
     >                  >     consistent
     >                  >      >      >      >>>
     >                 snapshot of the pipeline.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Basically, for Flink we
     >                  >      >     don't need
     >                  >      >      >     any of
     >                  >      >      >      >>>
     >                 that magic that
     >                  >     KafkaIO does.
     >                  >      >      >      >>>
     >                 What we would need to
     >                  >      >     support EOS
     >                  >      >      >     is a way
     >                  >      >      >      >>>
                                 to
     >                 tell the
     >                  >     ExactlyOnceWriter
     >                  >      >      >      >>>
                                 (a
     >                 DoFn) to commit once a
     >                  >      >      >     checkpoint has
     >                  >      >      >      >>>
     >                 completed.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 I
     >                 know that the new
     >                  >     version
     >                  >      >     of SDF
     >                  >      >      >     supports
     >                  >      >      >      >>>
     >                 checkpointing which
     >                  >     should
     >                  >      >      >      >>>
     >                 solve this issue. But
     >                  >     there is
     >                  >      >      >     still a lot
     >                  >      >      >      >>>
                                 of
     >                 work to do to make
     >                  >     this
     >                  >      >      >      >>>
     >                 reality.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>                         I
     >                 don't see how SDF
     >                  >     solves this
     >                  >      >      >     problem.. May be
>                  >      >      >      >>> pseudo
     >                 code would make more
     >                  >      >     clear.  But if
>                  >      >      >      >>> helps,
     >                 that is great!
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 So
     >                 I think it would make
     >                  >      >     sense to think
     >                  >      >      >      >>>
     >                 about a way to make
     >                  >     KafkaIO's
     >                  >      >      >      >>>
     >                 EOS more accessible
     >                  >     to Runners
     >                  >      >      >     which support
     >                  >      >      >      >>>
                                 a
     >                 different way of
     >                  >      >      >      >>>
     >                 checkpointing.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Absolutely. I would love to
     >                  >      >     support EOS in
>                  >      >      >      >>> KakaIO
     >                 for Flink. I think
     >                  >     that will
     >                  >      >      >     help many
>                  >      >      >      >>> future
     >                 exactly-once
     >                  >     sinks.. and
     >                  >      >     address
     >                  >      >      >      >>>
     >                 fundamental
     >                  >     incompatibility between
     >                  >      >      >     Beam model
     >                  >      >      >      >>>                         and
     >                 Flink's horizontal
     >                  >     checkpointing
     >                  >      >      >     for such
     >                  >      >      >      >>>
     >                 applications.
     >                  >      >      >      >>>
>                  >      >      >      >>> Raghu.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 Cheers,
     >                  >      >      >      >>>
                                 Max
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                 PS: I found this
     >                  >     document about
     >                  >      >      >      >>>
     >                 RequiresStableInput
     >                  >     [3], but
     >                  >      >     IMHO
     >                  >      >      >      >>>
     >                 defining an
     >                  >     annotation only
     >                  >      >      >     manifests the
     >                  >      >      >      >>>
     >                 conceptual difference
     >                  >     between
     >                  >      >      >      >>>
     >                 the Runners.
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 [1]
     >                  >      >      >      >>>
     >                  >      >      >
     >                  >      >
     >                  >
     >
    
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
     >                  >      >      >
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 [2]
     >                  >      >      >      >>>
     >                  >      >      >
     >                  >      >
     >                  >
     >
    
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
     >                  >      >      >
     >                  >      >      >      >>>
     >                  >      >      >      >>>
                                 [3]
     >                  >      >      >      >>>
     >                  >      >      >
     >                  >      >
     >                  >
     >
    
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >                  >      >      >
     >                  >      >      >      >>>
     >                  >      >      >      >>>
     >                  >      >      >
     >                  >      >
     >                  >
     >

Reply via email to