I think it should apply to the entire transform. There might be some use case for having only some inputs stable, but I can't think of any offhand.
BTW, it so happens that with DataflowRunner all side inputs happen to be stable (though that's more of a side effect of implementation).

On Tue, Jul 3, 2018 at 9:46 AM Lukasz Cwik wrote:

> Does it make sense to only have some inputs be stable for a transform or
> for the entire transform to require stable inputs?
>
> On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles wrote:
>
>> Since we always assume ProcessElement could have arbitrary side effects
>> (esp. randomization), the state and timers set up by a call to
>> ProcessElement cannot be considered stable until they are persisted. It
>> seems very similar to the cost of outputting to a downstream
>> @RequiresStableInput transform, if not an identical implementation.
>>
>> The thing timers add is a way to loop which you can't do if it is an
>> output.
>>
>> Adding @Pure annotations might help, if the input elements are stable and
>> ProcessElement is pure.
>>
>> Kenn
>>
>> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax wrote:
>>
>>> The common use case for a timer is to read in data that was stored using
>>> the state API in processElement. There is no guarantee that is stable, and
>>> I believe no runner currently guarantees this.
>>> For example:
>>>
>>> class MyDoFn extends DoFn<ElementT, Void> {
>>>   // Buffers incoming elements until the timer fires.
>>>   @StateId("bag") private final StateSpec<BagState<ElementT>> buffer =
>>>       StateSpecs.bag(ElementCoder.of());
>>>   @TimerId("timer") private final TimerSpec timer =
>>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>
>>>   @ProcessElement public void processElement(
>>>       @Element ElementT element,
>>>       @StateId("bag") BagState<ElementT> bag,
>>>       @TimerId("timer") Timer timer) {
>>>     bag.add(element);
>>>     timer.align(Duration.standardSeconds(30))
>>>         .offset(Duration.standardSeconds(3))
>>>         .setRelative();
>>>   }
>>>
>>>   @OnTimer("timer") public void onTimer(
>>>       @StateId("bag") BagState<ElementT> bag) {
>>>     sendToExternalSystem(bag.read());
>>>   }
>>> }
>>>
>>> If you tagged onTimer with @RequiresStableInput, then you could
>>> guarantee that if the timer retried then it would read the same elements
>>> out of the bag. Today this is not guaranteed - the data written to the bag
>>> might not even be persisted yet when the timer fires (for example, both the
>>> processElement and the onTimer might be executed by the runner in the same
>>> bundle).
>>>
>>> This particular example is a simplistic one of course - you could
>>> accomplish the same thing with triggers. When Raghu worked on the
>>> exactly-once Kafka sink this was very problematic. The final solution used
>>> some specific details of Kafka to work, and is complicated and not portable
>>> to other sinks.
>>>
>>> BTW - you can of course just have OnTimer produce the output to another
>>> transform marked with RequiresStableInput. However this solution is very
>>> expensive - every element must be persisted to stable storage multiple
>>> times - and we tried hard to avoid doing this in the Kafka sink.
>>>
>>> Reuven
>>>
>>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw wrote:
>>>
>>>> Could you give an example of such a use case? (I suppose I'm not quite
>>>> following what it means for a timer to be unstable...)
>>>>
>>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax wrote:
>>>>
>>>>> One issue: we definitely have some strong use cases where we want this
>>>>> on OnTimer but not on ProcessElement. Since both are on the same
>>>>> DoFn, I'm not sure how you would represent this as a separate transform.
>>>>>
>>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw wrote:
>>>>>
>>>>>> Thanks for the writeup.
>>>>>>
>>>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>>>> DoFn methods that gets plumbed down through the portability
>>>>>> representation, it would make more sense to introduce a new, primitive
>>>>>> "EnsureStableInput" transform. Runners whose reshuffle provides
>>>>>> stable inputs could use that as an implementation, and other runners
>>>>>> could provide other suitable implementations.
>>>>>>
>>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>>>> all of the comments. The major changes I have made are:
>>>>>>> * The problem description should be more general and accurate now.
>>>>>>> * I added more background information, such as details about
>>>>>>> Reshuffle, so it should be easier to understand now.
>>>>>>> * I made it clear what the scope of my current project is and what
>>>>>>> could be left to future work.
>>>>>>> * It now reflects the current progress of my work, and discusses how
>>>>>>> it should work with the portable pipeline representation (WIP).
>>>>>>>
>>>>>>> Also, I forgot to mention last time that this doc may be interesting
>>>>>>> to those of you interested in Reshuffle, because Reshuffle is used as a
>>>>>>> current workaround for the problem described in the doc.
>>>>>>>
>>>>>>> More comments are always welcome.
>>>>>>>
>>>>>>> Best,
>>>>>>> Robin
>>>>>>>
>>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles wrote:
>>>>>>>
>>>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>>>> through.
>>>>>>>>
>>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>>
>>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>>> probably best answered by folks working on them who can have opinions
>>>>>>>> about how it works in their SDKs' idioms.
>>>>>>>>
>>>>>>>> Kenn
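
For anyone skimming the thread, the retry hazard in the bag/timer example can be illustrated without any Beam classes. The following is only a rough plain-Java simulation, not how any runner actually implements this: `StableInputSketch`, `processBundle`, and `deliverWithOneRetry` are hypothetical names, and an invocation counter stands in for whatever non-determinism (e.g. randomization) ProcessElement might have. It compares re-running the whole bundle on retry against persisting the bag once before the timer callback reads it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation only; every name here is hypothetical and no Beam API is used. */
public class StableInputSketch {
    // Counts how often the simulated processElement step has run. It stands in
    // for any source of non-determinism, such as randomization.
    private int invocations = 0;

    /** Simulated processElement: buffers elements, tagged with the run that produced them. */
    public List<String> processBundle(List<String> elements) {
        invocations++;
        List<String> bag = new ArrayList<>();
        for (String element : elements) {
            bag.add(element + "@run" + invocations);
        }
        return bag;
    }

    /**
     * Simulates a timer callback that is attempted twice (first try plus one retry)
     * and returns the payload each attempt would send to the external system.
     */
    public List<List<String>> deliverWithOneRetry(List<String> elements, boolean persistBeforeTimer) {
        List<List<String>> sent = new ArrayList<>();
        List<String> persisted = null; // simulated stable storage
        for (int attempt = 0; attempt < 2; attempt++) {
            List<String> bag;
            if (!persistBeforeTimer) {
                // Same-bundle execution: a retry re-runs processElement as well.
                bag = processBundle(elements);
            } else {
                // Stable input: the bag is written once, and every attempt reads that copy.
                if (persisted == null) {
                    persisted = new ArrayList<>(processBundle(elements));
                }
                bag = persisted;
            }
            sent.add(bag);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        System.out.println("unstable: " + new StableInputSketch().deliverWithOneRetry(input, false));
        System.out.println("stable:   " + new StableInputSketch().deliverWithOneRetry(input, true));
    }
}
```

In the unstable case the two attempts hand different payloads to the external system, which is the problem for an exactly-once sink. In the stable case both attempts see the same bag contents, at the cost of one extra write to stable storage.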
