BTW, as a follow-up: there is a cost to having a Flink-specific override for the Kafka sink. Part of that cost is test coverage: users who write DirectRunner tests for their pipeline will be exercising a different version of the code than the one that runs on the actual Flink runner. It also makes the code less obvious: people who read the KafkaIO code will tend not to realize that Flink is running something a bit different, and this can lead to confusion.
Reuven

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:

RE: Kenn's suggestion. I think Raghu looked into something like that, and something about it didn't work. I don't remember all the details, but I think there might have been some subtle problem with it that wasn't obvious. That doesn't mean there isn't another way to solve that issue.

Hopefully we can make that work. Another possibility, if we can't, is to do something special for Flink. Beam allows runners to splice out well-known transforms and substitute their own implementation. Dataflow already does that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the Kafka sink with one that uses Flink-specific functionality. Ideally this would reuse most of the existing Kafka code (maybe we could refactor just the EOS part into something that could be swapped out).

Reuven

On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:

> It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.

There would have to be a checkpoint-completed callback the DoFn can register with the Runner. That does not seem applicable to most Runners, though.

> This is true; however, isn't it already true for such uses of Flink?

Yes, that's correct. In the case of Kafka, Flink can offload the buffering, but in the general case idempotent writes are only possible if we buffer data until the checkpoint is completed.

On 04.03.19 17:45, Reuven Lax wrote:

> On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle).
>> Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.
>
> I believe we could use something like the worker id to make it deterministic, though the worker id can change after a restart. We could persist it in Flink's operator state. I do not know if we can come up with a Runner-independent solution.

If we did this, we would break it on runners that don't have a concept of a stable worker id :( The Dataflow runner can load-balance work at any time (including moving work around between workers).

>> I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo?
>
> You're correct. I thought that it could suffice to only buffer during a checkpoint and otherwise rely on the deterministic execution of the pipeline and KafkaIO's de-duplication code.

Yes, I want to distinguish the KafkaIO case from the general case. It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.

> In any case, emitting only after finalization of checkpoints gives us guaranteed stable input. It also means that the processing is tied to the checkpoint interval, the checkpoint duration, and the available memory.

This is true; however, isn't it already true for such uses of Flink?

On 01.03.19 19:41, Reuven Lax wrote:

> On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <m...@apache.org> wrote:
>
> Fully agree.
> I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic

Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.

> However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable.

I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo? This adds latency of course, but I'm not sure how else to do things correctly with the Beam model.

> IMHO the only way to achieve that is materializing output, which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
>
> -Max

On 01.03.19 16:45, Reuven Lax wrote:

Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish though.
Many of the existing Sinks are only fully correct for Dataflow today, because they generate either a Reshuffle or a GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.

Reuven

On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <m...@apache.org> wrote:

Circling back to the RequiresStableInput annotation [1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.

I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.

It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.

-Max

[1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM

On 01.03.19 14:28, Maximilian Michels wrote:

Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
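The runner-side idea discussed in this thread (hold input back and deliver it to a @RequiresStableInput step only after the checkpoint covering it has completed, as Reuven and Kenn describe) can be illustrated with a small in-memory model. This is a minimal sketch in plain Python, assuming one operator and monotonically increasing checkpoint ids; `StableInputBuffer`, `on_snapshot`, and `on_checkpoint_complete` are hypothetical names, not Beam or Flink API, and a real runner would keep the sealed batches in checkpointed operator state rather than in a dict.

```python
from typing import Callable, Dict, List


class StableInputBuffer:
    """Hypothetical runner-side buffer for a @RequiresStableInput step.

    Elements are never handed to the side-effecting function right away.
    A snapshot seals everything received so far under that checkpoint id,
    and a sealed batch is delivered only once its checkpoint is reported
    complete, i.e. once replay after a failure can no longer change it.
    """

    def __init__(self, deliver: Callable[[str], None]) -> None:
        self._deliver = deliver
        self._unsealed: List[str] = []           # received since last snapshot
        self._sealed: Dict[int, List[str]] = {}  # checkpoint id -> batch

    def process(self, element: str) -> None:
        self._unsealed.append(element)

    def on_snapshot(self, checkpoint_id: int) -> None:
        # In a real runner this batch would be written to checkpointed state.
        self._sealed[checkpoint_id] = self._unsealed
        self._unsealed = []

    def on_checkpoint_complete(self, checkpoint_id: int) -> None:
        # A completion notice also covers earlier checkpoints whose notices
        # were skipped, so release every batch with id <= checkpoint_id.
        for cid in sorted(self._sealed):
            if cid <= checkpoint_id:
                for element in self._sealed.pop(cid):
                    self._deliver(element)


delivered: List[str] = []
buf = StableInputBuffer(delivered.append)
buf.process("a")
buf.process("b")
buf.on_snapshot(1)
buf.process("c")          # arrives after snapshot 1, belongs to checkpoint 2
assert delivered == []    # nothing is released before completion
buf.on_checkpoint_complete(1)
assert delivered == ["a", "b"]
```

The trade-off Max points out is visible here: output is delayed by up to a checkpoint interval plus the checkpoint duration, and the buffer holds everything received in between.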
It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.

Thanks,
Max

On 01.03.19 12:23, Maximilian Michels wrote:

Thank you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.

> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.

I don't think that works because we have no control over the Kafka transactions. Imagine:

1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
2) Flink checkpoints, delaying the processing of elements; the checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.

IMHO we have to have control over the commit to be able to provide EOS.

> When we discussed this in Aug 2017, the understanding was that the two-phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.

That's also my understanding, unless we change the interface.

> I don't see how SDF solves this problem.
SDF has a checkpoint method which the Runner can call, but I think you are right that the above problem would be the same.

> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.

Great :)

> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.

I don't think that fixes the problem. See the example above.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:

> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
>
>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org>
>> wrote:
>>
>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
>
> - preCommit() is called before the checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far so good.
> - commit() is called after the checkpoint is persisted. Now, imagine commit() fails for some reason. There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.

In Apache Beam, the records could be stored in state and written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in the Flink runner.

Raghu.
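Raghu's workaround (keep the records in state during the first phase and only write them inside commit(), so a failed commit can be retried by writing the same records again in a new transaction) can be sketched against a toy transactional log. This is a minimal, hypothetical in-memory model: `FakeTransactionalLog`, `StateBackedTwoPhaseSink`, `fail_next_commit`, and `max_attempts` are invented for illustration and are not Kafka, Flink, or Beam APIs.

```python
from typing import Dict, List


class FakeTransactionalLog:
    """Toy stand-in for a transactional log: only committed records are visible."""

    def __init__(self) -> None:
        self.committed: List[str] = []
        self.fail_next_commit = False  # hypothetical fault-injection switch

    def write_transaction(self, records: List[str]) -> None:
        # One simulated transaction: records are staged and only become
        # visible if the commit succeeds; a failed commit discards them.
        staged = list(records)
        if self.fail_next_commit:
            self.fail_next_commit = False
            raise IOError("commit failed, transaction aborted")
        self.committed.extend(staged)


class StateBackedTwoPhaseSink:
    """Sketch of the workaround: keep records in state, write them in commit().

    pre_commit() only moves buffered records into (simulated) checkpointed
    state. commit() runs after the checkpoint is persisted and writes the
    records in a fresh transaction, so a failed commit can simply be retried.
    """

    def __init__(self, log: FakeTransactionalLog) -> None:
        self._log = log
        self._buffer: List[str] = []
        self._state: Dict[int, List[str]] = {}  # checkpoint id -> records

    def process(self, record: str) -> None:
        self._buffer.append(record)

    def pre_commit(self, checkpoint_id: int) -> None:
        self._state[checkpoint_id] = self._buffer
        self._buffer = []

    def commit(self, checkpoint_id: int, max_attempts: int = 3) -> None:
        records = self._state[checkpoint_id]
        for attempt in range(1, max_attempts + 1):
            try:
                self._log.write_transaction(records)  # new transaction each try
            except IOError:
                if attempt == max_attempts:
                    raise
                continue
            del self._state[checkpoint_id]
            return


log = FakeTransactionalLog()
sink = StateBackedTwoPhaseSink(log)
sink.process("r1")
sink.process("r2")
sink.pre_commit(1)
log.fail_next_commit = True   # first commit attempt fails
sink.commit(1)                # retried with the same records from state
assert log.committed == ["r1", "r2"]
```

The cost is the one Raghu names: everything received between two checkpoints sits in state until commit(), so infrequent checkpoints mean large state.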
>> Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:

> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>
> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
>
> 1. Start transaction
> 2. @RequiresStableInput Close transaction

I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright, since this policy has worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput support in the Flink runner.
Raghu.

> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>
> This matches the KafkaProducer's logic: delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly-once behavior with the same performance characteristics as native Flink checkpoint finalization?"
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/7955

On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:

> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com>
> wrote:
>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
>
> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Hi,
>>
>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
> When we discussed this in Aug 2017, the understanding was that the two-phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.

I think that the TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.

>> A checkpoint is realized by sending barriers through all channels, starting from the sources until they reach all sinks. Every operator persists its state once it has received a barrier on all its input channels; it then forwards the barrier to the downstream operators.
>>
>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>
>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
>>
>> As I understand it, Spark and Dataflow use the GroupByKey stages to persist the input. That is not required in Flink to be able to take a consistent snapshot of the pipeline.
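Max's description of how Flink takes a checkpoint (barriers flow from the sources, and an operator snapshots once it has seen the barrier on every input channel, then forwards it) can be modelled for a single two-input operator. This is a simplified, hypothetical simulation of aligned barriers written for illustration; `AlignedOperator` and its methods are not Flink classes, and the model assumes at most one checkpoint in flight and omits everything else a real runtime handles.

```python
from typing import Dict, List, Set, Tuple

Event = Tuple[str, int]  # ("record", value) or ("barrier", checkpoint_id)


class AlignedOperator:
    """Hypothetical model of aligned checkpoint barriers for one operator.

    After the barrier for a checkpoint arrives on a channel, later records on
    that channel are held back. When the barrier has arrived on every input
    channel, the operator snapshots its state, forwards the barrier
    downstream, and then processes the held-back records. Only one checkpoint
    in flight at a time is modelled.
    """

    def __init__(self, channels: List[str]) -> None:
        self.channels = set(channels)
        self.state = 0                       # toy state: running sum
        self.snapshots: Dict[int, int] = {}  # checkpoint id -> state
        self.output: List[Event] = []        # events sent downstream
        self._blocked: Set[str] = set()
        self._held: List[int] = []

    def receive(self, channel: str, event: Event) -> None:
        kind, payload = event
        if kind == "record":
            if channel in self._blocked:
                self._held.append(payload)   # post-barrier data waits
            else:
                self._apply(payload)
            return
        self._blocked.add(channel)           # barrier seen on this channel
        if self._blocked == self.channels:
            self.snapshots[payload] = self.state
            self.output.append(("barrier", payload))
            held, self._held = self._held, []
            self._blocked.clear()
            for value in held:
                self._apply(value)

    def _apply(self, value: int) -> None:
        self.state += value
        self.output.append(("record", value))


op = AlignedOperator(["a", "b"])
op.receive("a", ("record", 1))
op.receive("a", ("barrier", 1))
op.receive("a", ("record", 10))  # behind the barrier on "a": held back
op.receive("b", ("record", 2))   # "b" has not seen the barrier yet
op.receive("b", ("barrier", 1))
assert op.snapshots == {1: 3}    # snapshot excludes the held-back 10
assert op.state == 13
```

The snapshot for checkpoint 1 contains exactly the records that preceded the barrier on every channel, which is why, as Max notes, Flink does not need extra GroupByKey stages to obtain a consistent snapshot.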
>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>
>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this a reality.
>
> I don't see how SDF solves this problem. Maybe pseudocode would make it clearer. But if it helps, that is great!
>
>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
>
> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks and address the fundamental incompatibility between the Beam model and Flink's horizontal checkpointing for such applications.
>
> Raghu.
>
>> Cheers,
>> Max
>>
>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
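Max's original request, a way to tell the ExactlyOnceWriter to commit once a checkpoint has completed (which is what Flink's native KafkaProducer does with its pending transaction), can be modelled as a writer that parks one transaction per checkpoint. In Flink the completion signal is delivered through `CheckpointListener#notifyCheckpointComplete`. Everything in the sketch below is a hypothetical simplification for illustration, not the Flink or KafkaIO implementation: the names `CheckpointAwareWriter`, `on_snapshot`, `on_checkpoint_complete`, and `on_restore`, the in-memory "transactions", and the assumed restore rule (commit what belongs to the restored checkpoint, drop anything newer because the sources replay it).

```python
from typing import Dict, List


class CheckpointAwareWriter:
    """Hypothetical sink that commits only when a checkpoint is reported complete."""

    def __init__(self) -> None:
        self.visible: List[str] = []              # committed, readable records
        self._open: List[str] = []                # current, uncommitted transaction
        self._parked: Dict[int, List[str]] = {}   # pre-committed, awaiting commit

    def write(self, record: str) -> None:
        self._open.append(record)

    def on_snapshot(self, checkpoint_id: int) -> None:
        # Pre-commit: flush the open transaction and park it under this checkpoint.
        self._parked[checkpoint_id] = self._open
        self._open = []

    def on_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Commit every parked transaction up to the completed checkpoint.
        for cid in sorted(self._parked):
            if cid <= checkpoint_id:
                self.visible.extend(self._parked.pop(cid))

    def on_restore(self, last_completed_id: int) -> None:
        # Assumed rule: transactions belonging to the restored checkpoint are
        # committed (their notice may have been lost); newer ones are aborted.
        self.on_checkpoint_complete(last_completed_id)
        self._parked.clear()
        self._open = []


w = CheckpointAwareWriter()
w.write("a")
w.on_snapshot(1)
w.on_checkpoint_complete(1)
w.write("b")         # written after checkpoint 1, not yet committed
w.on_restore(1)      # failure: restore from checkpoint 1, "b" is dropped
w.write("b")         # the source replays "b"
w.on_snapshot(2)
w.on_checkpoint_complete(2)
assert w.visible == ["a", "b"]
```

A writer that committed on its own schedule instead, as in step 1) of Max's failure scenario, would have made "b" visible before the failure and again after the replay; tying the commit to checkpoint completion is what avoids that duplicate.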