We've had a few threads related to this. There was one proposal that seemed
to achieve consensus [1]. The TL;DR is that we have to assume any DoFn
might have side effects (in the broadest sense of the term where anything
other than a pure mathematical function is a side effect) and when we want
deterministic input we use a special DoFn parameter, distinct from
ProcessContext, to request it, something like:

@ProcessElement
public void process(DeterministicInput elem, OutputReceiver mainOutput) {
 ... elem.get() instead of context.element() ...
 ... mainOutput.output() instead of context.output() ...
}

A runner can then add checkpointing where needed, or elide it where it is
not. That choice depends on the runner's inherent checkpointing behavior
and its ability to analyze the pipeline to determine whether the
intervening transforms are deterministic functions.
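To see why a checkpoint (or a determinism proof) matters here, a toy
plain-Java simulation (no Beam dependency; the class and method names are
purely illustrative): an upstream step that tags each element at processing
time is re-executed on retry, so a sink reading its recomputed output would
commit different records than on the first attempt, unless the runner
replays a checkpointed snapshot instead.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplayDemo {
    // Non-deterministic "upstream DoFn": tags each element with a value
    // assigned at processing time (the attempt number stands in for a
    // timestamp or random key).
    static List<String> upstream(List<String> input, int attempt) {
        List<String> out = new ArrayList<>();
        for (String e : input) {
            out.add(e + "#attempt" + attempt);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c");

        // First attempt: the sink sees these records, then "crashes" mid-commit.
        List<String> attempt1 = upstream(source, 1);
        // Retry without a checkpoint: the upstream is re-executed and, being
        // non-deterministic, produces different records.
        List<String> attempt2 = upstream(source, 2);
        System.out.println("stable across retries: " + attempt1.equals(attempt2));
        // -> stable across retries: false

        // With a checkpoint in front of the sink, the retry replays the
        // *recorded* output of the first attempt instead of recomputing it.
        List<String> checkpointed = new ArrayList<>(attempt1); // the snapshot
        System.out.println("stable with checkpoint: " + attempt1.equals(checkpointed));
        // -> stable with checkpoint: true
    }
}
```

An exactly-once sink built on transactional commits is only safe when the
records it re-commits on retry are byte-for-byte the same, which is exactly
what the deterministic-input request above would guarantee.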

I started some work on breaking down
(StartBundle|Process|FinishBundle)Context to transition towards this, but
development has stalled in favor of other priorities. I'd be happy to chat
with anyone who wants to pick this up.

Kenn

[1]
https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E

On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Yes, I think making this explicit would be good. Having a transformation
> that makes assumptions about how the runner implements certain things is
> not optimal. Also, I think that most people probably don't use Kafka with
> the Dataflow Runner (because GCP has Pub/Sub, but I'm just guessing here).
> This would mean that the intersection of "people who would benefit from an
> exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
> small, and therefore not many people would benefit from such a Transform.
>
> This is all just conjecture, of course.
>
> Best,
> Aljoscha
>
> > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> >
> > I think the issue we're hitting is how to write this in Beam.
> >
> > Dataflow historically guaranteed checkpointing at every GBK (which due to
> > the design of Dataflow's streaming shuffle was reasonably efficient). In
> > Beam we never formalized these semantics, leaving such sinks in a gray
> > area. I believe the Spark runner currently checkpoints the RDD on every
> > GBK, so these unwritten semantics currently work for Dataflow and for
> > Spark.
> >
> > We need some way to express this operation in Beam, whether it be via an
> > explicit Checkpoint() operation or via marking DoFns as having side
> > effects, and having the runner automatically insert such a Checkpoint in
> > front of them. In Flink, this operation can be implemented using what
> > Aljoscha posted.
> >
> > Reuven
> >
> > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> In Flink, there is a TwoPhaseCommit SinkFunction that can be used for
> >> such cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds
> >> on that: [2]
> >>
> >> Best,
> >> Aljoscha
> >>
> >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> >> [2] https://github.com/apache/flink/pull/4239
> >>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID>
> >>> wrote:
> >>>
> >>> Kafka 0.11 added support for transactions[1], which allows end-to-end
> >>> exactly-once semantics. Beam's KafkaIO users can benefit from these
> >>> while using runners that support exactly-once processing.
> >>>
> >>> I have an implementation of EOS support for the Kafka sink:
> >>> https://github.com/apache/beam/pull/3612
> >>> It has two shuffles and builds on the Beam state API and the
> >>> checkpoint barrier between stages (as in Dataflow). The pull request
> >>> has a longer description.
> >>>
> >>> - What other runners in addition to Dataflow would be compatible with
> >>> such a strategy?
> >>> - I think it does not quite work for Flink (as it has a global
> >>> checkpoint, not one between the stages). How would one go about
> >>> implementing such a sink?
> >>>
> >>> Any comments on the pull request are also welcome.
> >>>
> >>> Thanks,
> >>> Raghu.
> >>>
> >>> [1]
> >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >>
> >>
>
>