Yes, I think making this explicit would be good. Having a transformation that 
makes assumptions about how the runner implements certain things is not 
optimal. Also, I think that most people probably don't use Kafka with the 
Dataflow Runner (because GCP has Pub/Sub, but I'm just guessing here). This 
would mean that the intersection of "people who would benefit from an 
exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather small, 
and therefore not many people would benefit from such a Transform.

This is all just conjecture, of course.

Best,
Aljoscha

> On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> 
> I think the issue we're hitting is how to write this in Beam.
> 
> Dataflow historically guaranteed checkpointing at every GBK (which, due to
> the design of Dataflow's streaming shuffle, was reasonably efficient). In
> Beam we never formalized these semantics, leaving such sinks in a gray
> area. I believe the Spark runner currently checkpoints the RDD on every
> GBK, so these unwritten semantics currently work for Dataflow and for Spark.
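> 
> As a sketch of what these unwritten semantics let a sink author do today
> (illustrative code only; for unbounded input the GBK also needs a
> window/trigger, which is essentially what Beam's Reshuffle transform
> packages up):
> 
> import java.util.concurrent.ThreadLocalRandom;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.Flatten;
> import org.apache.beam.sdk.transforms.GroupByKey;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Values;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> 
> class CheckpointUtil {
>   // Force a materialization barrier with a GBK so that a downstream
>   // side-effecting DoFn only ever sees checkpointed input.
>   static <T> PCollection<T> checkpointViaGBK(PCollection<T> in) {
>     return in
>         .apply("RandomKey", ParDo.of(new DoFn<T, KV<Integer, T>>() {
>           @ProcessElement
>           public void process(ProcessContext c) {
>             c.output(KV.of(ThreadLocalRandom.current().nextInt(100), c.element()));
>           }
>         }))
>         .apply(GroupByKey.<Integer, T>create()) // checkpoint happens here on Dataflow/Spark
>         .apply(Values.<Iterable<T>>create())
>         .apply(Flatten.<T>iterables());
>   }
> }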
> 
> We need some way to express this operation in Beam, whether it be via an
> explicit Checkpoint() operation or via marking DoFns as having side
> effects, and having the runner automatically insert such a Checkpoint in
> front of them. In Flink, this operation can be implemented using what
> Aljoscha posted.
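> 
> To make the second option concrete, a purely hypothetical sketch (the
> @HasSideEffects annotation below is made up and does not exist in Beam):
> 
> import java.lang.annotation.ElementType;
> import java.lang.annotation.Retention;
> import java.lang.annotation.RetentionPolicy;
> import java.lang.annotation.Target;
> import org.apache.beam.sdk.transforms.DoFn;
> 
> // Made-up marker: the runner must insert a Checkpoint() (or an
> // equivalent materialization) in front of any DoFn carrying it.
> @Retention(RetentionPolicy.RUNTIME)
> @Target(ElementType.TYPE)
> @interface HasSideEffects {}
> 
> @HasSideEffects // input to this DoFn is stable across retries
> class KafkaWriteFn extends DoFn<byte[], Void> {
>   @ProcessElement
>   public void process(ProcessContext c) {
>     // producer.send(c.element()) would be safe here: a replay after a
>     // failure is guaranteed to deliver exactly the same elements.
>   }
> }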
> 
> Reuven
> 
> On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> 
>> Hi,
>> 
>> In Flink, there is a TwoPhaseCommitSinkFunction that can be used for such
>> cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds on that:
>> [2]
>> 
>> Best,
>> Aljoscha
>> 
>> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
>> [2] https://github.com/apache/flink/pull/4239
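>> 
>> For illustration, a minimal sketch on top of [1] (KafkaTransaction and
>> createTransactionalProducer() are made up here; exact signatures may
>> differ between Flink versions, and the real 0.11 producer in [2] is
>> considerably more involved):
>> 
>> import java.io.Serializable;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeutils.base.VoidSerializer;
>> import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> 
>> // Made-up holder for one Kafka transaction; recreated by beginTransaction().
>> class KafkaTransaction implements Serializable {
>>   transient KafkaProducer<String, String> producer;
>> }
>> 
>> class TwoPhaseKafkaSink extends TwoPhaseCommitSinkFunction<String, KafkaTransaction, Void> {
>> 
>>   TwoPhaseKafkaSink() {
>>     super(TypeInformation.of(KafkaTransaction.class).createSerializer(new ExecutionConfig()),
>>         VoidSerializer.INSTANCE);
>>   }
>> 
>>   @Override
>>   protected KafkaTransaction beginTransaction() {
>>     KafkaTransaction txn = new KafkaTransaction();
>>     txn.producer = createTransactionalProducer(); // made-up helper, see below
>>     txn.producer.beginTransaction();
>>     return txn;
>>   }
>> 
>>   @Override
>>   protected void invoke(KafkaTransaction txn, String value, Context context) {
>>     txn.producer.send(new ProducerRecord<>("my-topic", value)); // buffered in the open txn
>>   }
>> 
>>   @Override
>>   protected void preCommit(KafkaTransaction txn) {
>>     txn.producer.flush(); // phase 1: called when the checkpoint barrier arrives
>>   }
>> 
>>   @Override
>>   protected void commit(KafkaTransaction txn) {
>>     txn.producer.commitTransaction(); // phase 2: called once the checkpoint completed
>>   }
>> 
>>   @Override
>>   protected void abort(KafkaTransaction txn) {
>>     txn.producer.abortTransaction(); // called on failure/restore
>>   }
>> 
>>   // Made-up helper: would configure bootstrap servers, serializers and a
>>   // unique transactional.id, then call initTransactions() before returning.
>>   private KafkaProducer<String, String> createTransactionalProducer() {
>>     throw new UnsupportedOperationException("omitted from the sketch");
>>   }
>> }
>> 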
>>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID>
>>> wrote:
>>> 
>>> Kafka 0.11 added support for transactions [1], which allows end-to-end
>>> exactly-once semantics. Beam's KafkaIO users can benefit from these while
>>> using runners that support exactly-once processing.
>>> 
>>> I have an implementation of EOS support for the Kafka sink:
>>> https://github.com/apache/beam/pull/3612
>>> It uses two shuffles and builds on the Beam state API and the checkpoint
>>> barrier between stages (as in Dataflow). The pull request has a longer
>>> description.
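>>> 
>>> As a simplified sketch of one way the state API can be used for the
>>> sequence-number step (names made up; the PR differs in detail): after the
>>> first shuffle distributes records onto shard keys, a stateful DoFn assigns
>>> a per-shard sequence number, so that after the second shuffle the writer
>>> can detect and drop replayed records:
>>> 
>>> import org.apache.beam.sdk.coders.VarLongCoder;
>>> import org.apache.beam.sdk.state.StateSpec;
>>> import org.apache.beam.sdk.state.StateSpecs;
>>> import org.apache.beam.sdk.state.ValueState;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.values.KV;
>>> 
>>> // Assign a monotonically increasing sequence number per shard key; a
>>> // downstream writer can persist the last committed sequence number and
>>> // skip anything at or below it when input is replayed.
>>> class AssignShardSequenceFn extends DoFn<KV<Integer, byte[]>, KV<Integer, KV<Long, byte[]>>> {
>>>   @StateId("nextSeq")
>>>   private final StateSpec<ValueState<Long>> nextSeqSpec = StateSpecs.value(VarLongCoder.of());
>>> 
>>>   @ProcessElement
>>>   public void process(ProcessContext c, @StateId("nextSeq") ValueState<Long> nextSeq) {
>>>     Long current = nextSeq.read();
>>>     long seq = (current == null) ? 0L : current;
>>>     nextSeq.write(seq + 1);
>>>     c.output(KV.of(c.element().getKey(), KV.of(seq, c.element().getValue())));
>>>   }
>>> }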
>>> 
>>> - What other runners in addition to Dataflow would be compatible with such
>>> a strategy?
>>> - I think it does not quite work for Flink (as it takes a single global
>>> checkpoint rather than checkpointing between stages). How would one go
>>> about implementing such a sink there?
>>> 
>>> Any comments on the pull request are also welcome.
>>> 
>>> Thanks,
>>> Raghu.
>>> 
>>> [1] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
>> 
>> 
