We cannot reason about correct exactly-once behavior of a transform
without understanding how state management and fault tolerance work in
the runner.

Max pinged me this link to the Kafka EOS logic [1]. It uses a state
variable to find out what was already written. That state variable would
be part of a future Flink checkpoint. If, after a failure, we revert to
the previous checkpoint, won't that state fail to help us discover/skip
duplicates?

The general problem is that we are trying to rely on state in two
different places to achieve EOS. This blog post [2] describes how Kafka
Streams can provide the exactly-once guarantee by using only Kafka as
the transactional resource (and committing all changes in a single TX).
Everything else would require either a distributed transaction
coordinator (expensive) or a retry-with-duplicate-detection mechanism in
the external system (e.g. check whether the record/reference was already
written to Kafka, JDBC, etc., or, for a file system, check whether the
file that would result from the atomic rename already exists).
Thomas
[1]
https://github.com/apache/beam/blob/99d5d9138acbf9e5b87e7068183c5fd27448043e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L329
[2]
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
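A minimal sketch of the retry-with-duplicate-detection idea for the
file-system case (the helper and its names are made up for illustration,
not Beam code): the writer stages output under a temporary name, and an
already-existing final file is taken as proof that an earlier attempt
already committed.

    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    // Hypothetical sketch: commit a staged file via atomic rename; a destination
    // that already exists means a previous (pre-failure) attempt already committed.
    final class AtomicRenameCommitter {
      /** Returns true if this call performed the commit, false if it was already done. */
      static boolean commit(Path staged, Path finalPath) throws IOException {
        if (Files.exists(finalPath)) {
          Files.deleteIfExists(staged); // duplicate detected, drop the staged copy
          return false;
        }
        try {
          Files.move(staged, finalPath, StandardCopyOption.ATOMIC_MOVE);
          return true;
        } catch (FileAlreadyExistsException e) {
          Files.deleteIfExists(staged); // lost a race against a concurrent retry
          return false;
        }
      }
    }

The Kafka or JDBC variants would replace the exists() check with a lookup
in the external system itself (e.g. the last committed record/sequence
number), which is what the state-in-two-places concern above is about.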
On Mon, Mar 11, 2019 at 7:54 AM Maximilian Michels <m...@apache.org> wrote:
This is not really about barriers, those are an implementation detail.
If a transform is annotated with @RequiresStableInput, no data will be
processed by this transform until a complete checkpoint has been taken.
After checkpoint completion, the elements will be processed. In case of
any failures, the checkpoint will be restored and the elements will be
processed again. This requires idempotent writes. KafkaIO's EOS mode
does that by ignoring all elements which are already part of a commit.
-Max
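A minimal sketch of that buffering behaviour (names are illustrative, not
the actual Flink runner code): elements are parked, and only handed to the
@RequiresStableInput transform once the runner reports that the checkpoint
containing them has completed.

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.function.Consumer;

    // Illustrative only. Assumes `pending` is included in the checkpointed state,
    // so a restore replays the same elements (hence the need for idempotent writes).
    final class StableInputBuffer<T> {
      private final Queue<T> pending = new ArrayDeque<>();
      private final Consumer<T> stableInputTransform; // the @RequiresStableInput DoFn

      StableInputBuffer(Consumer<T> stableInputTransform) {
        this.stableInputTransform = stableInputTransform;
      }

      void onElement(T element) {
        pending.add(element); // buffered and checkpointed, not processed yet
      }

      // Called by the runner once the checkpoint covering `pending` has completed.
      void onCheckpointComplete() {
        T element;
        while ((element = pending.poll()) != null) {
          stableInputTransform.accept(element); // may happen again after a restore
        }
      }
    }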
On 11.03.19 15:15, Thomas Weise wrote:
> So all records between 2 checkpoint barriers will be buffered and on checkpoint complete notification sent in a single transaction to Kafka?
>
> The next question then is what happens if the Kafka transaction does not complete (and the checkpoint complete callback fails)? Will the callback be repeated after Flink recovers?
>
>
> On Mon, Mar 11, 2019 at 3:02 AM Maximilian Michels <m...@apache.org> wrote:
>
> > But there is still the possibility that we fail to flush the buffer after the checkpoint is complete (data loss)?
>
> Since we have already checkpointed the buffered data, we can retry flushing it in case of failures. We may emit elements multiple times, but that is fine because the Kafka EOS sink will skip records which are already part of a committed transaction.
>
> -Max
>
> On 06.03.19 19:28, Thomas Weise wrote:
> > A fair amount of work for true exactly-once output was done in Apex. Different from almost-exactly-once :)
> >
> > The takeaway was that the mechanism to achieve it depends on the external system. The implementation looks different for, let's say, a file sink or JDBC or Kafka.
> >
> > Apex had an exactly-once producer before Kafka supported transactions. That producer relied on the ability to discover what was already written to Kafka upon recovery from failure. Why?
> >
> > Runners are not distributed transaction coordinators, and no matter how we write the code, there is always the small possibility that one of two resources fails to commit, resulting in either data loss or duplicates. The Kafka EOS was a hybrid of producer and consumer, the consumer part used during recovery to find out what was already produced previously.
> >
> > Flink and Apex have very similar checkpointing models; that's why this thread caught my attention. Within the topology/runner, exactly-once is achieved by replay having the same effect. For sinks, it needs to rely on the capabilities of the respective system (like atomic rename for a file sink, or a transaction with a metadata table for JDBC).
> >
> > The buffering until the checkpoint is complete is a mechanism to get away from sink-specific implementations. It comes with a latency penalty (the memory overhead could be solved with a write-ahead log). But there is still the possibility that we fail to flush the buffer after the checkpoint is complete (data loss)?
> >
> > Thanks,
> > Thomas
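A rough sketch of that producer/consumer hybrid (illustrative, not the
Apex code; it assumes record keys identify records uniquely enough): on
recovery, a consumer reads back the tail of the output partition so the
replayed writer can skip what was already produced.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative recovery step: collect keys of recently written records so that
    // replayed elements which were already produced before the failure are skipped.
    final class RecoveryReadback {
      static Set<String> recentlyWrittenKeys(KafkaConsumer<String, String> consumer,
                                             TopicPartition partition,
                                             long lookBack) {
        consumer.assign(Collections.singletonList(partition));
        long end = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
        consumer.seek(partition, Math.max(0, end - lookBack));

        Set<String> keys = new HashSet<>();
        while (consumer.position(partition) < end) {
          for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            keys.add(record.key());
          }
        }
        return keys; // the writer drops replayed elements whose key is in this set
      }
    }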
> >
> >
> > On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:
> >
> >
> >
> > On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:
> >
> > RE: Kenn's suggestion. I think Raghu looked into something like that, and something about it didn't work. I don't remember all the details, but I think there might have been some subtle problem with it that wasn't obvious. Doesn't mean that there isn't another way to solve that issue.
> >
> >
> > Two disadvantages:
> > - A transaction in Kafka is tied to a single producer instance. There is no official API to start a txn in one process and access it in another process. Flink's sink uses an internal REST API for this.
> >
> >
> > Can you say more about how this works?
> >
> > - There is one failure case that I mentioned earlier: if closing the transaction in the downstream transform fails, it is data loss; there is no way to replay the upstream transform that wrote the records to Kafka.
> >
> >
> > With coupling of unrelated failures due to fusion, this is a severe problem. I think I see now how 2PC affects this. From my reading, I can't see the difference in how Flink works. If the checkpoint finalization callback that does the Kafka commit fails, does it invalidate the checkpoint so the start transaction + write elements is retried?
> >
> > Kenn
> >
> >
> > GBKs don't have major scalability limitations in most runners. An extra GBK is fine in practice for such a sink (at least no one has complained about it yet, though I don't know real usage numbers in practice). Flink's implementation in Beam using @RequiresStableInput does have storage requirements and latency costs that increase with the checkpoint interval. I think it is still just as useful. Good to see @RequiresStableInput support added to the Flink runner in Max's PR.
> >
> >
> > Hopefully we can make that work. Another possibility if we can't is to do something special for Flink. Beam allows runners to splice out well-known transforms with their own implementation. Dataflow already does that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the Kafka sink with one that uses Flink-specific functionality. Ideally this would reuse most of the existing Kafka code (maybe we could refactor just the EOS part into something that could be subbed out).
> >
> > Reuven
> >
> > On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > > It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
> >
> > There would have to be a checkpoint-completed callback the DoFn can register with the Runner. Does not seem applicable for most Runners though.
> >
> > > This is true, however isn't it already true for such uses of Flink?
> >
> > Yes, that's correct. In the case of Kafka, Flink can offload the buffering, but for the general case, idempotent writes are only possible if we buffer data until the checkpoint is completed.
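A minimal sketch of what such a callback registration could look like
from the DoFn side; all of these interfaces are hypothetical, none of
them exist in Beam today.

    // Hypothetical API sketch only; Beam has no such interfaces today.
    interface CheckpointCompletedListener {
      void onCheckpointCompleted(long checkpointId);
    }

    interface RunnerCheckpointRegistry { // hypothetical runner-side hook
      void register(CheckpointCompletedListener listener);
    }

    // A sink DoFn could register itself and defer the external commit until the
    // runner reports that the checkpoint covering the written elements is done.
    abstract class CommitOnCheckpointFn implements CheckpointCompletedListener {
      void setup(RunnerCheckpointRegistry registry) {
        registry.register(this);
      }

      @Override
      public void onCheckpointCompleted(long checkpointId) {
        commitPendingTransaction(checkpointId); // e.g. KafkaProducer.commitTransaction()
      }

      abstract void commitPendingTransaction(long checkpointId);
    }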
> >
> > On 04.03.19 17:45, Reuven Lax wrote:
> > >
> > > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
> > >
> > > > Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
> > >
> > > I believe we could use something like the worker id to make it deterministic, though the worker id can change after a restart. We could persist it in Flink's operator state. I do not know if we can come up with a Runner-independent solution.
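A sketch of that idea (illustrative only, and nothing Runner-independent
about it): the shard id is drawn once and then kept in persisted state,
so a restarted writer reuses the same id instead of a new random one.
The StateStore interface below is made up.

    import java.util.UUID;

    // Illustrative only: a shard id generated once and persisted, so a restarted
    // writer keeps producing to the same logical shard.
    final class StableShardId {
      private String shardId; // cached after first lookup

      String getOrCreate(StateStore store) {
        if (shardId == null) {
          shardId = store.read("shard-id"); // restored after a failure
          if (shardId == null) {
            shardId = UUID.randomUUID().toString(); // generated exactly once
            store.write("shard-id", shardId);
          }
        }
        return shardId;
      }
    }

    interface StateStore { // hypothetical; could be Flink operator state
      String read(String key);
      void write(String key, String value);
    }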
> > >
> > >
> > > If we did this, we would break it on runners that don't have a concept of a stable worker id :( The Dataflow runner can load-balance work at any time (including moving work around between workers).
> > >
> > > > I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo?
> > >
> > > You're correct. I thought that it could suffice to only buffer during a checkpoint and otherwise rely on the deterministic execution of the pipeline and KafkaIO's de-duplication code.
> > >
> > >
> > > Yes, I want to distinguish the KafkaIO case from the general case. It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
> > >
> > > > In any case, emitting only after finalization of checkpoints gives us guaranteed stable input. It also means that the processing is tied to the checkpoint interval, the checkpoint duration, and the available memory.
> > >
> > > This is true, however isn't it already true for such uses of Flink?
> > >
> > > On 01.03.19 19:41, Reuven Lax wrote:
> > > >
> > > >
> > > > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <m...@apache.org> wrote:
> > > >
> > > > Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
> > > >
> > > > 1) Introduce buffering while the checkpoint is being taken
> > > > 2) Replace the random shard id assignment with something deterministic
> > > >
> > > >
> > > > Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
> > > >
> > > > However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable.
> > > >
> > > > I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo? This adds latency of course, but I'm not sure how else to do things correctly with the Beam model.
> > > >
> > > > IMHO the only way to achieve that is materializing output, which Flink does not currently support.
> > > >
> > > > KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
> > > >
> > > > -Max
> > > >
> > > > On 01.03.19 16:45, Reuven Lax wrote:
> > > > > Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish though. Many of the existing Sinks are only fully correct for Dataflow today, because they generate either Reshuffle or GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
> > > > >
> > > > > Reuven
> > > > >
> > > > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <m...@apache.org> wrote:
> > > > >
> > > > > Circling back to the RequiresStableInput annotation [1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.
> > > > >
> > > > > I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.
> > > > >
> > > > > It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.
> > > > >
> > > > > -Max
> > > > >
> > > > > [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> > > > >
> > > > >
> > > > >
> > > > > > On 01.03.19 14:28, Maximilian Michels wrote:
> > > > > > Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
> > > > > >
> > > > > > It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.
> > > > > >
> > > > > > Thanks,
> > > > > > Max
> > > > > >
> > > > > > On 01.03.19 12:23, Maximilian Michels wrote:
> > > > > >> Thank you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.
> > > > > >>
> > > > > >>> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
> > > > > >>
> > > > > >> I don't think that works because we have no control over the Kafka transactions. Imagine:
> > > > > >>
> > > > > >> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
> > > > > >> 2) Flink checkpoints, delaying the processing of elements, the checkpoint fails.
> > > > > >> 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.
> > > > > >>
> > > > > >> IMHO we have to have control over the commit to be able to provide EOS.
> > > > > >>
> > > > > >>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
> > > > > >>
> > > > > >> That's also my understanding, unless we change the interface.
> > > > > >>
> > > > > >>> I don't see how SDF solves this problem..
> > > > > >>
> > > > > >> SDF has a checkpoint method which the Runner can call, but I think that you are right, that the above problem would be the same.
> > > > > >>
> > > > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
> > > > > >>
> > > > > >> Great :)
> > > > > >>
> > > > > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
> > > > > >>
> > > > > >> I don't think that fixes the problem. See above example.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Max
> > > > > >>
> > > > > >> On 01.03.19 00:04, Raghu Angadi wrote:
> > > > > >>>
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
> > > > > >>>
> > > > > >>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
> > > > > >>>
> > > > > >>> - preCommit() is called before the checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far so good.
> > > > > >>> - commit() is called after the checkpoint is persisted. Now, imagine commit() fails for some reason. There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.
> > > > > >>>
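For reference, a bare-bones sketch of those two phases against the plain
KafkaProducer transactional API (illustrative; the real Flink sink keeps
far more bookkeeping): the flush happens in the pre-commit step, and
commitTransaction() runs only after the checkpoint is persisted, which is
exactly the call that cannot be retried by re-running phase one.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Illustrative 2PC shape only; error handling and state tracking omitted.
    final class TwoPhaseKafkaWriter {
      private final KafkaProducer<String, String> producer;

      TwoPhaseKafkaWriter(Properties props) {
        props.put("transactional.id", "my-sink-txn"); // assumption: a single writer instance
        producer = new KafkaProducer<>(props);
        producer.initTransactions();
        producer.beginTransaction();
      }

      void write(String topic, String key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value));
      }

      // Phase 1: runs before the checkpoint barrier is acknowledged.
      void preCommit() {
        producer.flush(); // records are at the broker, but not visible to consumers yet
      }

      // Phase 2: runs after the checkpoint is persisted. If this fails, the records
      // cannot be rewritten in a new transaction: the hard-failure case described above.
      void commit() {
        producer.commitTransaction();
        producer.beginTransaction(); // open the next transaction
      }
    }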
> > > > > >>>
> > > > > >>> In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in the Flink runner.
> > > > > >>>
> > > > > >>> Raghu.
> > > > > >>>
> > > > > >>> Kenn
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
> > > > > >>>
> > > > > >>> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
> > > > > >>>
> > > > > >>> 1. Start transaction
> > > > > >>> 2. @RequiresStableInput Close transaction
> > > > > >>>
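As a rough sketch of those two steps in Beam Java (assuming the
@RequiresStableInput annotation on the second DoFn; TxnHandle and
TxnWriter are made-up placeholders, not the KafkaExactlyOnceSink code):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Made-up helper types for this sketch; a real sink tracks far more.
    class TxnHandle implements java.io.Serializable {
      final String transactionalId;
      TxnHandle(String transactionalId) { this.transactionalId = transactionalId; }
    }

    class TxnWriter {
      // Sketch: write the element into a currently open Kafka transaction.
      static TxnHandle writeInOpenTransaction(KV<String, String> element) {
        return new TxnHandle("txn-for-" + element.getKey());
      }
      // Sketch: commit, skipping transactions a previous attempt already committed.
      static void commitIfNotAlreadyCommitted(TxnHandle handle) {
        // look up handle.transactionalId in committed-txn metadata; commit if absent
      }
    }

    // Step 1: produce into an open transaction and hand the handle downstream.
    class StartTransactionFn extends DoFn<KV<String, String>, TxnHandle> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(TxnWriter.writeInOpenTransaction(c.element()));
      }
    }

    // Step 2: only sees stable (checkpointed) input, so a replayed commit is safe.
    class CloseTransactionFn extends DoFn<TxnHandle, Void> {
      @RequiresStableInput
      @ProcessElement
      public void processElement(ProcessContext c) {
        TxnWriter.commitIfNotAlreadyCommitted(c.element());
      }
    }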
> > > > > >>>
> > > > > >>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright since this policy has worked well for Flink so far.
> > > > > >>>
> > > > > >>> Overall, it will be great to have @RequiresStableInput support in the Flink runner.
> > > > > >>>
> > > > > >>> Raghu.
> > > > > >>>
> > > > > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
> > > > > >>>
> > > > > >>> This matches the KafkaProducer's logic - delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly-once behavior with the same performance characteristics as native Flink checkpoint finalization?"
> > > > > >>>
> > > > > >>> Kenn
> > > > > >>>
> > > > > >>> [1] https://github.com/apache/beam/pull/7955
> > > > > >>>
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
> > > > > >>>
> > > > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
> > > > > >>>
> > > > > >>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.
> > > > > >>>
> > > > > >>> I think that the TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.
> > > > > >>>
> > > > > >>> A checkpoint is realized by sending barriers through all channels, starting from the source until reaching all sinks. Every operator persists its state once it has received a barrier on all its input channels; it then forwards it to the downstream operators.
> > > > > >>>
> > > > > >>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
> > > > > >>>
> > > > > >>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
> > > > > >>>
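As a sketch, the same shape in Beam Java (the DoFn names and element
types below are placeholders, not the actual KafkaExactlyOnceSink
internals):

    import java.util.Random;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Placeholder DoFns standing in for the real sink's internals.
    class AssignRandomShardIdsFn extends DoFn<String, KV<Integer, String>> {
      private static final int NUM_SHARDS = 4; // assumption for the sketch
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(new Random().nextInt(NUM_SHARDS), c.element()));
      }
    }

    class AssignSequenceIdsFn
        extends DoFn<KV<Integer, Iterable<String>>, KV<Integer, KV<Long, String>>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long seq = 0; // simplified; the real sink keeps sequence ids in state
        for (String record : c.element().getValue()) {
          c.output(KV.of(c.element().getKey(), KV.of(seq++, record)));
        }
      }
    }

    class ExactlyOnceWriterFn extends DoFn<KV<Integer, Iterable<KV<Long, String>>>, Void> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Write to Kafka, skipping sequence ids already recorded as written.
      }
    }

    class KafkaEosShapeSketch {
      static PCollection<Void> expand(PCollection<String> records) {
        return records
            .apply("AssignRandomShardIds", ParDo.of(new AssignRandomShardIdsFn()))
            .apply("GroupByShard", GroupByKey.<Integer, String>create())
            .apply("AssignSequenceIds", ParDo.of(new AssignSequenceIdsFn()))
            .apply("GroupByShardAgain", GroupByKey.<Integer, KV<Long, String>>create())
            .apply("ExactlyOnceWriter", ParDo.of(new ExactlyOnceWriterFn()));
      }
    }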
> > > > > >>>
> > > > > >>> As I understood, Spark or Dataflow use the GroupByKey stages to persist the input. That is not required in Flink to be able to take a consistent snapshot of the pipeline.
> > > > > >>>
> > > > > >>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
> > > > > >>>
> > > > > >>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this a reality.
> > > > > >>>
> > > > > >>> I don't see how SDF solves this problem.. Maybe pseudo code would make it more clear. But if it helps, that is great!
> > > > > >>>
> > > > > >>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
> > > > > >>>
> > > > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
> > > > > >>>
> > > > > >>> Raghu.
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Max
> > > > > >>>
> > > > > >>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
> > > > > >>>
> > > > > >>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> > > > > >>>
> > > > > >>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> > > > > >>>
> > > > > >>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM