We cannot reason about correct exactly-once behavior of a transform without 
understanding how state management and fault-tolerance in the runner work.

Generally, we require a transforms's writes to be idempotent for exactly-once semantics, even with @RequiresStableInput.

In the case of KafkaIO, we have transactions which means writes cannot be indempotent per se. That's why we drop already-committed records by recovering the current committed id from Kafka itself: https://github.com/apache/beam/blob/99d5d9138acbf9e5b87e7068183c5fd27448043e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L300

Beam's state interface is only used to persist the current record id. This is necessary to be able to replay the same ids upon restoring a failed job.

-Max

On 11.03.19 17:38, Thomas Weise wrote:
We cannot reason about correct exactly-once behavior of a transform without understanding how state management and fault-tolerance in the runner work.

Max pinged me this link to the Kafka EOS logic [1]. It uses a state variable to find out what was already written. That state variable would be part of a future Flink checkpoint. If after a failure we revert to the previous checkpoint, it won't help to discover/skip duplicates?

The general problem is that we are trying to rely on state in two different places to achieve EOS. This blog <https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/> [2] describes how Kafka streams can provide the exactly-once guarantee, by using only Kafka as transactional resource (and committing all changes in a single TX). Everything else would require a distributed transaction coordinator (expensive) or a retry with duplicate detection mechanism in the external system (like check if record/reference was already written to Kafka, JDBC etc. or for file system, check if the file that would result from atomic rename already exists).

Thomas

[1] https://github.com/apache/beam/blob/99d5d9138acbf9e5b87e7068183c5fd27448043e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L329 [2] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/

On Mon, Mar 11, 2019 at 7:54 AM Maximilian Michels <m...@apache.org <mailto:m...@apache.org>> wrote:

    This is not really about barriers, those are an implementation detail.

    If a transform is annotated with @RequiresStableInput, no data will be
    processed by this transform until a complete checkpoint has been taken.
    After checkpoint completion, the elements will be processed. In case of
    any failures, the checkpoint will be restored and the elements will be
    processed again. This requires idempotent writes. KafkaIO's EOS mode
    does that by ignoring all elements which are already part of a commit.

    -Max

    On 11.03.19 15:15, Thomas Weise wrote:
     > So all records between 2 checkpoint barriers will be buffered and on
     > checkpoint complete notification sent in a single transaction to
    Kafka?
     >
     > The next question then is what happens if the Kafka transaction
    does not
     > complete (and checkpoint complete callback fails)? Will the
    callback be
     > repeated after Flink recovers?
     >
     >
     > On Mon, Mar 11, 2019 at 3:02 AM Maximilian Michels
    <m...@apache.org <mailto:m...@apache.org>
     > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
     >
     >      > But there is still the possibility that we fail to flush the
     >     buffer after the checkpoint is complete (data loss)?
     >
     >     Since we have already checkpointed the buffered data we can retry
     >     flushing it in case of failures. We may emit elements
    multiple times
     >     but
     >     that is because the Kafka EOS sink will skip records which
    are already
     >     part of a committed transaction.
     >
     >     -Max
     >
     >     On 06.03.19 19:28, Thomas Weise wrote:
     >      > A fair amount of work for true true exactly once output
    was done in
     >      > Apex. Different from almost exactly-once :)
     >      >
     >      > The takeaway was that the mechanism to achieve it depends
    on the
     >      > external system. The implementation looks different for
    let's say
     >     a file
     >      > sink or JDBC or Kafka.
     >      >
     >      > Apex had an exactly-once producer before Kafka supported
     >     transactions.
     >      > That producer relied on the ability to discover what was
    already
     >     written
     >      > to Kafka upon recovery from failure. Why?
     >      >
     >      > Runners are not distributed transaction coordinators and no
     >     matter how
     >      > we write the code, there is always the small possibility
    that one
     >     of two
     >      > resources fails to commit, resulting in either data loss or
     >     duplicates.
     >      > The Kafka EOS was a hybrid of producer and consumer, the
    consumer
     >     part
     >      > used during recovery to find out what was already produced
     >     previously.
     >      >
     >      > Flink and Apex have very similar checkpointing model,
    that's why
     >     this
     >      > thread caught my attention. Within the topology/runner,
     >     exactly-once is
     >      > achieved by replay having the same effect. For sinks, it
    needs to
     >     rely
     >      > on the capabilities of the respective system (like atomic
    rename for
     >      > file sink, or transaction with metadata table for JDBC).
     >      >
     >      > The buffering until checkpoint is complete is a mechanism
    to get
     >     away
     >      > from sink specific implementations. It comes with the latency
     >     penalty
     >      > (memory overhead could be solved with a write ahead log). But
     >     there is
     >      > still the possibility that we fail to flush the buffer
    after the
     >      > checkpoint is complete (data loss)?
     >      >
     >      > Thanks,
     >      > Thomas
     >      >
     >      >
     >      > On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles
    <k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      > <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>> wrote:
     >      >
     >      >     On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi
     >     <ang...@gmail.com <mailto:ang...@gmail.com>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>> wrote:
     >      >
     >      >
     >      >
     >      >         On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax
     >     <re...@google.com <mailto:re...@google.com>
    <mailto:re...@google.com <mailto:re...@google.com>>
     >      >         <mailto:re...@google.com <mailto:re...@google.com>
    <mailto:re...@google.com <mailto:re...@google.com>>>> wrote:
     >      >
     >      >             RE: Kenn's suggestion. i think Raghu looked into
     >     something
     >      >             that, and something about it didn't work. I don't
     >     remember
     >      >             all the details, but I think there might have
    been some
     >      >             subtle problem with it that wasn't obvious.
    Doesn't mean
     >      >             that there isn't another way to solve that issue.'
     >      >
     >      >
     >      >         Two disadvantages:
     >      >         - A transaction in Kafka are tied to single producer
     >     instance.
     >      >         There is no official API to start a txn in one
    process and
     >      >         access it in another process. Flink's sink uses an
     >     internal REST
     >      >         API for this.
     >      >
     >      >
     >      >     Can you say more about how this works?
     >      >
     >      >         - There is one failure case that I mentioned
    earlier: if
     >     closing
     >      >         the transaction in downstream transform fails, it
    is data
     >     loss,
     >      >         there is no way to replay the upstream transform that
     >     wrote the
     >      >         records to Kafka.
     >      >
     >      >
     >      >     With coupling of unrelated failures due to fusion,
    this is a
     >     severe
     >      >     problem. I think I see now how 2PC affects this. From my
     >     reading, I
     >      >     can't see the difference in how Flink works. If the
    checkpoint
     >      >     finalization callback that does the Kafka commit
    fails, does it
     >      >     invalidate the checkpoint so the start transaction + write
     >     elements
     >      >     is retried?
     >      >
     >      >     Kenn
     >      >
     >      >
     >      >         GBKs don't have major scalability limitations in
    most runner.
     >      >         Extra GBK is fine in practice for such a sink (at
    least
     >     no one
     >      >         has complained about it yet, though I don't know
    real usage
     >      >         numbers in practice). Flink's implentation in Beam
     >      >         using @RequiresStableInput  does have storage
     >     requirements and
     >      >         latency costs that increase with checkpoint
    interval. I
     >     think is
     >      >         still just as useful. Good to see @RequiresStableInput
     >     support
     >      >         added to Flink runner in Max's PR.
     >      >
     >      >
     >      >             Hopefully we can make that work. Another
    possibility
     >     if we
     >      >             can't is to do something special for Flink.
    Beam allows
     >      >             runners to splice out well-known transforms with
     >     their own
     >      >             implementation. Dataflow already does that for
    Google
     >     Cloud
     >      >             Pub/Sub sources/sinks. The Flink runner could
    splice
     >     out the
     >      >             Kafka sink with one that uses Flink-specific
     >     functionality.
     >      >             Ideally this would reuse most of the existing
    Kafka code
     >      >             (maybe we could refactor just the EOS part
    into something
     >      >             that could be subbed out).
     >      >
     >      >             Reuven
     >      >
     >      >             On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels
     >      >             <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
     >      >
     >      >                  > It would be interesting to see if there's
     >     something
     >      >                 we could add to the Beam model that would
    create a
     >      >                 better story for Kafka's EOS writes.
     >      >
     >      >                 There would have to be a checkpoint-completed
     >     callback
     >      >                 the DoFn can
     >      >                 register with the Runner. Does not seem
     >     applicable for
     >      >                 most Runners though.
     >      >
     >      >                  > This is true, however isn't it already true
     >     for such
     >      >                 uses of Flink?
     >      >
     >      >                 Yes, that's correct. In the case of Kafka,
    Flink can
     >      >                 offload the
     >      >                 buffering but for the general case,
    idempotent writes
     >      >                 are only possible
     >      >                 if we buffer data until the checkpoint is
    completed.
     >      >
     >      >                 On 04.03.19 17:45, Reuven Lax wrote:
     >      >                  >
     >      >                  >
     >      >                  > On Mon, Mar 4, 2019 at 6:55 AM
    Maximilian Michels
     >      >                 <m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                  > <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
     >      >                  >
     >      >                  >      > Can we do 2? I seem to remember
    that we had
     >      >                 trouble in some cases
     >      >                  >     (e..g in the BigQuery case, there
    was no
     >     obvious
     >      >                 way to create a
     >      >                  >     deterministic id, which is why we
    went for a
     >      >                 random number followed
     >      >                  >     by a reshuffle). Also remember that
    the user
     >      >                 ParDo that is producing
     >      >                  >     data to the sink is not guaranteed
    to be
     >      >                 deterministic; the Beam
     >      >                  >     model allows for non-deterministic
    transforms.
     >      >                  >
     >      >                  >     I believe we could use something
    like the
     >     worker
     >      >                 id to make it
     >      >                  >     deterministic, though the worker id
    can change
     >      >                 after a restart. We
     >      >                  >     could
     >      >                  >     persist it in Flink's operator
    state. I do not
     >      >                 know if we can come up
     >      >                  >     with a Runner-independent solution.
     >      >                  >
     >      >                  >
     >      >                  > If we did this, we would break it on
    runners that
     >      >                 don't have a concept
     >      >                  > of a stable worker id :( The Dataflow
    runner
     >     can load
     >      >                 balance work at
     >      >                  > any time (including moving work around
    between
     >     workers).
     >      >                  >
     >      >                  >
     >      >                  >      > I'm not quite sure I understand.
    If a
     >     ParDo is
     >      >                 marked with
     >      >                  >     RequiresStableInput, can't the
    flink runner
     >      >                 buffer the input message
     >      >                  >     until after the checkpoint is
    complete and
     >     only
     >      >                 then deliver it to
     >      >                  >     the ParDo?
     >      >                  >
     >      >                  >     You're correct. I thought that it could
     >     suffice
     >      >                 to only buffer during a
     >      >                  >     checkpoint and otherwise rely on the
     >      >                 deterministic execution of the
     >      >                  >     pipeline and KafkaIO's
    de-duplication code.
     >      >                  >
     >      >                  >
     >      >                  > Yes, I want to distinguish the KafkaIO case
     >     from the
     >      >                 general case. It
     >      >                  > would be interesting to see if there's
     >     something we
     >      >                 could add to the
     >      >                  > Beam model that would create a better
    story for
     >      >                 Kafka's EOS writes.
     >      >                  >
     >      >                  >
     >      >                  >     In any case, emitting only after
     >     finalization of
     >      >                 checkpoints gives us
     >      >                  >     guaranteed stable input. It also means
     >     that the
     >      >                 processing is tight to
     >      >                  >     the checkpoint interval, the checkpoint
     >     duration,
     >      >                 and the available
     >      >                  >     memory.
     >      >                  >
     >      >                  >
     >      >                  > This is true, however isn't it already true
     >     for such
     >      >                 uses of Flink?
     >      >                  >
     >      >                  >
     >      >                  >     On 01.03.19 19:41, Reuven Lax wrote:
     >      >                  >      >
     >      >                  >      >
     >      >                  >      > On Fri, Mar 1, 2019 at 10:37 AM
     >     Maximilian Michels
     >      >                  >     <m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >      >                  >      > <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>> wrote:
     >      >                  >      >
     >      >                  >      >     Fully agree. I think we can
    improve the
     >      >                 situation
     >      >                  >     drastically. For
     >      >                  >      >     KafkaIO EOS with Flink we
    need to make
     >      >                 these two changes:
     >      >                  >      >
     >      >                  >      >     1) Introduce buffering while the
     >      >                 checkpoint is being taken
     >      >                  >      >     2) Replace the random shard id
     >     assignment
     >      >                 with something
     >      >                  >     deterministic
     >      >                  >      >
     >      >                  >      >
     >      >                  >      > Can we do 2? I seem to remember
    that we had
     >      >                 trouble in some cases
     >      >                  >     (e..g
     >      >                  >      > in the BigQuery case, there was no
     >     obvious way
     >      >                 to create a
     >      >                  >     deterministic
     >      >                  >      > id, which is why we went for a
    random
     >     number
     >      >                 followed by a
     >      >                  >     reshuffle).
     >      >                  >      > Also remember that the user
    ParDo that is
     >      >                 producing data to the
     >      >                  >     sink is
     >      >                  >      > not guaranteed to be
    deterministic; the
     >     Beam
     >      >                 model allows for
     >      >                  >      > non-deterministic transforms.
     >      >                  >      >
     >      >                  >      >
     >      >                  >      >     However, we won't be able to
     >     provide full
     >      >                 compatibility with
     >      >                  >      >     RequiresStableInput because
    Flink only
     >      >                 guarantees stable
     >      >                  >     input after a
     >      >                  >      >     checkpoint. RequiresStableInput
     >     requires
     >      >                 input at any point
     >      >                  >     in time to
     >      >                  >      >     be stable.
     >      >                  >      >
     >      >                  >      >
     >      >                  >      > I'm not quite sure I understand.
    If a
     >     ParDo is
     >      >                 marked with
     >      >                  >      > RequiresStableInput, can't the
    flink runner
     >      >                 buffer the input message
     >      >                  >      > until after the checkpoint is
    complete and
     >      >                 only then deliver it
     >      >                  >     to the
     >      >                  >      > ParDo? This adds latency of
    course, but I'm
     >      >                 not sure how else to do
     >      >                  >      > things correctly with the Beam
    model.
     >      >                  >      >
     >      >                  >      >     IMHO the only way to achieve
    that is
     >      >                 materializing output
     >      >                  >      >     which Flink does not
    currently support.
     >      >                  >      >
     >      >                  >      >     KafkaIO does not need all
    the power of
     >      >                 RequiresStableInput to
     >      >                  >     achieve
     >      >                  >      >     EOS with Flink, but for the
    general
     >     case I
     >      >                 don't see a good
     >      >                  >     solution at
     >      >                  >      >     the moment.
     >      >                  >      >
     >      >                  >      >     -Max
     >      >                  >      >
     >      >                  >      >     On 01.03.19 16:45, Reuven
    Lax wrote:
     >      >                  >      >      > Yeah, the person who was
    working
     >     on it
     >      >                 originally stopped
     >      >                  >     working on
     >      >                  >      >      > Beam, and nobody else ever
     >     finished it.
     >      >                 I think it is
     >      >                  >     important to
     >      >                  >      >      > finish though. Many of
    the existing
     >      >                 Sinks are only fully
     >      >                  >     correct for
     >      >                  >      >      > Dataflow today, because they
     >     generate
     >      >                 either Reshuffle or
     >      >                  >      >     GroupByKey to
     >      >                  >      >      > ensure input stability before
     >      >                 outputting (in many cases
     >      >                  >     this code
     >      >                  >      >     was
     >      >                  >      >      > inherited from before Beam
     >     existed). On
     >      >                 Flink today, these
     >      >                  >     sinks
     >      >                  >      >     might
     >      >                  >      >      > occasionally produce
    duplicate
     >     output
     >      >                 in the case of failures.
     >      >                  >      >      >
     >      >                  >      >      > Reuven
     >      >                  >      >      >
     >      >                  >      >      > On Fri, Mar 1, 2019 at
    7:18 AM
     >      >                 Maximilian Michels
     >      >                  >     <m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >      >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>>
     >      >                  >      >      > <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>
     >      >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>>> wrote:
     >      >                  >      >      >
     >      >                  >      >      >     Circling back to the
     >      >                 RequiresStableInput
     >      >                  >     annotation[1]. I've
     >      >                  >      >     done some
     >      >                  >      >      >     protoyping to see how
    this
     >     could be
     >      >                 integrated into
     >      >                  >     Flink. I'm
     >      >                  >      >      >     currently
     >      >                  >      >      >     writing a test based on
     >      >                 RequiresStableInput.
     >      >                  >      >      >
     >      >                  >      >      >     I found out there are
    already
     >      >                 checks in place at the
     >      >                  >     Runners to
     >      >                  >      >      >     throw in
     >      >                  >      >      >     case transforms use
     >      >                 RequiresStableInput and its not
     >      >                  >      >     supported. However,
     >      >                  >      >      >     not a single
    transform actually
     >      >                 uses the annotation.
     >      >                  >      >      >
     >      >                  >      >      >     It seems that the effort
     >     stopped at
     >      >                 some point? Would
     >      >                  >     it make
     >      >                  >      >     sense to
     >      >                  >      >      >     start annotating
     >      >                 KafkaExactlyOnceSink with
     >      >                  >      >     @RequiresStableInput? We
     >      >                  >      >      >     could then get rid of the
     >     whitelist.
     >      >                  >      >      >
     >      >                  >      >      >     -Max
     >      >                  >      >      >
     >      >                  >      >      >     [1]
     >      >                  >      >      >
     >      >                  >      >
     >      >                  >
     >      >
     >
    
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >      >                  >      >      >
     >      >                  >      >      >
     >      >                  >      >      >
     >      >                  >      >      >     On 01.03.19 14:28,
    Maximilian
     >      >                 Michels wrote:
     >      >                  >      >      >      > Just realized that
     >     transactions
     >      >                 do not spawn multiple
     >      >                  >      >     elements in
     >      >                  >      >      >      >
    KafkaExactlyOnceSink. So the
     >      >                 proposed solution to stop
     >      >                  >      >     processing
     >      >                  >      >      >      > elements while a
    snapshot is
     >      >                 pending would work.
     >      >                  >      >      >      >
     >      >                  >      >      >      > It is certainly not
     >     optimal in
     >      >                 terms of performance for
     >      >                  >      >     Flink and
     >      >                  >      >      >     poses
     >      >                  >      >      >      > problems when
    checkpoints
     >     take
     >      >                 long to complete, but it
     >      >                  >      >     would be
     >      >                  >      >      >      > worthwhile to
    implement
     >     this to
     >      >                 make use of the EOS
     >      >                  >     feature.
     >      >                  >      >      >      >
     >      >                  >      >      >      > Thanks,
     >      >                  >      >      >      > Max
     >      >                  >      >      >      >
     >      >                  >      >      >      > On 01.03.19 12:23,
    Maximilian
     >      >                 Michels wrote:
     >      >                  >      >      >      >> Thanks you for
    the prompt
     >      >                 replies. It's great to
     >      >                  >     see that
     >      >                  >      >     there is
     >      >                  >      >      >      >> good understanding of
     >     how EOS
     >      >                 in Flink works.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >>> This is exactly what
     >      >                 RequiresStableInput is
     >      >                  >     supposed to
     >      >                  >      >     do. On the
     >      >                  >      >      >      >>> Flink runner,
    this would be
     >      >                 implemented by delaying
     >      >                  >      >     processing
     >      >                  >      >      >     until
     >      >                  >      >      >      >>> the current
    checkpoint
     >     is done.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> I don't think
    that works
     >      >                 because we have no
     >      >                  >     control over
     >      >                  >      >     the Kafka
     >      >                  >      >      >      >> transactions.
    Imagine:
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> 1)
    ExactlyOnceWriter writes
     >      >                 records to Kafka and
     >      >                  >     commits,
     >      >                  >      >     then
     >      >                  >      >      >     starts
     >      >                  >      >      >      >> a new transaction.
     >      >                  >      >      >      >> 2) Flink checkpoints,
     >     delaying
     >      >                 the processing of
     >      >                  >      >     elements, the
     >      >                  >      >      >      >> checkpoint fails.
     >      >                  >      >      >      >> 3) We restore
    from an old
     >      >                 checkpoint and will
     >      >                  >     start writing
     >      >                  >      >      >     duplicate
     >      >                  >      >      >      >> data to Kafka. The
     >      >                 de-duplication that the sink
     >      >                  >     performs
     >      >                  >      >     does not
     >      >                  >      >      >      >> help, especially
    because the
     >      >                 random shards ids
     >      >                  >     might be
     >      >                  >      >     assigned
     >      >                  >      >      >      >> differently.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> IMHO we have to
    have control
     >      >                 over commit to be able to
     >      >                  >      >     provide EOS.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >>> When we
    discussed this
     >     in Aug
     >      >                 2017, the understanding
     >      >                  >      >     was that 2
     >      >                  >      >      >      >>> Phase commit
    utility in
     >     Flink
     >      >                 used to implement
     >      >                  >     Flink's
     >      >                  >      >     Kafka EOS
     >      >                  >      >      >      >>> could not be
    implemented in
     >      >                 Beam's context.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> That's also my
     >     understanding,
     >      >                 unless we change the
     >      >                  >     interface.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >>> I don't see how
    SDF solves
     >      >                 this problem..
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> SDF has a
    checkpoint method
     >      >                 which the Runner can call,
     >      >                  >      >     but I think
     >      >                  >      >      >      >> that you are
    right, that the
     >      >                 above problem would
     >      >                  >     be the same.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >>> Absolutely. I
    would love to
     >      >                 support EOS in KakaIO for
     >      >                  >      >     Flink. I
     >      >                  >      >      >     think
     >      >                  >      >      >      >>> that will help
    many future
     >      >                 exactly-once sinks..
     >      >                  >     and address
     >      >                  >      >      >      >>> fundamental
    incompatibility
     >      >                 between Beam model
     >      >                  >     and Flink's
     >      >                  >      >      >     horizontal
     >      >                  >      >      >      >>> checkpointing
    for such
     >      >                 applications.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> Great :)
     >      >                  >      >      >      >>
     >      >                  >      >      >      >>> The FlinkRunner
    would
     >     need to
     >      >                 insert the "wait until
     >      >                  >      >     checkpoint
     >      >                  >      >      >      >>> finalization" logic
     >     wherever
     >      >                 it sees
     >      >                  >     @RequiresStableInput,
     >      >                  >      >      >     which is
     >      >                  >      >      >      >>> already what it
    would
     >     have to do.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> I don't think
    that fixes the
     >      >                 problem. See above
     >      >                  >     example.
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> Thanks,
     >      >                  >      >      >      >> Max
     >      >                  >      >      >      >>
     >      >                  >      >      >      >> On 01.03.19
    00:04, Raghu
     >     Angadi
     >      >                 wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>> On Thu, Feb 28,
    2019 at
     >     2:42
     >      >                 PM Raghu Angadi
     >      >                  >      >     <ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
>      >                  >      >      >  <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>
     >      >                  >      >      >      >>>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>     On Thu, Feb
    28, 2019 at
     >      >                 2:34 PM Kenneth Knowles
     >      >                  >      >      >     <k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
     >      >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>
     >      >                  >      >      >      >>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
     >      >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>         I'm not
    sure what a
     >      >                 hard fail is. I probably
     >      >                  >      >     have a shallow
>      >                  >      >      >      >>> understanding,
     >     but doesn't
     >      >                  >     @RequiresStableInput work
     >      >                  >      >      >     for 2PC?
     >      >                  >      >      >      >>>         The
    preCommit()
     >     phase
     >      >                 should establish the
     >      >                  >      >     transaction and
     >      >                  >      >      >      >>>         commit()
    is not
     >     called
     >      >                 until after checkpoint
     >      >                  >      >      >     finalization. Can
     >      >                  >      >      >      >>>         you describe
     >     the way
     >      >                 that it does not work a
     >      >                  >      >     little bit
     >      >                  >      >      >     more?
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>     -
    preCommit() is called
     >      >                 before checkpoint.
     >      >                  >     Kafka EOS in
     >      >                  >      >      >     Flink starts
     >      >                  >      >      >      >>>     the
    transaction before
     >      >                 this and makes sure it
     >      >                  >      >     flushes all
     >      >                  >      >      >     records in
     >      >                  >      >      >      >>>     preCommit().
    So far
     >     good.
     >      >                  >      >      >      >>>     - commit is
    called
     >     after
     >      >                 checkpoint is persisted.
     >      >                  >      >     Now, imagine
     >      >                  >      >      >      >>>     commit()
    fails for some
     >      >                 reason. There is no
     >      >                  >     option
     >      >                  >      >     to rerun
     >      >                  >      >      >     the 1st
     >      >                  >      >      >      >>>     phase to
    write the
     >     records
     >      >                 again in a new
     >      >                  >      >     transaction. This
     >      >                  >      >      >     is a
     >      >                  >      >      >      >>>     hard failure for
     >     the the
     >      >                 job. In practice
     >      >                  >     Flink might
     >      >                  >      >      >     attempt to
     >      >                  >      >      >      >>>     commit again
    (not
     >     sure how
     >      >                 many times), which is
     >      >                  >      >     likely to
     >      >                  >      >      >     fail and
     >      >                  >      >      >      >>>     eventually
    results
     >     in job
     >      >                 failure.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>> In Apache Beam,
    the records
     >      >                 could be stored in state,
     >      >                  >      >     and can be
     >      >                  >      >      >      >>> written inside
    commit() to
     >      >                 work around this issue. It
     >      >                  >      >     could have
     >      >                  >      >      >      >>> scalability
    issues if
     >      >                 checkpoints are not frequent
     >      >                  >      >     enough in Flink
     >      >                  >      >      >      >>> runner.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>> Raghu.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>         Kenn
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>         On Thu,
    Feb 28,
     >     2019
     >      >                 at 1:25 PM Raghu Angadi
     >      >                  >      >      >     <ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>
     >      >                  >      >      >      >>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>             On
    Thu, Feb 28,
     >      >                 2019 at 11:01 AM
     >      >                  >     Kenneth Knowles
     >      >                  >      >      >      >>>
     >     <k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
     >      >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>
>      >                  >      >      >  <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>
     >      >                  >      >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>>>
     >      >                  >     <mailto:k...@apache.org
    <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>
    <mailto:k...@apache.org <mailto:k...@apache.org>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
     >      >                 <mailto:k...@apache.org
    <mailto:k...@apache.org> <mailto:k...@apache.org
    <mailto:k...@apache.org>>
     >     <mailto:k...@apache.org <mailto:k...@apache.org>
    <mailto:k...@apache.org <mailto:k...@apache.org>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>                 I
     >     believe the
     >      >                 way you would implement
     >      >                  >      >     the logic
     >      >                  >      >      >     behind
>      >                  >      >      >      >>> Flink's
     >      >                 KafkaProducer would be to
     >      >                  >     have
     >      >                  >      >     two steps:
     >      >                  >      >      >      >>>
>      >                  >      >      >      >>> 1. Start
     >      >                 transaction
     >      >                  >      >      >      >>>
     >      >                 2. @RequiresStableInput Close
     >      >                  >     transaction
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>             I
    see.  What
     >      >                 happens if closing the
     >      >                  >     transaction
     >      >                  >      >      >     fails in
     >      >                  >      >      >      >>>             (2)?
     >     Flink's 2PC
     >      >                 requires that
     >      >                  >     commit() should
     >      >                  >      >      >     never hard
     >      >                  >      >      >      >>>             fail
    once
     >      >                 preCommit() succeeds. I
     >      >                  >     think that is
     >      >                  >      >      >     cost of not
>      >                  >      >      >      >>> having an extra
     >      >                 shuffle. It is
     >      >                  >     alright since
     >      >                  >      >     this
     >      >                  >      >      >     policy has
>      >                  >      >      >      >>> worked well for
     >      >                 Flink so far.
     >      >                  >      >      >      >>>
>      >                  >      >      >      >>> Overall, it
     >     will
     >      >                 be great to have
     >      >                  >      >     @RequiresStableInput
>      >                  >      >      >      >>> support in
     >     Flink
     >      >                 runner.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>             Raghu.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>                 The
     >      >                 FlinkRunner would need to
     >      >                  >     insert the
     >      >                  >      >     "wait
     >      >                  >      >      >     until
>      >                  >      >      >      >>> checkpoint
     >      >                 finalization" logic
     >      >                  >     wherever it
     >      >                  >      >      >      >>>
     >      >                 sees @RequiresStableInput, which is
     >      >                  >      >     already what it
>      >                  >      >      >      >>> would
     >     have to do.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>                 This
     >     matches
     >      >                 the KafkaProducer's
     >      >                  >     logic -
     >      >                  >      >     delay
     >      >                  >      >      >     closing
     >      >                  >      >      >      >>>                 the
     >      >                 transaction until checkpoint
     >      >                  >      >     finalization. This
>      >                  >      >      >      >>> answers my
     >      >                 main question, which
     >      >                  >     is "is
     >      >                  >      >      >      >>>
     >      >                 @RequiresStableInput expressive
     >      >                  >     enough
     >      >                  >      >     to allow
     >      >                  >      >      >      >>>
     >     Beam-on-Flink
     >      >                 to have exactly
     >      >                  >     once behavior
     >      >                  >      >      >     with the
     >      >                  >      >      >      >>>                 same
     >      >                 performance characteristics as
     >      >                  >      >     native Flink
>      >                  >      >      >      >>> checkpoint
     >      >                 finalization?"
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>                 Kenn
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>                 [1]
     >      >                  > https://github.com/apache/beam/pull/7955
     >      >                  >      >      >      >>>
>      >                  >      >      >      >>> On Thu, Feb
     >      >                 28, 2019 at 10:43 AM
     >      >                  >     Reuven Lax
     >      >                  >      >      >      >>>
     >      >                 <re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>
     >     <mailto:re...@google.com <mailto:re...@google.com>
    <mailto:re...@google.com <mailto:re...@google.com>>>
     >      >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>>
     >      >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>>>
     >      >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>>
     >      >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>>>>
>      >                  >      >      >  <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>>
     >      >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>>>
     >      >                  >      >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com> <mailto:re...@google.com
    <mailto:re...@google.com>>>>
     >      >                  >     <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>
    <mailto:re...@google.com <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>
     >      >                 <mailto:re...@google.com
    <mailto:re...@google.com>
     >     <mailto:re...@google.com <mailto:re...@google.com>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
                         On Thu,
     >      >                 Feb 28, 2019 at 10:41 AM
     >      >                  >      >     Raghu Angadi
     >      >                  >      >      >      >>>
     >      >                 <ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>
>      >                  >      >      >  <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>>
     >      >                  >      >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>>>>
     >      >                  >     <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
    <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
     >      >                 <mailto:ang...@gmail.com
    <mailto:ang...@gmail.com>
     >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>>>> wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Now why does the Flink
     >      >                  >      >     Runner not
     >      >                  >      >      >     support
     >      >                  >      >      >      >>>
     >      >                 KafkaIO EOS? Flink's
     >      >                  >     native
     >      >                  >      >      >      >>>
     >      >                 KafkaProducer supports
     >      >                  >      >     exactly-once. It
     >      >                  >      >      >      >>>
     >      >                 simply commits the
     >      >                  >     pending
     >      >                  >      >      >      >>>
     >      >                 transaction once it has
     >      >                  >      >     completed a
     >      >                  >      >      >      >>> checkpoint.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
                             On
     >      >                 Thu, Feb 28, 2019 at
     >      >                  >     9:59 AM
     >      >                  >      >     Maximilian
     >      >                  >      >      >      >>>
     >      >                 Michels <m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>
     >      >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>>
>      >                  >      >      >  <mailto:m...@apache.org <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>
     >      >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>>
     >      >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>
     >      >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>
     >      >                  >      >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org
    <mailto:m...@apache.org>>>>
     >      >                  >     <mailto:m...@apache.org
    <mailto:m...@apache.org>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>>
    <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>
     >      >                 <mailto:m...@apache.org
    <mailto:m...@apache.org> <mailto:m...@apache.org <mailto:m...@apache.org>>
     >     <mailto:m...@apache.org <mailto:m...@apache.org>
    <mailto:m...@apache.org <mailto:m...@apache.org>>>>>>>>
     >      >                  >      >      >      >>>
     >     wrote:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  Hi,
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  I
     >      >                 came across
     >      >                  >     KafkaIO's Runner
     >      >                  >      >      >     whitelist [1]
     >      >                  >      >      >      >>>
     >      >                 for enabling exactly-once
     >      >                  >      >      >      >>>
     >      >                 semantics (EOS). I
     >      >                  >     think it is
     >      >                  >      >      >     questionable
     >      >                  >      >      >      >>>
     >                                  to
     >      >                 exclude Runners from
     >      >                  >      >      >      >>>
     >      >                 inside a transform, but I
     >      >                  >      >     see that the
     >      >                  >      >      >      >>>
     >      >                 intention was to save
     >      >                  >     users from
     >      >                  >      >      >      >>>
     >      >                 surprises.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Now why does the Flink
     >      >                  >      >     Runner not
     >      >                  >      >      >     support
     >      >                  >      >      >      >>>
     >      >                 KafkaIO EOS? Flink's
     >      >                  >     native
     >      >                  >      >      >      >>>
     >      >                 KafkaProducer supports
     >      >                  >      >     exactly-once. It
     >      >                  >      >      >      >>>
     >      >                 simply commits the
     >      >                  >     pending
     >      >                  >      >      >      >>>
     >      >                 transaction once it has
     >      >                  >      >     completed a
     >      >                  >      >      >      >>> checkpoint.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >     When
     >      >                 we discussed this in Aug
     >      >                  >      >     2017, the
     >      >                  >      >      >      >>>
     >      >                 understanding was that 2
     >      >                  >     Phase
     >      >                  >      >     commit
     >      >                  >      >      >     utility in
     >      >                  >      >      >      >>>
     >     Flink
     >      >                 used to implement
     >      >                  >     Flink's
     >      >                  >      >     Kafka
     >      >                  >      >      >     EOS could
     >      >                  >      >      >      >>>
     >     not be
     >      >                 implemented in Beam's
     >      >                  >      >     context.
     >      >                  >      >      >      >>>
                             See
     >      >                 this message
     >      >                  >      >      >      >>>
     >      >                  >      >
     >      >                  >
     >      >
>  <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
     >      >                  >      >      >      >>>
     >     that
     >      >                 dev thread. Has anything
     >      >                  >      >     changed
     >      >                  >      >      >     in this
     >      >                  >      >      >      >>>
     >      >                 regard? The whole thread is
     >      >                  >      >     relevant to
     >      >                  >      >      >     this
     >      >                  >      >      >      >>>
     >     topic
     >      >                 and worth going
     >      >                  >     through.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
                         I think
     >      >                 that TwoPhaseCommit
     >      >                  >     utility
     >      >                  >      >     class
     >      >                  >      >      >     wouldn't
     >      >                  >      >      >      >>>
     >     work. The
     >      >                 Flink runner would
     >      >                  >      >     probably want to
     >      >                  >      >      >      >>>
     >     directly use
     >      >                  >     notifySnapshotComplete
     >      >                  >      >     in order to
     >      >                  >      >      >      >>>
     >      >                 implement @RequiresStableInput.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  A
     >      >                 checkpoint is
     >      >                  >     realized by
     >      >                  >      >     sending
     >      >                  >      >      >     barriers
     >      >                  >      >      >      >>>
     >      >                 through all channels
     >      >                  >      >      >      >>>
     >      >                 starting from the
     >      >                  >     source until
     >      >                  >      >      >     reaching all
     >      >                  >      >      >      >>>
     >      >                 sinks. Every operator
     >      >                  >      >      >      >>>
     >      >                 persists its state
     >      >                  >     once it has
     >      >                  >      >      >     received a
     >      >                  >      >      >      >>>
     >      >                 barrier on all its input
     >      >                  >      >      >      >>>
     >      >                 channels, it then
     >      >                  >     forwards
     >      >                  >      >     it to the
     >      >                  >      >      >      >>>
     >      >                 downstream operators.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 The architecture of
     >      >                  >     Beam's
     >      >                  >      >      >      >>>
     >      >                 KafkaExactlyOnceSink
     >      >                  >     is as
     >      >                  >      >     follows[2]:
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Input ->
     >      >                  >     AssignRandomShardIds ->
     >      >                  >      >      >     GroupByKey
     >      >                  >      >      >      >>>
     >                                  ->
     >      >                 AssignSequenceIds ->
     >      >                  >      >      >      >>>
     >      >                 GroupByKey ->
     >      >                  >     ExactlyOnceWriter
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  As
     >      >                 I understood, Spark or
     >      >                  >      >     Dataflow
     >      >                  >      >      >     use the
     >      >                  >      >      >      >>>
     >      >                 GroupByKey stages to
     >      >                  >     persist
     >      >                  >      >      >      >>>
     >      >                 the input. That is not
     >      >                  >      >     required in
     >      >                  >      >      >     Flink to
     >      >                  >      >      >      >>>
     >                                  be
     >      >                 able to take a
     >      >                  >     consistent
     >      >                  >      >      >      >>>
     >      >                 snapshot of the pipeline.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Basically, for Flink we
     >      >                  >      >     don't need
     >      >                  >      >      >     any of
     >      >                  >      >      >      >>>
     >      >                 that magic that
     >      >                  >     KafkaIO does.
     >      >                  >      >      >      >>>
     >      >                 What we would need to
     >      >                  >      >     support EOS
     >      >                  >      >      >     is a way
     >      >                  >      >      >      >>>
     >                                  to
     >      >                 tell the
     >      >                  >     ExactlyOnceWriter
     >      >                  >      >      >      >>>
     >                                  (a
     >      >                 DoFn) to commit once a
     >      >                  >      >      >     checkpoint has
     >      >                  >      >      >      >>>
     >      >                 completed.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  I
     >      >                 know that the new
     >      >                  >     version
     >      >                  >      >     of SDF
     >      >                  >      >      >     supports
     >      >                  >      >      >      >>>
     >      >                 checkpointing which
     >      >                  >     should
     >      >                  >      >      >      >>>
     >      >                 solve this issue. But
     >      >                  >     there is
     >      >                  >      >      >     still a lot
     >      >                  >      >      >      >>>
     >                                  of
     >      >                 work to do to make
     >      >                  >     this
     >      >                  >      >      >      >>>
     >      >                 reality.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
                             I
     >      >                 don't see how SDF
     >      >                  >     solves this
     >      >                  >      >      >     problem.. May be
     >      >                  >      >      >      >>>
     >     pseudo
     >      >                 code would make more
     >      >                  >      >     clear.  But if
     >      >                  >      >      >      >>>
     >     helps,
     >      >                 that is great!
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  So
     >      >                 I think it would make
     >      >                  >      >     sense to think
     >      >                  >      >      >      >>>
     >      >                 about a way to make
     >      >                  >     KafkaIO's
     >      >                  >      >      >      >>>
     >      >                 EOS more accessible
     >      >                  >     to Runners
     >      >                  >      >      >     which support
     >      >                  >      >      >      >>>
     >                                  a
     >      >                 different way of
     >      >                  >      >      >      >>>
     >      >                 checkpointing.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Absolutely. I would love to
     >      >                  >      >     support EOS in
     >      >                  >      >      >      >>>
     >     KakaIO
     >      >                 for Flink. I think
     >      >                  >     that will
     >      >                  >      >      >     help many
     >      >                  >      >      >      >>>
     >     future
     >      >                 exactly-once
     >      >                  >     sinks.. and
     >      >                  >      >     address
     >      >                  >      >      >      >>>
     >      >                 fundamental
     >      >                  >     incompatibility between
     >      >                  >      >      >     Beam model
     >      >                  >      >      >      >>>
                             and
     >      >                 Flink's horizontal
     >      >                  >     checkpointing
     >      >                  >      >      >     for such
     >      >                  >      >      >      >>>
     >      >                 applications.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >     Raghu.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 Cheers,
     >      >                  >      >      >      >>>
     >                                  Max
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                 PS: I found this
     >      >                  >     document about
     >      >                  >      >      >      >>>
     >      >                 RequiresStableInput
     >      >                  >     [3], but
     >      >                  >      >     IMHO
     >      >                  >      >      >      >>>
     >      >                 defining an
     >      >                  >     annotation only
     >      >                  >      >      >     manifests the
     >      >                  >      >      >      >>>
     >      >                 conceptual difference
     >      >                  >     between
     >      >                  >      >      >      >>>
     >      >                 the Runners.
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  [1]
     >      >                  >      >      >      >>>
     >      >                  >      >      >
     >      >                  >      >
     >      >                  >
     >      >
     >
    
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
     >      >                  >      >      >
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  [2]
     >      >                  >      >      >      >>>
     >      >                  >      >      >
     >      >                  >      >
     >      >                  >
     >      >
     >
    
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
     >      >                  >      >      >
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >                                  [3]
     >      >                  >      >      >      >>>
     >      >                  >      >      >
     >      >                  >      >
     >      >                  >
     >      >
     >
    
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >      >                  >      >      >
     >      >                  >      >      >      >>>
     >      >                  >      >      >      >>>
     >      >                  >      >      >
     >      >                  >      >
     >      >                  >
     >      >
     >

Reply via email to