I'm also thinking that it would be best to apply it to the whole transform: side inputs, main inputs, timers, and any future input constructs.
On Sat, Jul 7, 2018 at 2:00 PM Reuven Lax <re...@google.com> wrote:

> I think the entire transform. There might be some use case for having only
> some inputs stable, but I can't think of any offhand.
>
> BTW, it so happens that with DataflowRunner all side inputs happen to be
> stable (though that's more of a side effect of implementation).
>
> On Tue, Jul 3, 2018 at 9:46 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Does it make sense to only have some inputs be stable for a transform or
>> for the entire transform to require stable inputs?
>>
>> On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles <k...@google.com> wrote:
>>
>>> Since we always assume ProcessElement could have arbitrary side effects
>>> (esp. randomization), the state and timers set up by a call to
>>> ProcessElement cannot be considered stable until they are persisted. It
>>> seems very similar to the cost of outputting to a downstream
>>> @RequiresStableInput transform, if not an identical implementation.
>>>
>>> The thing timers add is a way to loop which you can't do if it is an
>>> output.
>>>
>>> Adding @Pure annotations might help, if the input elements are stable
>>> and ProcessElement is pure.
>>>
>>> Kenn
>>>
>>> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The common use case for a timer is to read in data that was stored
>>>> using the state API in processElement. There is no guarantee that is
>>>> stable, and I believe no runner currently guarantees this.
>>>> For example:
>>>>
>>>> class MyDoFn extends DoFn<ElementT, Void> {
>>>>   @StateId("bag")
>>>>   private final StateSpec<BagState<ElementT>> buffer =
>>>>       StateSpecs.bag(ElementCoder.of());
>>>>
>>>>   @TimerId("timer")
>>>>   private final TimerSpec timer =
>>>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(
>>>>       @Element ElementT element,
>>>>       @StateId("bag") BagState<ElementT> bag,
>>>>       @TimerId("timer") Timer timer) {
>>>>     // Buffer the element, then schedule a processing-time timer to flush it.
>>>>     bag.add(element);
>>>>     timer.align(Duration.standardSeconds(30))
>>>>         .offset(Duration.standardSeconds(3))
>>>>         .setRelative();
>>>>   }
>>>>
>>>>   @OnTimer("timer")
>>>>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>>>>     // Side effect: a retried timer may observe different bag contents.
>>>>     sendToExternalSystem(bag.read());
>>>>   }
>>>> }
>>>>
>>>> If you tagged onTimer with @RequiresStableInput, then you could
>>>> guarantee that if the timer retried then it would read the same elements
>>>> out of the bag. Today this is not guaranteed - the data written to the bag
>>>> might not even be persisted yet when the timer fires (for example, both the
>>>> processElement and the onTimer might be executed by the runner in the same
>>>> bundle).
>>>>
>>>> This particular example is a simplistic one of course - you could
>>>> accomplish the same thing with triggers. When Raghu worked on the
>>>> exactly-once Kafka sink this was very problematic. The final solution used
>>>> some specific details of Kafka to work, and is complicated and not portable
>>>> to other sinks.
>>>>
>>>> BTW - you can of course just have OnTimer produce the output to another
>>>> transform marked with RequiresStableInput. However this solution is very
>>>> expensive - every element must be persisted to stable storage multiple
>>>> times - and we tried hard to avoid doing this in the Kafka sink.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Could you give an example of such a use case?
>>>>> (I suppose I'm not quite following what it means for a timer to be
>>>>> unstable...)
>>>>>
>>>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> One issue: we definitely have some strong use cases where we want
>>>>>> this on OnTimer but not on ProcessElement. Since both are on the same
>>>>>> DoFn, I'm not sure how you would represent this as a separate transform.
>>>>>>
>>>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the writeup.
>>>>>>>
>>>>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>>>>> DoFn methods that gets plumbed down through the portability
>>>>>>> representation, it would make more sense to introduce a new, primitive
>>>>>>> "EnsureStableInput" transform. Runners whose Reshuffle provides
>>>>>>> stable inputs could use that as an implementation, and other runners
>>>>>>> could provide other suitable implementations.
>>>>>>>
>>>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> Thanks for your feedback on the doc. I have revamped it according
>>>>>>>> to all of the comments. The major changes I have made are:
>>>>>>>> * The problem description should be more general and accurate now.
>>>>>>>> * I added more background information, such as details about
>>>>>>>>   Reshuffle, so it should be easier to understand now.
>>>>>>>> * I made it clear what is the scope of my current project and what
>>>>>>>>   could be left to future work.
>>>>>>>> * It now reflects the current progress of my work, and discusses
>>>>>>>>   how it should work with the portable pipeline representation (WIP).
>>>>>>>>
>>>>>>>> Also, I forgot to mention last time that this doc may be
>>>>>>>> interesting to those of you interested in Reshuffle, because Reshuffle
>>>>>>>> is used as a current workaround for the problem described in the doc.
>>>>>>>>
>>>>>>>> More comments are always welcome.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Robin
>>>>>>>>
>>>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>>>>> through.
>>>>>>>>>
>>>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>>>
>>>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>>>> probably best answered by folks working on them who can have opinions
>>>>>>>>> about how it works in their SDKs' idioms.
>>>>>>>>>
>>>>>>>>> Kenn
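
The retry problem discussed in this thread (a retried step may see different input than the first attempt unless that input was persisted first) can be illustrated without any Beam dependency. The following is a minimal plain-Java sketch, not Beam API: StableInputSketch, NonDeterministicStep, ExternalSystem and both run methods are hypothetical names made up for illustration, and a simple counter stands in for the "arbitrary side effects (esp. randomization)" mentioned above so that the behaviour is reproducible.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: no Beam classes are used and every name here is hypothetical.
class StableInputSketch {

    // Stand-in for a non-deterministic upstream step. A counter is used instead of
    // real randomness so the behaviour is reproducible: each call yields different values.
    static final class NonDeterministicStep {
        private int next = 0;

        List<Integer> produce(int n) {
            List<Integer> out = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                out.add(next++);
            }
            return out;
        }
    }

    // Stand-in for an external system that records every batch it is sent.
    static final class ExternalSystem {
        final List<List<Integer>> received = new ArrayList<>();

        void send(List<Integer> batch) {
            received.add(new ArrayList<>(batch));
        }
    }

    // No stable input: the retry recomputes its input, so the two attempts differ.
    static ExternalSystem runWithoutStableInput() {
        NonDeterministicStep step = new NonDeterministicStep();
        ExternalSystem sink = new ExternalSystem();
        sink.send(step.produce(3)); // first attempt; assume the bundle fails after sending
        sink.send(step.produce(3)); // retry re-runs the upstream step
        return sink;
    }

    // Stable input: the input is persisted once, and every attempt replays that copy.
    static ExternalSystem runWithStableInput() {
        NonDeterministicStep step = new NonDeterministicStep();
        ExternalSystem sink = new ExternalSystem();
        List<Integer> persisted = new ArrayList<>(step.produce(3)); // simulated checkpoint
        sink.send(persisted); // first attempt
        sink.send(persisted); // retry reads the persisted copy
        return sink;
    }

    public static void main(String[] args) {
        System.out.println("without stable input: " + runWithoutStableInput().received);
        System.out.println("with stable input:    " + runWithStableInput().received);
    }
}
```

In this sketch the external system receives two different batches when the input is recomputed on retry, and the same batch twice when the input is persisted before the side effect. The extra copy in runWithStableInput corresponds to the persistence cost the thread describes as expensive.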