Hi,
I sent a comment on the doc, just for my understanding of the reshuffle work 
around.
Etienne
Le mardi 03 juillet 2018 à 07:33 -0700, Kenneth Knowles a écrit :
> Since we always assume ProcessElement could have arbitrary side effects (esp. 
> randomization), the state and timers set
> up by a call to ProcessElement cannot be considered stable until they are 
> persisted. It seems very similar to the cost
> of outputting to a downstream @RequiresStableInput transform, if not an 
> identical implementation.
> The thing timers add is a way to loop which you can't do if it is an output.
> Adding @Pure annotations might help, if the input elements are stable and 
> ProcessElement is pure.
> 
> Kenn
> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
> > The common use case for a timer is to read in data that was stored using 
> > the state API in processElement. There is
> > no guarantee that is stable, and I believe no runner currently guarantees 
> > this. For example:
> > 
> > class MyDoFn extends DoFn<ElementT, Void> {
> >   @StateId("bag") private final StateSpec<BagState<ElementT>> buffer = 
> > StateSpec.bag(ElementCoder.of());
> >   @TimerId("timer") private final TimerSpec = 
> > TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> > 
> >   @ProcessElement public void processElement(@Element ElementT element, 
> > @StateId("bag") BagState<ElementT> bag,
> > @TimerId("timer") Timer timer) {
> >       bag.add(element);
> >       
> > timer.align(Duration.standardSeconds(30)).offset(Duration.standardSeconds(3)).setRelative();
> >   }
> > 
> >   @OnTimer("timer") public void onTimer(@StateId("bag") BagState<ElementT> 
> > bag) {
> >     sendToExternalSystem(bag.read());
> >   }
> > }
> > 
> > If you tagged onTimer with @RequiresStableInput, then you could guarantee 
> > that if the timer retried then it would
> > read the same elements out of the bag. Today this is not guaranteed - the 
> > data written to the bag might not even be
> > persisted yet when the timer fires (for example, both the processElement 
> > and the onTimer might be executed by the
> > runner in the same bundle).
> > 
> > This particular example is a simplistic one of course - you could 
> > accomplish the same thing with triggers. When
> > Raghu worked on the exactly-once Kafka sink this was very problematic. The 
> > final solution used some specific details
> > of Kafka to work, and is complicated and not portable to other sinks.
> > 
> > BTW - you can of course just have OnTimer produce the output to another 
> > transform marked with RequiresStableInput.
> > However this solution is very expensive - every element must be persisted 
> > to stable storage multiple times - and we
> > tried hard to avoid doing this in the Kafka sink.
> > 
> > Reuven
> > On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com> wrote:
> > > Could you give an example of such a usecase? (I suppose I'm not quite 
> > > following what it means for a timer to be
> > > unstable...)
> > > 
> > > On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
> > > > One issue: we definitely have some strong use cases where we want this 
> > > > on ProcessTimer but not on
> > > > ProcessElement. Since both are on the same DoFn, I'm not sure how you 
> > > > would represent this as a separate
> > > > transform.
> > > > On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com> 
> > > > wrote:
> > > > > Thanks for the writeup. 
> > > > > I'm wondering with, rather than phrasing this as an annotation on 
> > > > > DoFn methods that gets plumbed down through
> > > > > the portability representation, if it would make more sense to 
> > > > > introduce a new, primitive "EnsureStableInput"
> > > > > transform. For those runners whose reshuffle provide stable inputs, 
> > > > > they could use that as an implementation,
> > > > > and other runners could provide other suitable implementations. 
> > > > > 
> > > > > 
> > > > > 
> > > > > On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
> > > > > > Hi everyone,
> > > > > > Thanks for your feedback on the doc. I have revamped it according 
> > > > > > to all of the comments. The major changes
> > > > > > I have made are:
> > > > > > * The problem description should be more general and accurate now.
> > > > > > * I added more background information, such as details about 
> > > > > > Reshuffle, so I should be easier to understand
> > > > > > now.
> > > > > > * I made it clear what is the scope of my current project and what 
> > > > > > could be left to future work.
> > > > > > * It now reflects the current progress of my work, and discusses 
> > > > > > how it should work with the portable
> > > > > > pipeline representation (WIP)
> > > > > > 
> > > > > > Also, I forgot to mention last time that this doc may be 
> > > > > > interesting to those of you interested in
> > > > > > Reshuffle, because Reshuffle is used as a current workaround for 
> > > > > > the problem described in the doc.
> > > > > > 
> > > > > > More comments are always welcome.
> > > > > > 
> > > > > > Best,
> > > > > > Robin
> > > > > > On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com> 
> > > > > > wrote:
> > > > > > > Thanks for the write up. It is great to see someone pushing this 
> > > > > > > through.
> > > > > > > I wanted to bring Luke's high-level question back to the list for 
> > > > > > > visibility: what about portability and
> > > > > > > other SDKs?
> > > > > > > Portability is probably trivial, but the "other SDKs" question is 
> > > > > > > probably best answered by folks working
> > > > > > > on them who can have opinions about how it works in their SDKs 
> > > > > > > idioms.
> > > > > > > 
> > > > > > > Kenn

Reply via email to