Oh, I understand now. This DoFn is saying "make my input deterministically
replayable." If it turns out the input already is deterministically
replayable, then nothing needs to be done.
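
To make that concrete, here is a rough sketch of the kind of DoFn in
question (the names and the external client are invented for
illustration, not an agreed API):

import org.apache.beam.sdk.transforms.DoFn;

// A side-effecting DoFn: the external write must see the same input
// across retries, i.e. its input must be deterministically replayable.
class WriteToExternalStoreFn extends DoFn<String, Void> {

  // Hypothetical client for some external, non-idempotent store.
  interface ExternalClient extends java.io.Serializable {
    void write(String record);
  }

  private final ExternalClient client;

  WriteToExternalStoreFn(ExternalClient client) {
    this.client = client;
  }

  @ProcessElement
  public void process(ProcessContext c) {
    // If the element could differ on retry (e.g. it came out of a
    // triggered aggregation), this write is not safely retryable.
    client.write(c.element());
  }
}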



On Wed, Aug 9, 2017 at 9:10 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> The term "determinism" refers to a property of the input PCollection, not
> any transform or DoFn. What we mean by it is that the PCollection has
> well-defined contents, so any transform consuming it will see consistent
> PCollection contents across retries.
>
> Illustrated, I think we are talking about the same situation, where we hope
> for an execution like this:
>
>     Transform(s) -> Checkpoint -> WriteTransform
>
> In every case I know of the purpose of the Checkpoint is so that
> WriteTransform sees the same input across retries, even if the upstream
> Transform(s) are not deterministic.
>
> So "marking DoFns as having side effects, and having the runner
> automatically insert such a Checkpoint in front of them" is precisely what
> what you get from "Requires deterministic input". Of course, there are lots
> of kinds of side effects and they don't all require deterministic input, so
> that's how the vocabulary developed.
>
> Kenn
>
> On Wed, Aug 9, 2017 at 8:48 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
>
> > Is determinism the right thing for this? One thing to keep in mind, is
> that
> > most inputs will not be deterministic. If any upstream aggregation is
> done
> > and allowed_lateness > 0, then that aggregation is non deterministic
> > (basically, if it is retried it might get a slightly different set of
> input
> > elements to aggregate) and so are downstream dependent values. Similarly
> if
> > an upstream aggregation uses count or processing-time triggers, the
> result
> > of that aggregation will be non deterministic.
> >
> > In the above cases, the property of the DoFn that requires this
> > checkpointing is not determinism, it's the fact that the DoFn has side
> > effects.
> >
> > BTW, nothing prevents us from allowing automatic inference, but _also_
> > adding a checkpoint operator (which will be a noop operator for runners
> > such as Dataflow).
> >
> > Reuven
> >
> > On Wed, Aug 9, 2017 at 8:32 AM, Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > > We've had a few threads related to this. There was one proposal that
> > seemed
> > > to achieve consensus [1]. The TL;DR is that we have to assume any DoFn
> > > might have side effects (in the broadest sense of the term where
> anything
> > > other than a pure mathematical function is a side effect) and when we
> > want
> > > deterministic input we use a special DoFn parameter like distinct from
> > > ProcessContext to request it, something like:
> > >
> > > @ProcessElement
> > > public void process(DeterministicInput elem, OutputReceiver
> mainOutput) {
> > >  ... elem.get() instead of context.element() ...
> > >  ... mainOutput.output() instead of context.output() ...
> > > }
> > >
> > > A runner can then add checkpointing if needed or elide it if not
> needed.
> > It
> > > depends on the runner's inherent checkpointing behavior and the ability
> > to
> > > analyze a pipeline to know whether intervening transforms are
> > deterministic
> > > functions.
> > >
> > > I started some work on breaking down
> > > (StartBundle|Process|FinishBundle)Context to transition towards this,
> > but
> > > development has stalled in favor of other priorities. I'd be happy to
> > chat
> > > with anyone who wants to pick this up.
> > >
> > > Kenn
> > >
> > > [1]
> > > https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d
> > > 5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E
> > >
> > > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > Yes, I think making this explicit would be good. Having a
> > transformation
> > > > that makes assumptions about how the runner implements certain things
> > is
> > > > not optimal. Also, I think that most people probably don't use Kafka
> > with
> > > > the Dataflow Runner (because GCE has Pubsub, but I'm guest guessing
> > > here).
> > > > This would mean that the intersection of "people who would benefit
> from
> > > an
> > > > exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> > rather
> > > > small, and therefore not many people would benefit from such a
> > Transform.
> > > >
> > > > This is all just conjecture, of course.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID>
> > wrote:
> > > > >
> > > > > I think the issue we're hitting is how to write this in Beam.
> > > > >
> > > > > Dataflow historically guaranteed checkpointing at every GBK (which
> > due
> > > to
> > > > > the design of Dataflow's streaming shuffle was reasonably
> efficient).
> > > In
> > > > > Beam we never formalized these semantics, leaving these syncs in a
> > gray
> > > > > area. I believe the Spark runner currently checkpoints the RDD on
> > every
> > > > > GBK, so these unwritten semantics currently work for Dataflow and
> for
> > > > Spark.
> > > > >
> > > > > We need someway to express this operation in Beam, whether it be
> via
> > an
> > > > > explicit Checkpoint() operation or via marking DoFns as having side
> > > > > effects, and having the runner automatically insert such a
> Checkpoint
> > > in
> > > > > front of them. In Flink, this operation can be implemented using
> what
> > > > > Aljoscha posted.
> > > > >
> > > > > Reuven
> > > > >
> > > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
> > aljos...@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> In Flink, there is a TwoPhaseCommit SinkFunction that can be used
> > for
> > > > such
> > > > >> cases: [1]. The PR for a Kafka 0.11 exactly once producer builds
> on
> > > > that:
> > > > >> [2]
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >> [1] https://github.com/apache/flink/blob/
> > > 62e99918a45b7215c099fbcf160d45
> > > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > > java#L55
> > > > <
> > > > >> https://github.com/apache/flink/blob/
> 62e99918a45b7215c099fbcf160d45
> > > > >> aa02d4559e/flink-streaming-java/src/main/java/org/apache/
> > > > >> flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.
> > > java#L55>
> > > > >> [2] https://github.com/apache/flink/pull/4239
> > > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi
> <rang...@google.com.INVALID
> > >
> > > > >> wrote:
> > > > >>>
> > > > >>> Kafka 0.11 added support for transactions[1], which allows
> > end-to-end
> > > > >>> exactly-once semantics. Beam's KafkaIO users can benefit from
> these
> > > > while
> > > > >>> using runners that support exactly-once processing.
> > > > >>>
> > > > >>> I have an implementation of EOS support for Kafka sink :
> > > > >>> https://github.com/apache/beam/pull/3612
> > > > >>> It has two shuffles and builds on Beam state-API and checkpoint
> > > barrier
> > > > >>> between stages (as in Dataflow). Pull request has a longer
> > > description.
> > > > >>>
> > > > >>> - What other runners in addition to Dataflow would be compatible
> > with
> > > > >> such
> > > > >>> a strategy?
> > > > >>> - I think it does not quite work for Flink (as it has a global
> > > > >> checkpoint,
> > > > >>> not between the stages). How would one go about implementing
> such a
> > > > sink.
> > > > >>>
> > > > >>> Any comments on the pull request are also welcome.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Raghu.
> > > > >>>
> > > > >>> [1]
> > > > >>> https://www.confluent.io/blog/exactly-once-semantics-are-
> > > > >> possible-heres-how-apache-kafka-does-it/
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
>