The situation described here for Kafka transactions seems very analogous to a file sink that writes to temp files, waits for those writes to succeed, durably persists the temp file names, then renames them to their final locations [1]. What it sounds like Raghu is describing is that if the Kafka commit fails, the previously written data is discarded. That would be as if a failed rename deleted the temp files. The architecture no longer works.
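To make the analogy concrete, here is a minimal sketch of that two-phase file pattern, with hypothetical names (the real logic lives in WriteFiles [1]):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.List;

    class TwoPhaseFileSink {
      // Phase 1: write a temp file. Safe to retry; orphaned temp files are
      // simply ignored and cleaned up later.
      static Path writeTemp(Path tempDir, String shard, byte[] data) throws IOException {
        Path tmp = tempDir.resolve(shard + ".tmp");
        Files.write(tmp, data);
        return tmp;
      }

      // Phase 2: after the temp file names are durably persisted, atomically
      // rename them into place. A failed rename leaves the temp file intact,
      // so finalization can be retried without redoing phase 1.
      static void finalizeFiles(List<Path> tempFiles, Path destDir) throws IOException {
        for (Path tmp : tempFiles) {
          Path dest = destDir.resolve(tmp.getFileName().toString().replace(".tmp", ""));
          Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
        }
      }
    }

The whole design rests on a failed Files.move() leaving its source in place, so phase 2 can simply be retried. A rename that deleted the temp files on failure would leave nothing to retry and nothing to replay.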
Kenn

[1] see https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L765

On Wed, Mar 6, 2019 at 10:29 AM Thomas Weise <t...@apache.org> wrote:

> A fair amount of work for true exactly-once output was done in Apex. Different from almost exactly-once :)
>
> The takeaway was that the mechanism to achieve it depends on the external system. The implementation looks different for, let's say, a file sink or JDBC or Kafka.
>
> Apex had an exactly-once producer before Kafka supported transactions. That producer relied on the ability to discover what was already written to Kafka upon recovery from failure. Why?
>
> Runners are not distributed transaction coordinators, and no matter how we write the code, there is always the small possibility that one of two resources fails to commit, resulting in either data loss or duplicates. The Kafka EOS was a hybrid of producer and consumer, the consumer part used during recovery to find out what was already produced previously.
>
> Flink and Apex have very similar checkpointing models; that's why this thread caught my attention. Within the topology/runner, exactly-once is achieved by replay having the same effect. For sinks, it needs to rely on the capabilities of the respective system (like atomic rename for a file sink, or a transaction with a metadata table for JDBC).
>
> The buffering until the checkpoint is complete is a mechanism to get away from sink-specific implementations. It comes with a latency penalty (the memory overhead could be solved with a write-ahead log). But there is still the possibility that we fail to flush the buffer after the checkpoint is complete (data loss)?
>
> Thanks,
> Thomas
>
> On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:
>>
>>> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> RE: Kenn's suggestion. I think Raghu looked into something like that, and something about it didn't work. I don't remember all the details, but I think there might have been some subtle problem with it that wasn't obvious. Doesn't mean that there isn't another way to solve that issue.
>>>
>>> Two disadvantages:
>>> - A transaction in Kafka is tied to a single producer instance. There is no official API to start a txn in one process and access it in another process. Flink's sink uses an internal REST API for this.
>>
>> Can you say more about how this works?
>>
>>> - There is one failure case that I mentioned earlier: if closing the transaction in a downstream transform fails, it is data loss; there is no way to replay the upstream transform that wrote the records to Kafka.
>>
>> With coupling of unrelated failures due to fusion, this is a severe problem. I think I see now how 2PC affects this. From my reading, I can't see the difference in how Flink works. If the checkpoint finalization callback that does the Kafka commit fails, does it invalidate the checkpoint so the start transaction + write elements is retried?
>>
>> Kenn
>>
>>> GBKs don't have major scalability limitations in most runners. An extra GBK is fine in practice for such a sink (at least no one has complained about it yet, though I don't know real usage numbers in practice).
>>> Flink's implementation in Beam using @RequiresStableInput does have storage requirements and latency costs that increase with the checkpoint interval. I think it is still just as useful. Good to see @RequiresStableInput support added to the Flink runner in Max's PR.
>>>
>>>> Hopefully we can make that work. Another possibility if we can't is to do something special for Flink. Beam allows runners to splice out well-known transforms with their own implementation. Dataflow already does that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the Kafka sink with one that uses Flink-specific functionality. Ideally this would reuse most of the existing Kafka code (maybe we could refactor just the EOS part into something that could be subbed out).
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> > It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
>>>>>
>>>>> There would have to be a checkpoint-completed callback the DoFn can register with the Runner. Does not seem applicable for most Runners though.
>>>>>
>>>>> > This is true, however isn't it already true for such uses of Flink?
>>>>>
>>>>> Yes, that's correct. In the case of Kafka, Flink can offload the buffering, but for the general case, idempotent writes are only possible if we buffer data until the checkpoint is completed.
>>>>>
>>>>> On 04.03.19 17:45, Reuven Lax wrote:
>>>>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> >
>>>>> > > Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
>>>>> >
>>>>> > I believe we could use something like the worker id to make it deterministic, though the worker id can change after a restart. We could persist it in Flink's operator state. I do not know if we can come up with a Runner-independent solution.
>>>>> >
>>>>> > If we did this, we would break it on runners that don't have a concept of a stable worker id :( The Dataflow runner can load balance work at any time (including moving work around between workers).
>>>>> >
>>>>> > > I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo?
>>>>> >
>>>>> > You're correct. I thought that it could suffice to only buffer during a checkpoint and otherwise rely on the deterministic execution of the pipeline and KafkaIO's de-duplication code.
>>>>> >
>>>>> > Yes, I want to distinguish the KafkaIO case from the general case. It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.
>>>>> > In any case, emitting only after finalization of checkpoints gives us guaranteed stable input. It also means that the processing is tied to the checkpoint interval, the checkpoint duration, and the available memory.
>>>>> >
>>>>> > This is true, however isn't it already true for such uses of Flink?
>>>>> >
>>>>> > On 01.03.19 19:41, Reuven Lax wrote:
>>>>> > > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> > >
>>>>> > > Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
>>>>> > >
>>>>> > > 1) Introduce buffering while the checkpoint is being taken
>>>>> > > 2) Replace the random shard id assignment with something deterministic
>>>>> > >
>>>>> > > Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
>>>>> > >
>>>>> > > However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable.
>>>>> > >
>>>>> > > I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo? This adds latency of course, but I'm not sure how else to do things correctly with the Beam model.
>>>>> > >
>>>>> > > IMHO the only way to achieve that is materializing output which Flink does not currently support.
>>>>> > >
>>>>> > > KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
>>>>> > >
>>>>> > > -Max
>>>>> > >
>>>>> > > On 01.03.19 16:45, Reuven Lax wrote:
>>>>> > > > Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finisheded it. I think it is important to finish though. Many of the existing Sinks are only fully correct for Dataflow today, because they generate either Reshuffle or GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
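For readers following along, the random-key-plus-reshuffle pattern Reuven mentions looks roughly like this. A minimal sketch, assuming hypothetical NUM_SHARDS and WriteFn stand-ins; it is not the actual sink code:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    class StableWriteSketch {
      static final int NUM_SHARDS = 16; // hypothetical shard count

      // Hypothetical writer stand-in; assumed idempotent per (shard, sequence id).
      static class WriteFn extends DoFn<KV<Integer, String>, Void> {
        @ProcessElement
        public void process(ProcessContext c) {
          // write c.element() to the external system, de-duplicating on retry
        }
      }

      static PCollection<Void> stableWrite(PCollection<String> input) {
        return input
            .apply("AssignRandomShard", ParDo.of(new DoFn<String, KV<Integer, String>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Non-deterministic assignment: only safe because the shuffle
                // below persists it before the writer observes it.
                c.output(KV.of(ThreadLocalRandom.current().nextInt(NUM_SHARDS), c.element()));
              }
            }))
            // On runners that checkpoint shuffle input, retries of the writer see
            // the same shard assignment; on Flink, as discussed, they may not.
            .apply(Reshuffle.<Integer, String>of())
            .apply("Write", ParDo.of(new WriteFn()));
      }
    }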
>>>>> > > > Reuven
>>>>> > > >
>>>>> > > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> > > >
>>>>> > > > Circling back to the RequiresStableInput annotation [1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.
>>>>> > > >
>>>>> > > > I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.
>>>>> > > >
>>>>> > > > It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.
>>>>> > > >
>>>>> > > > -Max
>>>>> > > >
>>>>> > > > [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>> > > >
>>>>> > > > On 01.03.19 14:28, Maximilian Michels wrote:
>>>>> > > > > Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
>>>>> > > > >
>>>>> > > > > It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.
>>>>> > > > >
>>>>> > > > > Thanks,
>>>>> > > > > Max
>>>>> > > > >
>>>>> > > > > On 01.03.19 12:23, Maximilian Michels wrote:
>>>>> > > > >> Thank you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.
>>>>> > > > >>
>>>>> > > > >>> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
>>>>> > > > >>
>>>>> > > > >> I don't think that works because we have no control over the Kafka transactions. Imagine:
>>>>> > > > >>
>>>>> > > > >> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
>>>>> > > > >> 2) Flink checkpoints, delaying the processing of elements, the checkpoint fails.
>>>>> > > > >> 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.
>>>>> > > > >>
>>>>> > > > >> IMHO we have to have control over commit to be able to provide EOS.
>>>>> > > > >>> When we discussed this in Aug 2017, the understanding was that the 2-phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
>>>>> > > > >>
>>>>> > > > >> That's also my understanding, unless we change the interface.
>>>>> > > > >>
>>>>> > > > >>> I don't see how SDF solves this problem..
>>>>> > > > >>
>>>>> > > > >> SDF has a checkpoint method which the Runner can call, but I think that you are right, that the above problem would be the same.
>>>>> > > > >>
>>>>> > > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address fundamental incompatibility between Beam model and Flink's horizontal checkpointing for such applications.
>>>>> > > > >>
>>>>> > > > >> Great :)
>>>>> > > > >>
>>>>> > > > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>>>>> > > > >>
>>>>> > > > >> I don't think that fixes the problem. See above example.
>>>>> > > > >>
>>>>> > > > >> Thanks,
>>>>> > > > >> Max
>>>>> > > > >>
>>>>> > > > >> On 01.03.19 00:04, Raghu Angadi wrote:
>>>>> > > > >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>> > > > >>>
>>>>> > > > >>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
>>>>> > > > >>>
>>>>> > > > >>> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far good.
>>>>> > > > >>> - commit is called after checkpoint is persisted. Now, imagine commit() fails for some reason. There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.
>>>>> > > > >>>
>>>>> > > > >>> In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in the Flink runner.
>>>>> > > > >>>
>>>>> > > > >>> Raghu.
>>>>> > > > >>>
>>>>> > > > >>> Kenn
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>> > > > >>>
>>>>> > > > >>> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
>>>>> > > > >>>
>>>>> > > > >>> 1. Start transaction
>>>>> > > > >>> 2. @RequiresStableInput Close transaction
>>>>> > > > >>>
>>>>> > > > >>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright since this policy has worked well for Flink so far.
>>>>> > > > >>>
>>>>> > > > >>> Overall, it will be great to have @RequiresStableInput support in Flink runner.
>>>>> > > > >>>
>>>>> > > > >>> Raghu.
>>>>> > > > >>>
>>>>> > > > >>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
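As an illustration of the hard-fail asymmetry being discussed, a minimal sketch against the plain Kafka producer API (hypothetical topic and transactional.id; error handling elided):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class TwoPhaseKafkaSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Transactions belong to one producer identity; a new instance with the
        // same transactional.id fences the old one and aborts its open txn.
        props.put("transactional.id", "eos-sink-shard-7");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          producer.initTransactions();

          // Phase 1 (Flink's preCommit): write and flush inside a transaction.
          // Records land in Kafka but stay invisible to read_committed consumers.
          producer.beginTransaction();
          producer.send(new ProducerRecord<>("topic", "key", "value"));
          producer.flush();

          // ... checkpoint is taken and finalized here ...

          // Phase 2 (Flink's commit): make the records visible.
          try {
            producer.commitTransaction();
          } catch (ProducerFencedException e) {
            // Hard fail: the open transaction cannot be resumed by anyone else,
            // and the elements that fed phase 1 may no longer be replayable.
            throw e;
          }
        }
      }
    }

Retrying beginTransaction/send/flush wholesale is fine; retrying commitTransaction() alone is not always possible, which is why storing the records in state and writing them inside commit(), as Raghu suggests, trades scalability for a retryable second phase.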
>>>>> > > > >>> This matches the KafkaProducer's logic - delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly once behavior with the same performance characteristics as native Flink checkpoint finalization?"
>>>>> > > > >>>
>>>>> > > > >>> Kenn
>>>>> > > > >>>
>>>>> > > > >>> [1] https://github.com/apache/beam/pull/7955
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>>>> > > > >>>
>>>>> > > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
>>>>> > > > >>>
>>>>> > > > >>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> > > > >>>
>>>>> > > > >>> Hi,
>>>>> > > > >>>
>>>>> > > > >>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
>>>>> > > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
>>>>> > > > >>>
>>>>> > > > >>> When we discussed this in Aug 2017, the understanding was that the 2-phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.
>>>>> > > > >>>
>>>>> > > > >>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.
>>>>> > > > >>>
>>>>> > > > >>> A checkpoint is realized by sending barriers through all channels, starting from the source until reaching all sinks. Every operator persists its state once it has received a barrier on all its input channels; it then forwards it to the downstream operators.
>>>>> > > > >>>
>>>>> > > > >>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>> > > > >>>
>>>>> > > > >>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
>>>>> > > > >>>
>>>>> > > > >>> As I understood, Spark or Dataflow use the GroupByKey stages to persist the input. That is not required in Flink to be able to take a consistent snapshot of the pipeline.
>>>>> > > > >>>
>>>>> > > > >>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>> > > > >>>
>>>>> > > > >>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this reality.
>>>>> > > > >>>
>>>>> > > > >>> I don't see how SDF solves this problem.. Maybe pseudo code would make it more clear. But if it helps, that is great!
>>>>> > > > >>>
>>>>> > > > >>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
>>>>> > > > >>>
>>>>> > > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks.. and address fundamental incompatibility between Beam model and Flink's horizontal checkpointing for such applications.
>>>>> > > > >>>
>>>>> > > > >>> Raghu.
>>>>> > > > >>>
>>>>> > > > >>> Cheers,
>>>>> > > > >>> Max
>>>>> > > > >>>
>>>>> > > > >>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
>>>>> > > > >>>
>>>>> > > > >>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>> > > > >>>
>>>>> > > > >>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>> > > > >>>
>>>>> > > > >>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM