On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax <[email protected]> wrote:
On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles <[email protected]> wrote:
On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels <[email protected]> wrote:
> If you randomly generate shard ids, buffer those until finalization, finalize a checkpoint so that you never need to re-run that generation, isn't the result stable from that point onwards?

Yes, you're right :) For @RequiresStableInput we will always have to buffer and emit only after a finalized checkpoint.
2PC is the better model for Flink, at least in the case of Kafka, because it can offload the buffering to Kafka via its transactions.

RequiresStableInput is a more general solution, and it is feasible to support it in the Flink Runner. However, we have to make sure that checkpoints are taken frequently to avoid too much memory pressure.

It would be nice to also support 2PC in Beam, i.e. the Runner could choose to either buffer/materialize input or do a 2PC, but it would also break the purity of the existing model.
Still digging into details. I think the "generate random shard ids & buffer" approach is a tradition, but it is more specific to BigQueryIO or FileIO styles. It doesn't have to be done that way if the target system has special support, like Kafka does.

For Kafka, can you get the 2PC behavior like this: Upstream step: open a transaction, write a bunch of stuff to it (let Kafka do the buffering), and emit a transaction identifier. Downstream @RequiresStableInput step: close the transaction. Again, I may be totally missing something, but I think that this has identical characteristics:
Does Kafka garbage collect this eventually in the case where you crash and start again with a different transaction identifier?
I believe that is what I read on the page about Flink's Kafka 2PC, though I cannot find it any more. What would the alternative be for Kafka? You always have to be ready for a client that goes away.

Kenn
- Kafka does the buffering
- checkpoint finalization is the driver of latency
- failure before checkpoint finalization means the old transaction sits around and times out eventually
- failure after checkpoint finalization causes retry with the same transaction identifier

Kenn
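A minimal sketch of the two-step shape described above (hypothetical classes, not existing Beam code). One caveat, echoed elsewhere in this thread: Kafka's public producer API does not let a brand-new producer resume and commit an earlier producer's transaction, so a real implementation would need connector-level support for that:

    import org.apache.beam.sdk.transforms.DoFn;

    // Step 1: write into an open Kafka transaction and emit only its id.
    class OpenTxnAndWriteFn extends DoFn<String, String> {
      @ProcessElement
      public void process(@Element String record, OutputReceiver<String> out) {
        // producer.send(...) into the open transaction: Kafka does the buffering.
        // Emit the transaction identifier, not the records themselves.
        out.output("txn-id-for-current-shard"); // illustrative placeholder
      }
    }

    // Step 2: commit, but only once the txn id is stable (checkpoint-finalized).
    class CommitTxnFn extends DoFn<String, Void> {
      @RequiresStableInput
      @ProcessElement
      public void process(@Element String txnId) {
        // Commit the transaction. A failure after finalization retries with the
        // same stable txn id; a failure before it leaves the old transaction to
        // time out, matching the characteristics listed above.
      }
    }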
On 01.03.19 19:42, Kenneth Knowles wrote:
> I think I am fundamentally misunderstanding checkpointing in Flink.
>
> If you randomly generate shard ids, buffer those until finalization, finalize a checkpoint so that you never need to re-run that generation, isn't the result stable from that point onwards?
>
> Kenn
>
> On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:
>
> Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>
> However, we won't be able to provide full compatibility with RequiresStableInput, because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable. IMHO the only way to achieve that is materializing output, which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
>
> -Max
>
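An illustrative sketch of what change (2) could look like (names are assumptions, not actual KafkaIO code): derive the shard from a stable property of the record instead of a random draw, so re-execution after a restore produces the same assignment:

    // Deterministic alternative to random shard assignment (illustrative):
    // a stable hash of the record key maps the same record to the same shard
    // on every re-execution.
    static int shardFor(String key, int numShards) {
      return Math.floorMod(key.hashCode(), numShards);
    }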
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish though. Many of the existing sinks are only fully correct for Dataflow today, because they generate either Reshuffle or GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
> >
> > Circling back to the RequiresStableInput annotation[1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.
> >
> > I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.
> >
> > It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.
> >
> > -Max
> >
> > [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> > On 01.03.19 14:28, Maximilian Michels wrote:
> > > Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
> > >
> > > It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.
> > >
> > > Thanks,
> > > Max
> > >
> > > On 01.03.19 12:23, Maximilian Michels wrote:
> > >> Thank you for the prompt replies. It's great to see that there is a good understanding of how EOS in Flink works.
> > >>
> > >>> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
> > >>
> > >> I don't think that works, because we have no control over the Kafka transactions. Imagine:
> > >>
> > >> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
> > >> 2) Flink checkpoints, delaying the processing of elements, and the checkpoint fails.
> > >> 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.
> > >>
> > >> IMHO we have to have control over the commit to be able to provide EOS.
> > >>
> > >>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
> > >>
> > >> That's also my understanding, unless we change the interface.
> > >>
> > >>> I don't see how SDF solves this problem..
> > >>
> > >> SDF has a checkpoint method which the Runner can call, but I think that you are right that the above problem would be the same.
> > >>
> > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
> > >>
> > >> Great :)
> > >>
> > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
> > >>
> > >> I don't think that fixes the problem. See the example above.
> > >>
> > >> Thanks,
> > >> Max
> > >>
> > >> On 01.03.19 00:04, Raghu Angadi wrote:
> > >>>
> > >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
> > >>>
> > >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> > >>>
> > >>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction, and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
> > >>>
> > >>>
> > >>> - preCommit() is called before the checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far so good.
> > >>> - commit() is called after the checkpoint is persisted. Now, imagine commit() fails for some reason. There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.
> > >>>
> > >>> In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in the Flink runner.
> > >>>
> > >>> Raghu.
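A minimal sketch of the state-based workaround Raghu describes (illustrative names, not Beam's actual sink; the event-time timer is a stand-in for whatever signal triggers the final write):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    // Phase 1 only buffers; the write-and-commit happens in one retryable step,
    // so a failed commit can re-run with a fresh transaction (the previous
    // transaction was never committed, so nothing is duplicated).
    class BufferThenCommitFn extends DoFn<KV<Integer, String>, Void> {

      @StateId("buffered")
      private final StateSpec<BagState<String>> bufferedSpec =
          StateSpecs.bag(StringUtf8Coder.of());

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @Element KV<Integer, String> record,
          @StateId("buffered") BagState<String> buffered,
          @TimerId("flush") Timer flush,
          BoundedWindow window) {
        buffered.add(record.getValue()); // nothing is sent to Kafka yet
        flush.set(window.maxTimestamp());
      }

      @OnTimer("flush")
      public void flush(@StateId("buffered") BagState<String> buffered) {
        // Open a fresh Kafka transaction, write every buffered record, commit.
        // If anything here fails, the retry repeats this whole method with yet
        // another new transaction.
      }
    }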
> > >>>
> > >>> Kenn
> > >>>
> > >>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
> > >>>
> > >>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
> > >>>
> > >>> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
> > >>>
> > >>> 1. Start transaction
> > >>> 2. @RequiresStableInput Close transaction
> > >>>
> > >>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright, since this policy has worked well for Flink so far.
> > >>>
> > >>> Overall, it will be great to have @RequiresStableInput support in the Flink runner.
> > >>>
> > >>> Raghu.
> > >>>
> > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
> > >>>
> > >>> This matches the KafkaProducer's logic - delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly-once behavior with the same performance characteristics as native Flink checkpoint finalization?"
> > >>>
> > >>> Kenn
> > >>>
> > >>> [1] https://github.com/apache/beam/pull/7955
> > >>>
> > >>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
> > >>>
> > >>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> > >>>
> > >>>
> > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
> > >>>
> > >>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
> > >>>
> > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
> > >>>
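For reference, this is what the native Flink usage being described looks like (broker address and topic are placeholders; the shape mirrors the example in Flink's Kafka connector documentation):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

    // The connector opens a Kafka transaction per checkpoint interval and
    // commits it when the checkpoint completes (Flink's 2PC sink pattern).
    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic", // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);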
> > >>>
> > >>> When we discussed this in Aug 2017, the understanding was that the 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/[email protected]/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.
> > >>>
> > >>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.
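Presumably this refers to Flink's CheckpointListener interface, whose actual hook is notifyCheckpointComplete. A minimal sketch of a runner-level operator using it (illustrative class name, buffering details elided):

    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Illustrative: hold elements until the checkpoint that contains them is
    // finalized, then release them to the @RequiresStableInput DoFn.
    class StableInputOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, CheckpointListener {

      @Override
      public void processElement(StreamRecord<T> element) {
        // Buffer the element in operator state instead of emitting it.
      }

      @Override
      public void notifyCheckpointComplete(long checkpointId) {
        // Elements buffered before this checkpoint's barrier are now stable:
        // emit them downstream for actual processing.
      }
    }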
> > >>>
> > >>> A checkpoint is realized by sending barriers through all channels, starting from the source, until reaching all sinks. Every operator persists its state once it has received a barrier on all its input channels, and then forwards it to the downstream operators.
> > >>>
> > >>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
> > >>>
> > >>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
> > >>>
> > >>> As I understood, Spark or Dataflow use the GroupByKey stages to persist the input. That is not required in Flink to be able to take a consistent snapshot of the pipeline.
> > >>>
> > >>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
> > >>>
> > >>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this a reality.
> > >>>
> > >>> I don't see how SDF solves this problem.. Maybe pseudo code would make it more clear. But if it helps, that is great!
> > >>>
> > >>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
> > >>>
> > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
> > >>>
> > >>> Raghu.
> > >>>
> > >>> Cheers,
> > >>> Max
> > >>>
> > >>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
> > >>>
> > >>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> > >>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> > >>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM