On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:

> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Hi,
>>
>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>> semantics (EOS). I think it is questionable to exclude Runners from
>> inside a transform, but I see that the intention was to save users from
>> surprises.
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>>
>
>
> When we discussed this in Aug 2017, the understanding was that the two-phase
> commit utility Flink uses to implement its Kafka EOS could not be
> used in Beam's context.
> See this message
> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in that
> dev thread. Has anything changed in this regard? The whole thread is
> relevant to this topic and worth going through.
>

I think that the TwoPhaseCommit utility class wouldn't work. The Flink runner
would probably want to use notifyCheckpointComplete directly in order to
implement @RequiresStableInput.
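To make that idea a bit more concrete, here is a rough sketch in plain Java of how a runner could provide stable input on top of Flink's checkpoint-complete callback (CheckpointListener#notifyCheckpointComplete). Elements are buffered, each buffer is tied to the checkpoint that snapshots it, and the user's DoFn logic only sees the elements once that checkpoint has completed. StableInputBuffer and its methods are hypothetical names, not Flink or Beam API, and the buffer lives only in memory here; a real runner would keep it in checkpointed operator state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical helper, not part of Flink or Beam: buffers input until the
// checkpoint that persisted it has completed, then hands it to the DoFn logic.
class StableInputBuffer<T> {
  private final Consumer<T> process;                              // wrapped processElement logic
  private final TreeMap<Long, List<T>> pending = new TreeMap<>(); // checkpoint id -> elements it persisted
  private List<T> current = new ArrayList<>();                    // elements since the last barrier

  StableInputBuffer(Consumer<T> process) {
    this.process = process;
  }

  // Called per input element: buffer instead of processing right away.
  void add(T element) {
    current.add(element);
  }

  // Called when the barrier for checkpointId passes: the current buffer becomes
  // part of that snapshot (in memory here; a runner would use operator state).
  void snapshotState(long checkpointId) {
    pending.put(checkpointId, current);
    current = new ArrayList<>();
  }

  // Plays the role of CheckpointListener#notifyCheckpointComplete: release all
  // buffers up to and including checkpointId, oldest first.
  void notifyCheckpointComplete(long checkpointId) {
    Map<Long, List<T>> stable = pending.headMap(checkpointId, true);
    for (List<T> batch : stable.values()) {
      batch.forEach(process);
    }
    stable.clear(); // clearing the view also removes the entries from pending
  }
}
```

Releasing everything up to and including the completed id is deliberate: Flink does not promise a notification for every single checkpoint, so a later notification has to cover older checkpoints that are still pending.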

>
>
>>
>> A checkpoint is realized by sending barriers through all channels
>> starting from the sources until they reach all sinks. Every operator
>> persists its state once it has received a barrier on all its input
>> channels; it then forwards the barrier to the downstream operators.
>>
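A toy model of that alignment step for an operator with several input channels might look like the following. AligningOperator is made up for illustration and ignores Flink's actual network stack; records arriving on a channel whose barrier has already been seen are held back, so they do not leak into the snapshot of the current checkpoint.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative model of barrier alignment (hypothetical class, not Flink's API).
class AligningOperator {
  private final int numInputs;
  private final Set<Integer> barriersSeen = new HashSet<>();
  private final List<String> blocked = new ArrayList<>(); // records that belong to the next checkpoint
  private int state = 0;                                  // operator state: count of processed records
  final List<String> log = new ArrayList<>();             // snapshots and forwarded barriers

  AligningOperator(int numInputs) {
    this.numInputs = numInputs;
  }

  void onRecord(int channel, String record) {
    if (barriersSeen.contains(channel)) {
      blocked.add(record); // channel is already past the barrier: hold the record back
    } else {
      state++;             // normal processing
    }
  }

  void onBarrier(int channel, long checkpointId) {
    barriersSeen.add(channel);
    if (barriersSeen.size() == numInputs) {
      log.add("snapshot " + checkpointId + " state=" + state); // 1. persist state
      log.add("forward barrier " + checkpointId);             // 2. then forward the barrier
      barriersSeen.clear();
      state += blocked.size(); // 3. resume: process the held-back records
      blocked.clear();
    }
  }
}
```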
>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>
>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>> GroupByKey -> ExactlyOnceWriter
>>
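The point of the two GroupByKeys is that the shard assignment and the sequence ids become stable once persisted, so a retried bundle carries the same ids as the first attempt. A simplified, hypothetical model of how a writer can then drop duplicates per shard is below. It is not the actual KafkaExactlyOnceSink code. In a real sink, the next expected id has to be stored durably and atomically with the writes, while here it is just a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of sequence-id based dedup in a sharded exactly-once writer.
class ShardedDedupWriter {
  // shard -> next sequence id expected (must be durable in a real sink)
  private final Map<Integer, Long> nextId = new HashMap<>();
  final List<String> written = new ArrayList<>(); // stands in for the Kafka topic

  // Writes one shard's batch; seqIds are sorted and were assigned before the
  // second GroupByKey, so a retry sees exactly the same ids.
  void writeBatch(int shard, long[] seqIds, String[] values) {
    long next = nextId.getOrDefault(shard, 0L);
    for (int i = 0; i < seqIds.length; i++) {
      if (seqIds[i] < next) {
        continue; // already written by an earlier attempt: skip the duplicate
      }
      if (seqIds[i] != next) {
        throw new IllegalStateException("gap in sequence ids for shard " + shard);
      }
      written.add(values[i]);
      next++;
    }
    nextId.put(shard, next);
  }
}
```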
>> As I understand it, Spark and Dataflow use the GroupByKey stages to persist
>> the input. That is not required in Flink to be able to take a consistent
>> snapshot of the pipeline.
>>
>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>> (a DoFn) to commit once a checkpoint has completed.
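Roughly, such a writer would follow the same pattern as Flink's TwoPhaseCommitSinkFunction: keep one open transaction, seal it when the checkpoint barrier arrives, and commit it only after the checkpoint has completed. Below is a sketch in plain Java. FakeTopic and FakeTransaction are hypothetical stand-ins for a Kafka topic read with read_committed isolation and a transactional producer; no Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stand-in for a topic read with read_committed: only committed records are visible.
class FakeTopic {
  final List<String> visible = new ArrayList<>();
}

// Stand-in for a producer transaction: writes are staged until commit.
class FakeTransaction {
  private final List<String> staged = new ArrayList<>();

  void write(String record) {
    staged.add(record);
  }

  void commit(FakeTopic topic) {
    topic.visible.addAll(staged);
    staged.clear();
  }
}

// Hypothetical writer that commits one transaction per completed checkpoint.
class CheckpointCommittingWriter {
  private final FakeTopic topic;
  private FakeTransaction current = new FakeTransaction();
  private final TreeMap<Long, FakeTransaction> pending = new TreeMap<>();

  CheckpointCommittingWriter(FakeTopic topic) {
    this.topic = topic;
  }

  void processElement(String record) {
    current.write(record);
  }

  // Barrier reached the writer: seal the open transaction under this checkpoint.
  void snapshotState(long checkpointId) {
    pending.put(checkpointId, current);
    current = new FakeTransaction();
  }

  // Checkpoint completed: commit its transaction and any older ones still pending.
  void notifyCheckpointComplete(long checkpointId) {
    Iterator<Map.Entry<Long, FakeTransaction>> it =
        pending.headMap(checkpointId, true).entrySet().iterator();
    while (it.hasNext()) {
      it.next().getValue().commit(topic);
      it.remove();
    }
  }
}
```

Anything written after the last completed checkpoint stays uncommitted, so after a failure it can be aborted and replayed from the restored state. A real implementation also has to make the sealed transactions recoverable after a restart (for example by keeping the transaction's identity in the checkpointed state), which this sketch leaves out.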
>>
>> I know that the new version of SDF supports checkpointing, which should
>> solve this issue. But there is still a lot of work to do to make this
>> reality.
>>
>
> I don't see how SDF solves this problem. Maybe pseudocode would make it
> clearer. But if it helps, that is great!
>
>> So I think it would make sense to think about a way to make KafkaIO's
>> EOS more accessible to Runners which support a different way of
>> checkpointing.
>>
>
> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that
> will help many future exactly-once sinks and address the fundamental
> incompatibility between the Beam model and Flink's horizontal checkpointing
> for such applications.
>
> Raghu.
>
>
>> Cheers,
>> Max
>>
>> PS: I found this document about RequiresStableInput [3], but IMHO
>> defining an annotation only manifests the conceptual difference between
>> the Runners.
>>
>>
>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>
>
