Is determinism the right thing for this? One thing to keep in mind is that
most inputs will not be deterministic. If any upstream aggregation is done
with allowed_lateness > 0, then that aggregation is non-deterministic
(basically, if it is retried it might see a slightly different set of input
elements to aggregate), and so are any values derived downstream. Similarly,
if an upstream aggregation uses count or processing-time triggers, the
result of that aggregation will be non-deterministic.

In the above cases, the property of the DoFn that requires this
checkpointing is not determinism; it's the fact that the DoFn has side
effects.

BTW, nothing prevents us from allowing automatic inference but _also_
adding an explicit checkpoint operator (which would be a no-op for runners
such as Dataflow).
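
To make that concrete, here is a rough sketch of what such an explicit
checkpoint operator might look like. This is purely illustrative, not
existing API: the Checkpoint name is made up, and forcing a Reshuffle is
just one possible default expansion.

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Illustrative only: an explicit checkpoint as a composite transform. The
// default expansion forces a materialization via a shuffle; a runner whose
// shuffle already checkpoints (e.g. Dataflow) could override this with an
// identity (no-op) expansion instead.
public class Checkpoint<T> extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input.apply(Reshuffle.viaRandomKey());
  }
}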

Reuven

On Wed, Aug 9, 2017 at 8:32 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> We've had a few threads related to this. There was one proposal that seemed
> to achieve consensus [1]. The TL;DR is that we have to assume any DoFn
> might have side effects (in the broadest sense of the term, where anything
> other than a pure mathematical function is a side effect), and when we want
> deterministic input we use a special DoFn parameter, distinct from
> ProcessContext, to request it, something like:
>
> @ProcessElement
> public void process(DeterministicInput elem, OutputReceiver mainOutput) {
>  ... elem.get() instead of context.element() ...
>  ... mainOutput.output() instead of context.output() ...
> }
>
> A runner can then add checkpointing where it is needed and elide it where
> it isn't; this depends on the runner's inherent checkpointing behavior and
> on its ability to analyze the pipeline to know whether intervening
> transforms are deterministic functions.
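
For illustration, here is a self-contained version of the sketch above, with
the proposed parameter types stubbed out as hypothetical interfaces. Neither
DeterministicInput nor this OutputReceiver exists in the SDK today, so the
exact shapes below (including the generics) are guesses at what the proposal
would look like.

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical parameter types, sketched only to make the example concrete.
interface DeterministicInput<T> {
  T get();              // would replace context.element()
}

interface OutputReceiver<T> {
  void output(T value); // would replace context.output()
}

class WriteWithSideEffectsFn extends DoFn<String, String> {
  @ProcessElement
  public void process(DeterministicInput<String> elem,
                      OutputReceiver<String> mainOutput) {
    // Asking for DeterministicInput tells the runner this DoFn needs a
    // stable (checkpointed) view of its input before doing side effects.
    mainOutput.output(elem.get());
  }
}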
>
> I started some work on breaking down
> (StartBundle|Process|FinishBundle)Context to transition towards this, but
> development has stalled in favor of other priorities. I'd be happy to chat
> with anyone who wants to pick this up.
>
> Kenn
>
> [1]
> https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E
>
> On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Yes, I think making this explicit would be good. Having a transformation
> > that makes assumptions about how the runner implements certain things is
> > not optimal. Also, I think that most people probably don't use Kafka with
> > the Dataflow Runner (because GCP has Pubsub, but I'm just guessing here).
> > This would mean that the intersection of "people who would benefit from an
> > exactly-once Kafka sink" and "people who use Beam on Dataflow" is rather
> > small, and therefore not many people would benefit from such a Transform.
> >
> > This is all just conjecture, of course.
> >
> > Best,
> > Aljoscha
> >
> > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID> wrote:
> > >
> > > I think the issue we're hitting is how to write this in Beam.
> > >
> > > Dataflow historically guaranteed checkpointing at every GBK (which, due
> > > to the design of Dataflow's streaming shuffle, was reasonably
> > > efficient). In Beam we never formalized these semantics, leaving these
> > > syncs in a gray area. I believe the Spark runner currently checkpoints
> > > the RDD on every GBK, so these unwritten semantics currently work for
> > > Dataflow and for Spark.
> > >
> > > We need some way to express this operation in Beam, whether it be via
> > > an explicit Checkpoint() operation or via marking DoFns as having side
> > > effects and having the runner automatically insert such a Checkpoint in
> > > front of them. In Flink, this operation can be implemented using what
> > > Aljoscha posted.
> > >
> > > Reuven
> > >
> > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> In Flink, there is a TwoPhaseCommitSinkFunction that can be used for
> > >> such cases: [1]. The PR for a Kafka 0.11 exactly-once producer builds
> > >> on that: [2]
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> > >> [2] https://github.com/apache/flink/pull/4239
> > >>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID> wrote:
> > >>>
> > >>> Kafka 0.11 added support for transactions [1], which allows end-to-end
> > >>> exactly-once semantics. Beam's KafkaIO users can benefit from these
> > >>> while using runners that support exactly-once processing.
> > >>>
> > >>> I have an implementation of EOS support for the Kafka sink:
> > >>> https://github.com/apache/beam/pull/3612
> > >>> It has two shuffles and builds on the Beam state API and the checkpoint
> > >>> barrier between stages (as in Dataflow). The pull request has a longer
> > >>> description.
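
To illustrate the stable-id piece that this kind of approach relies on, here
is a rough sketch built on the Beam state API. This is not the code from the
PR; the class name, state name, and element types below are made up for the
example.

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// After a shuffle fixes which records land on which key, a stateful DoFn can
// hand out per-key sequence numbers that stay the same across retries, so a
// downstream writer can publish to Kafka idempotently / transactionally.
class AssignStableIds extends DoFn<KV<Integer, byte[]>, KV<Integer, KV<Long, byte[]>>> {
  @StateId("nextId")
  private final StateSpec<ValueState<Long>> nextIdSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(ProcessContext c,
                      @StateId("nextId") ValueState<Long> nextId) {
    Long stored = nextId.read();
    long id = (stored == null) ? 0L : stored;
    nextId.write(id + 1);
    c.output(KV.of(c.element().getKey(), KV.of(id, c.element().getValue())));
  }
}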
> > >>>
> > >>> - What other runners in addition to Dataflow would be compatible with
> > >>> such a strategy?
> > >>> - I think it does not quite work for Flink (as it has a global
> > >>> checkpoint, not one between the stages). How would one go about
> > >>> implementing such a sink?
> > >>>
> > >>> Any comments on the pull request are also welcome.
> > >>>
> > >>> Thanks,
> > >>> Raghu.
> > >>>
> > >>> [1]
> > >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > >>
> > >>
> >
> >
>
