I assume this holds for side inputs as well.

On Wed, Aug 9, 2017 at 9:41 AM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> Yea, exactly.
>
> On Wed, Aug 9, 2017 at 9:40 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
>
> > Oh, I understand now. This DoFn is saying "make my input
> deterministically
> > replayable." If it turns out the input already is deterministically
> > replayable, then nothing needs to be done.
> >
> >
> >
> > On Wed, Aug 9, 2017 at 9:10 AM, Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > > The term "determinism" refers to a property of the input PCollection,
> not
> > > any transform or DoFn. What we mean by it is that the PCollection has
> > > well-defined contents, so any transform consuming it will see
> consistent
> > > PCollection contents across retries.
> > >
> > > Illustrated, I think we are talking about the same situation, where we
> > hope
> > > for an execution like this:
> > >
> > >     Transform(s) -> Checkpoint -> WriteTransform
> > >
> > > > In every case I know of, the purpose of the Checkpoint is so that
> > > WriteTransform sees the same input across retries, even if the upstream
> > > Transform(s) are not deterministic.
> > >
> > > So "marking DoFns as having side effects, and having the runner
> > > > automatically insert such a Checkpoint in front of them" is precisely
> > > > what you get from "Requires deterministic input". Of course, there are
> > lots
> > > of kinds of side effects and they don't all require deterministic
> input,
> > so
> > > that's how the vocabulary developed.
> > >
> > > Kenn
> > >
> > > On Wed, Aug 9, 2017 at 8:48 AM, Reuven Lax <re...@google.com.invalid>
> > > wrote:
> > >
> > > > > Is determinism the right thing for this? One thing to keep in mind is that
> > > > > most inputs will not be deterministic. If any upstream aggregation is
> > > done
> > > > > and allowed_lateness > 0, then that aggregation is non-deterministic
> > > > (basically, if it is retried it might get a slightly different set of
> > > input
> > > > elements to aggregate) and so are downstream dependent values.
> > Similarly
> > > if
> > > > an upstream aggregation uses count or processing-time triggers, the
> > > result
> > > > > of that aggregation will be non-deterministic.
> > > >
> > > > In the above cases, the property of the DoFn that requires this
> > > > checkpointing is not determinism, it's the fact that the DoFn has
> side
> > > > effects.
> > > >
> > > > BTW, nothing prevents us from allowing automatic inference, but
> _also_
> > > > adding a checkpoint operator (which will be a noop operator for
> runners
> > > > such as Dataflow).
> > > >
> > > > Reuven
> > > >
> > > > On Wed, Aug 9, 2017 at 8:32 AM, Kenneth Knowles
> <k...@google.com.invalid
> > >
> > > > wrote:
> > > >
> > > > > We've had a few threads related to this. There was one proposal
> that
> > > > seemed
> > > > > to achieve consensus [1]. The TL;DR is that we have to assume any
> > DoFn
> > > > > might have side effects (in the broadest sense of the term where
> > > anything
> > > > > other than a pure mathematical function is a side effect) and when
> we
> > > > want
> > > > > > deterministic input we use a special DoFn parameter, distinct from
> > > > > > ProcessContext, to request it, something like:
> > > > >
> > > > > @ProcessElement
> > > > > > public void process(DeterministicInput elem, OutputReceiver mainOutput) {
> > > > >  ... elem.get() instead of context.element() ...
> > > > >  ... mainOutput.output() instead of context.output() ...
> > > > > }
> > > > >
> > > > > A runner can then add checkpointing if needed or elide it if not
> > > needed.
> > > > It
> > > > > depends on the runner's inherent checkpointing behavior and the
> > ability
> > > > to
> > > > > analyze a pipeline to know whether intervening transforms are
> > > > deterministic
> > > > > functions.
> > > > >
> > > > > I started some work on breaking down
> > > > > (StartBundle|Process|FinishBundle)Context to transition towards
> > this,
> > > > but
> > > > > development has stalled in favor of other priorities. I'd be happy
> to
> > > > chat
> > > > > with anyone who wants to pick this up.
> > > > >
> > > > > Kenn
> > > > >
> > > > > [1]
> > > > > > https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E
> > > > >
> > > > > On Wed, Aug 9, 2017 at 2:07 AM, Aljoscha Krettek <
> > aljos...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Yes, I think making this explicit would be good. Having a
> > > > transformation
> > > > > > that makes assumptions about how the runner implements certain
> > things
> > > > is
> > > > > > not optimal. Also, I think that most people probably don't use
> > Kafka
> > > > with
> > > > > > > the Dataflow Runner (because GCE has Pubsub, but I'm just guessing
> > > > > > > here).
> > > > > > This would mean that the intersection of "people who would
> benefit
> > > from
> > > > > an
> > > > > > exactly-once Kafka sink" and "people who use Beam on Dataflow" is
> > > > rather
> > > > > > small, and therefore not many people would benefit from such a
> > > > Transform.
> > > > > >
> > > > > > This is all just conjecture, of course.
> > > > > >
> > > > > > Best,
> > > > > > Aljoscha
> > > > > >
> > > > > > > On 8. Aug 2017, at 23:34, Reuven Lax <re...@google.com.INVALID
> >
> > > > wrote:
> > > > > > >
> > > > > > > I think the issue we're hitting is how to write this in Beam.
> > > > > > >
> > > > > > > Dataflow historically guaranteed checkpointing at every GBK
> > (which
> > > > due
> > > > > to
> > > > > > > the design of Dataflow's streaming shuffle was reasonably
> > > efficient).
> > > > > In
> > > > > > > Beam we never formalized these semantics, leaving these syncs
> in
> > a
> > > > gray
> > > > > > > area. I believe the Spark runner currently checkpoints the RDD
> on
> > > > every
> > > > > > > GBK, so these unwritten semantics currently work for Dataflow
> and
> > > for
> > > > > > Spark.
> > > > > > >
> > > > > > > > We need some way to express this operation in Beam, whether it
> be
> > > via
> > > > an
> > > > > > > explicit Checkpoint() operation or via marking DoFns as having
> > side
> > > > > > > effects, and having the runner automatically insert such a
> > > Checkpoint
> > > > > in
> > > > > > > front of them. In Flink, this operation can be implemented
> using
> > > what
> > > > > > > Aljoscha posted.
> > > > > > >
> > > > > > > Reuven
> > > > > > >
> > > > > > > On Tue, Aug 8, 2017 at 8:22 AM, Aljoscha Krettek <
> > > > aljos...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi,
> > > > > > >>
> > > > > > >> In Flink, there is a TwoPhaseCommit SinkFunction that can be
> > used
> > > > for
> > > > > > such
> > > > > > >> cases: [1]. The PR for a Kafka 0.11 exactly once producer
> builds
> > > on
> > > > > > that:
> > > > > > >> [2]
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Aljoscha
> > > > > > >>
> > > > > > > >> [1] https://github.com/apache/flink/blob/62e99918a45b7215c099fbcf160d45aa02d4559e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L55
> > > > > > >> [2] https://github.com/apache/flink/pull/4239
> > > > > > >>> On 3. Aug 2017, at 04:03, Raghu Angadi
> > > <rang...@google.com.INVALID
> > > > >
> > > > > > >> wrote:
> > > > > > >>>
> > > > > > >>> Kafka 0.11 added support for transactions[1], which allows
> > > > end-to-end
> > > > > > >>> exactly-once semantics. Beam's KafkaIO users can benefit from
> > > these
> > > > > > while
> > > > > > >>> using runners that support exactly-once processing.
> > > > > > >>>
> > > > > > > >>> I have an implementation of EOS support for Kafka sink:
> > > > > > >>> https://github.com/apache/beam/pull/3612
> > > > > > >>> It has two shuffles and builds on Beam state-API and
> checkpoint
> > > > > barrier
> > > > > > >>> between stages (as in Dataflow). Pull request has a longer
> > > > > description.
> > > > > > >>>
> > > > > > >>> - What other runners in addition to Dataflow would be
> > compatible
> > > > with
> > > > > > >> such
> > > > > > >>> a strategy?
> > > > > > >>> - I think it does not quite work for Flink (as it has a
> global
> > > > > > >> checkpoint,
> > > > > > > >>> not between the stages). How would one go about implementing
> > > > > > > >>> such a sink?
> > > > > > >>>
> > > > > > >>> Any comments on the pull request are also welcome.
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Raghu.
> > > > > > >>>
> > > > > > >>> [1]
> > > > > > > >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > > > > > >>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
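
For anyone skimming the thread, here is a rough sketch of the
"Transform(s) -> Checkpoint -> WriteTransform" shape discussed above. It is
plain Java with no Beam dependency, and every name in it (CheckpointSketch,
upstream, Checkpoint, writeWithRetry) is made up for illustration; it is not
Beam's API or any runner's implementation. It only shows the property being
asked for: a retried write sees the same input even when the upstream step
is not deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

public class CheckpointSketch {
    /** Hypothetical non-deterministic upstream step: each attempt may yield different values. */
    static List<Integer> upstream(Random rng) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            out.add(rng.nextInt(1000));
        }
        return out;
    }

    /** Hypothetical checkpoint: materializes its source once, then replays the saved copy. */
    static final class Checkpoint<T> implements Supplier<List<T>> {
        private final Supplier<List<T>> source;
        private List<T> saved;

        Checkpoint(Supplier<List<T>> source) {
            this.source = source;
        }

        @Override
        public synchronized List<T> get() {
            if (saved == null) {
                saved = Collections.unmodifiableList(new ArrayList<>(source.get()));
            }
            return saved;
        }
    }

    /** Simulates a write that is attempted twice and records the input seen on each attempt. */
    static <T> List<List<T>> writeWithRetry(Supplier<List<T>> input) {
        List<List<T>> seen = new ArrayList<>();
        seen.add(input.get()); // first attempt
        seen.add(input.get()); // retry
        return seen;
    }

    public static void main(String[] args) {
        Random rng = new Random();
        Supplier<List<Integer>> raw = () -> upstream(rng);
        // Without a checkpoint the two attempts will usually see different elements.
        System.out.println("no checkpoint: " + writeWithRetry(raw));
        // With a checkpoint both attempts see the same materialized elements.
        System.out.println("checkpointed:  " + writeWithRetry(new Checkpoint<>(raw)));
    }
}
```

A runner whose input is already deterministically replayable could skip the
Checkpoint wrapper entirely, which is the "elide it if not needed" case
mentioned in the thread.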
