The term "determinism" refers to a property of the input PCollection, not
any transform or DoFn. What we mean by it is that the PCollection has
well-defined contents, so any transform consuming it will see consistent
PCollection contents across retries.

Illustrated, I think we are talking about the same situation, where we hope
for an execution like this:

    Transform(s) -> Checkpoint -> WriteTransform

In every case I know of the purpose of the Checkpoint is so that
WriteTransform sees the same input across retries, even if the upstream
Transform(s) are not deterministic.

So "marking DoFns as having side effects, and having the runner
automatically insert such a Checkpoint in front of them" is precisely what
what you get from "Requires deterministic input". Of course, there are lots
of kinds of side effects and they don't all require deterministic input, so
that's how the vocabulary developed.

Kenn

On Wed, Aug 9, 2017 at 8:48 AM, Reuven Lax <re...@google.com.invalid> wrote:

> Is determinism the right thing for this? One thing to keep in mind, is that
> most inputs will not be deterministic. If any upstream aggregation is done
> and allowed_lateness > 0, then that aggregation is non deterministic
> (basically, if it is retried it might get a slightly different set of input
> elements to aggregate) and so are downstream dependent values. Similarly if
> an upstream aggregation uses count or processing-time triggers, the result
> of that aggregation will be non deterministic.
>
> In the above cases, the property of the DoFn that requires this
> checkpointing is not determinism, it's the fact that the DoFn has side
> effects.
>
> BTW, nothing prevents us from allowing automatic inference, but _also_
> adding a checkpoint operator (which will be a noop operator for runners
> such as Dataflow).
>
> Reuven
>
> On Wed, Aug 9, 2017 at 8:32 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > We've had a few threads related to this. There was one proposal that
> seemed
> > to achieve consensus [1]. The TL;DR is that we have to assume any DoFn
> > might have side effects (in the broadest sense of the term where anything
> > other than a pure mathematical function is a side effect) and when we
> want
> > deterministic input we use a special DoFn parameter like distinct from
> > ProcessContext to request it, something like:
> >
> > @ProcessElement
> > public void process(DeterministicInput elem, OutputReceiver mainOutput) {
> >  ... elem.get() instead of context.element() ...
> >  ... mainOutput.output() instead of context.output() ...
> > }
> >
> > A runner can then add checkpointing if needed or elide it if not needed.
> It
> > depends on the runner's inherent checkpointing behavior and the ability
> to
> > analyze a pipeline to know whether intervening transforms are
> deterministic
> > functions.
> >
> > I started some work on breaking down
> > (StartBundle|Process|FinishBundle)Context to transition towards this,
> but
> > development has stalled in favor of other priorities. I'd be happy to
> chat
> > with anyone who wants to pick this up.
> >
> > Kenn
> >
> > [1]
> > https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d
> > 5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E
> >
> > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, I think making this explicit would be good. Having a
> transformation
> > > that makes assumptions about how the runner implements certain things
> is
> > > not optimal. Also, I think that most people probably don't use Kafka
> with
> > > the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing
> > here).
> > > This would mean that the intersection of "people who would benefit from
> > an
> > > exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> rather
> > > small, and therefore not many people would benefit from such a
> Transform.
> > >
> > > This is all just conjecture, of course.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID>
> wrote:
> > > >
> > > > I think the issue we're hitting is how to write this in Beam.
> > > >
> > > > Dataflow historically guaranteed checkpointing at every GBK (which
> due
> > to
> > > > the design of Dataflow's streaming shuffle was reasonably efficient).
> > In
> > > > Beam we never formalized these semantics, leaving these syncs in a
> gray
> > > > area. I believe the Spark runner currently checkpoints the RDD on
> every
> > > > GBK, so these unwritten semantics currently work for Dataflow and for
> > > Spark.
> > > >
> > > > We need someway to express this operation in Beam, whether it be via
> an
> > > > explicit Checkpoint() operation or via marking DoFns as having side
> > > > effects, and having the runner automatically insert such a Checkpoint
> > in
> > > > front of them. In Flink, this operation can be implemented using what
> > > > Aljoscha posted.
> > > >
> > > > Reuven
> > > >
> > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
> for
> > > such
> > > >> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds on
> > > that:
> > > >> [2]
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >> [1] https://github.com/apache/flink/blob/
> > 62e99918a45b7215c099fbcf160d45
> > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > java#L55
> > > <
> > > >> https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45
> > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > java#L55>
> > > >> [2] https://github.com/apache/flink/pull/4239
> > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi <rang...@google.com.INVALID
> >
> > > >> wrote:
> > > >>>
> > > >>> Kafka 0.11 added support for transactions[1], which allows
> end-to-end
> > > >>> exactly-once semantics. Beam's KafkaIO users can benefit from these
> > > while
> > > >>> using runners that support exactly-once processing.
> > > >>>
> > > >>> I have an implementation of EOS support for Kafka sink :
> > > >>> https://github.com/apache/beam/pull/3612
> > > >>> It has two shuffles and builds on Beam state-API and checkpoint
> > barrier
> > > >>> between stages (as in Dataflow). Pull request has a longer
> > description.
> > > >>>
> > > >>> - What other runners in addition to Dataflow would be compatible
> with
> > > >> such
> > > >>> a strategy?
> > > >>> - I think it does not quite work for Flink (as it has a global
> > > >> checkpoint,
> > > >>> not between the stages). How would one go about implementing such a
> > > sink.
> > > >>>
> > > >>> Any comments on the pull request are also welcome.
> > > >>>
> > > >>> Thanks,
> > > >>> Raghu.
> > > >>>
> > > >>> [1]
> > > >>> https://www.confluent.io/blog/exactly-once-semantics-are-
> > > >> possible-heres-how-apache-kafka-does-it/
> > > >>
> > > >>
> > >
> > >
> >
>

Reply via email to