Hi,
I sent a comment on the doc, just for my understanding of the reshuffle
workaround.
Etienne
On Tuesday, 3 July 2018 at 07:33 -0700, Kenneth Knowles wrote:
> Since we always assume ProcessElement could have arbitrary side effects (esp.
> randomization), the state and timers set
> up by a call to ProcessElement cannot be considered stable until they are
> persisted. It seems very similar to the cost
> of outputting to a downstream @RequiresStableInput transform, if not an
> identical implementation.
> The thing timers add is a way to loop which you can't do if it is an output.
> Adding @Pure annotations might help, if the input elements are stable and
> ProcessElement is pure.
>
> Kenn
> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
> > The common use case for a timer is to read in data that was stored using
> > the state API in processElement. There is
> > no guarantee that is stable, and I believe no runner currently guarantees
> > this. For example:
> >
> > class MyDoFn extends DoFn<ElementT, Void> {
> >   // Buffers incoming elements until the timer fires.
> >   @StateId("bag")
> >   private final StateSpec<BagState<ElementT>> buffer = StateSpecs.bag(ElementCoder.of());
> >
> >   @TimerId("timer")
> >   private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> >
> >   @ProcessElement
> >   public void processElement(
> >       @Element ElementT element,
> >       @StateId("bag") BagState<ElementT> bag,
> >       @TimerId("timer") Timer timer) {
> >     bag.add(element);
> >     timer.align(Duration.standardSeconds(30)).offset(Duration.standardSeconds(3)).setRelative();
> >   }
> >
> >   @OnTimer("timer")
> >   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
> >     // Without stable input, a retry may read different contents here.
> >     sendToExternalSystem(bag.read());
> >   }
> > }
> >
> > If you tagged onTimer with @RequiresStableInput, then you could guarantee
> > that if the timer retried then it would
> > read the same elements out of the bag. Today this is not guaranteed - the
> > data written to the bag might not even be
> > persisted yet when the timer fires (for example, both the processElement
> > and the onTimer might be executed by the
> > runner in the same bundle).
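
The retry problem described above can be illustrated without any Beam dependency. The following is only a minimal sketch in plain Java: `StableInputSketch`, `processBundle`, and `CheckpointedBag` are hypothetical names invented for illustration, and the "checkpoint" is just an in-memory copy standing in for whatever persistence a runner would actually perform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class StableInputSketch {

    /** Stand-in for a processElement with side effects: tags each element with a fresh value. */
    static List<String> processBundle(List<String> input, IntSupplier sideEffect) {
        List<String> bag = new ArrayList<>();
        for (String element : input) {
            bag.add(element + "#" + sideEffect.getAsInt());
        }
        return bag;
    }

    /** Hypothetical model of a bag that is persisted before the timer may read it. */
    static final class CheckpointedBag {
        private List<String> persisted; // null until the first attempt commits

        List<String> read(List<String> input, IntSupplier sideEffect) {
            if (persisted == null) {
                persisted = Collections.unmodifiableList(processBundle(input, sideEffect));
            }
            return persisted; // every retry replays the committed contents
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        AtomicInteger counter = new AtomicInteger();
        IntSupplier sideEffect = counter::incrementAndGet;

        // No checkpoint: a retry re-runs processElement, so the timer sees different data.
        System.out.println(processBundle(input, sideEffect)); // [a#1, b#2]
        System.out.println(processBundle(input, sideEffect)); // [a#3, b#4]

        // With a checkpoint: the first attempt commits, and the retry reads the same data.
        CheckpointedBag bag = new CheckpointedBag();
        System.out.println(bag.read(input, sideEffect)); // [a#5, b#6]
        System.out.println(bag.read(input, sideEffect)); // [a#5, b#6]
    }
}
```

The counter plays the role of the "arbitrary side effects" in ProcessElement; the only point of the sketch is that the timer's view becomes repeatable once the bag contents are committed before being read.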
> >
> > This particular example is a simplistic one of course - you could
> > accomplish the same thing with triggers. When
> > Raghu worked on the exactly-once Kafka sink this was very problematic. The
> > final solution used some specific details
> > of Kafka to work, and is complicated and not portable to other sinks.
> >
> > BTW - you can of course just have OnTimer produce the output to another
> > transform marked with RequiresStableInput.
> > However this solution is very expensive - every element must be persisted
> > to stable storage multiple times - and we
> > tried hard to avoid doing this in the Kafka sink.
> >
> > Reuven
> > On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com> wrote:
> > > Could you give an example of such a use case? (I suppose I'm not quite
> > > following what it means for a timer to be
> > > unstable...)
> > >
> > > On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
> > > > One issue: we definitely have some strong use cases where we want this
> > > > on OnTimer but not on
> > > > ProcessElement. Since both are on the same DoFn, I'm not sure how you
> > > > would represent this as a separate
> > > > transform.
> > > > On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
> > > > wrote:
> > > > > Thanks for the writeup.
> > > > > I'm wondering whether, rather than phrasing this as an annotation on
> > > > > DoFn methods that gets plumbed down through
> > > > > the portability representation, it would make more sense to
> > > > > introduce a new, primitive "EnsureStableInput"
> > > > > transform. Runners whose reshuffle provides stable inputs
> > > > > could use that as an implementation,
> > > > > and other runners could provide other suitable implementations.
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
> > > > > > Hi everyone,
> > > > > > Thanks for your feedback on the doc. I have revamped it according
> > > > > > to all of the comments. The major changes
> > > > > > I have made are:
> > > > > > * The problem description should be more general and accurate now.
> > > > > > * I added more background information, such as details about
> > > > > > Reshuffle, so it should be easier to understand
> > > > > > now.
> > > > > > * I made clear the scope of my current project and what
> > > > > > could be left to future work.
> > > > > > * It now reflects the current progress of my work, and discusses
> > > > > > how it should work with the portable
> > > > > > pipeline representation (WIP)
> > > > > >
> > > > > > Also, I forgot to mention last time that this doc may be
> > > > > > interesting to those of you interested in
> > > > > > Reshuffle, because Reshuffle is used as a current workaround for
> > > > > > the problem described in the doc.
> > > > > >
> > > > > > More comments are always welcome.
> > > > > >
> > > > > > Best,
> > > > > > Robin
> > > > > > On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
> > > > > > wrote:
> > > > > > > Thanks for the write up. It is great to see someone pushing this
> > > > > > > through.
> > > > > > > I wanted to bring Luke's high-level question back to the list for
> > > > > > > visibility: what about portability and
> > > > > > > other SDKs?
> > > > > > > Portability is probably trivial, but the "other SDKs" question is
> > > > > > > probably best answered by folks working
> > > > > > > on them who can have opinions about how it works in their SDKs'
> > > > > > > idioms.
> > > > > > >
> > > > > > > Kenn