Does it make sense to only have some inputs be stable for a transform or for the entire transform to require stable inputs?
On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles <k...@google.com> wrote:
> Since we always assume ProcessElement could have arbitrary side effects
> (esp. randomization), the state and timers set up by a call to
> ProcessElement cannot be considered stable until they are persisted. It
> seems very similar to the cost of outputting to a downstream
> @RequiresStableInput transform, if not an identical implementation.
>
> The thing timers add is a way to loop which you can't do if it is an
> output.
>
> Adding @Pure annotations might help, if the input elements are stable and
> ProcessElement is pure.
>
> Kenn
>
> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
>
>> The common use case for a timer is to read in data that was stored using
>> the state API in processElement. There is no guarantee that this is stable,
>> and I believe no runner currently guarantees this. For example:
>>
>> class MyDoFn extends DoFn<ElementT, Void> {
>>   @StateId("bag") private final StateSpec<BagState<ElementT>> buffer =
>>       StateSpecs.bag(ElementCoder.of());
>>   @TimerId("timer") private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>
>>   @ProcessElement public void processElement(@Element ElementT element,
>>       @StateId("bag") BagState<ElementT> bag, @TimerId("timer") Timer timer) {
>>     bag.add(element);
>>     timer.align(Duration.standardSeconds(30)).offset(Duration.standardSeconds(3)).setRelative();
>>   }
>>
>>   @OnTimer("timer") public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>>     sendToExternalSystem(bag.read());
>>   }
>> }
>>
>> If you tagged onTimer with @RequiresStableInput, then you could guarantee
>> that if the timer retried then it would read the same elements out of the
>> bag. Today this is not guaranteed - the data written to the bag might not
>> even be persisted yet when the timer fires (for example, both the
>> processElement and the onTimer might be executed by the runner in the same
>> bundle).
>>
>> This particular example is a simplistic one, of course - you could
>> accomplish the same thing with triggers. When Raghu worked on the
>> exactly-once Kafka sink this was very problematic. The final solution used
>> some specific details of Kafka to work, and is complicated and not portable
>> to other sinks.
>>
>> BTW - you can of course just have OnTimer produce the output to another
>> transform marked with RequiresStableInput. However this solution is very
>> expensive - every element must be persisted to stable storage multiple
>> times - and we tried hard to avoid doing this in the Kafka sink.
>>
>> Reuven
>>
>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Could you give an example of such a use case? (I suppose I'm not quite
>>> following what it means for a timer to be unstable...)
>>>
>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> One issue: we definitely have some strong use cases where we want this
>>>> on ProcessTimer but not on ProcessElement. Since both are on the same DoFn,
>>>> I'm not sure how you would represent this as a separate transform.
>>>>
>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for the writeup.
>>>>>
>>>>> I'm wondering whether, rather than phrasing this as an annotation on DoFn
>>>>> methods that gets plumbed down through the portability representation, it
>>>>> would make more sense to introduce a new, primitive "EnsureStableInput"
>>>>> transform. For those runners whose reshuffle provides stable inputs, they
>>>>> could use that as an implementation, and other runners could provide other
>>>>> suitable implementations.
>>>>>
>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>>> all of the comments.
>>>>>> The major changes I have made are:
>>>>>> * The problem description should be more general and accurate now.
>>>>>> * I added more background information, such as details about
>>>>>> Reshuffle, so it should be easier to understand now.
>>>>>> * I made it clear what the scope of my current project is and what
>>>>>> could be left to future work.
>>>>>> * It now reflects the current progress of my work, and discusses how
>>>>>> it should work with the portable pipeline representation (WIP).
>>>>>>
>>>>>> Also, I forgot to mention last time that this doc may be interesting
>>>>>> to those of you interested in Reshuffle, because Reshuffle is used as a
>>>>>> current workaround for the problem described in the doc.
>>>>>>
>>>>>> More comments are always welcome.
>>>>>>
>>>>>> Best,
>>>>>> Robin
>>>>>>
>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the write-up. It is great to see someone pushing this
>>>>>>> through.
>>>>>>>
>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>
>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>> probably best answered by folks working on them who can have opinions
>>>>>>> about how it works in their SDKs' idioms.
>>>>>>>
>>>>>>> Kenn
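The failure mode in Reuven's MyDoFn example (onTimer reading bag state that may not be persisted yet) can be sketched with a toy, in-memory model in plain Java. This is not Beam code and not how any runner implements state. `ToyBag`, `commit()`, `abort()`, `unstableRun()` and `stableRun()` are hypothetical names chosen for illustration. The model makes three assumptions. Bag writes become durable only when a bundle commits. A failed bundle discards its uncommitted writes after the external call has already happened. The upstream yields different elements when it is recomputed on retry.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy, in-memory model (NOT the Beam API) of why a timer callback that reads
 * bag state may observe different contents when it is retried.
 */
public class StableInputDemo {

  /** Hypothetical per-key bag: writes are durable only after commit(). */
  public static final class ToyBag {
    private final List<String> durable = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    public void add(String element) { pending.add(element); }

    /** What a timer firing in the same bundle could see: durable + pending writes. */
    public List<String> readIncludingPending() {
      List<String> out = new ArrayList<>(durable);
      out.addAll(pending);
      return out;
    }

    /** What a stable-input guarantee would expose: only committed writes. */
    public List<String> readCommitted() { return new ArrayList<>(durable); }

    public void commit() { durable.addAll(pending); pending.clear(); }

    /** Assumption: a failed bundle discards its uncommitted writes. */
    public void abort() { pending.clear(); }
  }

  /**
   * Unstable case: processElement and onTimer run in the same bundle, the
   * external call happens, then the bundle fails and is retried with
   * different upstream elements. Returns what the external system received.
   */
  public static List<List<String>> unstableRun() {
    List<List<String>> externalWrites = new ArrayList<>();
    ToyBag bag = new ToyBag();

    // Attempt 1: buffer "a" and "b"; the timer fires in the same bundle.
    bag.add("a");
    bag.add("b");
    externalWrites.add(bag.readIncludingPending());
    bag.abort(); // the bundle fails after the side effect already happened

    // Attempt 2: the nondeterministic upstream now yields "a" and "c".
    bag.add("a");
    bag.add("c");
    externalWrites.add(bag.readIncludingPending());
    bag.commit();
    return externalWrites;
  }

  /**
   * Stable case: the bag is committed before the timer may fire, and the
   * timer reads only committed state, so a retried timer sees the same data.
   */
  public static List<List<String>> stableRun() {
    List<List<String>> externalWrites = new ArrayList<>();
    ToyBag bag = new ToyBag();
    bag.add("a");
    bag.add("b");
    bag.commit(); // persist before the timer is allowed to fire

    externalWrites.add(bag.readCommitted()); // timer attempt 1, which then fails
    externalWrites.add(bag.readCommitted()); // timer retry reads identical state
    return externalWrites;
  }

  public static void main(String[] args) {
    System.out.println("unstable: " + unstableRun());
    System.out.println("stable:   " + stableRun());
  }
}
```

Running `main` prints `unstable: [[a, b], [a, c]]` followed by `stable:   [[a, b], [a, b]]`. The `commit()` before the timer fires stands in for whatever a runner would have to do to honor `@RequiresStableInput` on onTimer, or for an "EnsureStableInput" step on runners where Reshuffle provides stable inputs. The toy does not model cost. In the thread, the concern is that getting the same guarantee by routing OnTimer output through a separate @RequiresStableInput transform persists each element more than once.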