Since we always assume ProcessElement could have arbitrary side effects (esp. randomization), the state and timers set up by a call to ProcessElement cannot be considered stable until they are persisted. It seems very similar to the cost of outputting to a downstream @RequiresStableInput transform, if not an identical implementation.
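
To make the retry concern concrete, here is a minimal plain-Java sketch of the idea. It does not use the Beam API. `processBundle`, `Checkpoint`, and the class name are hypothetical stand-ins for ProcessElement, a runner checkpoint, and a retried bundle, and the runner behaviour modeled here is an assumption for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.IntSupplier;

// Toy model only: none of these names are Beam APIs.
class StableStateSketch {

    // Stands in for ProcessElement: buffers each element together with a
    // value from a non-deterministic source (random number, clock, ...).
    static List<String> processBundle(List<String> elements, IntSupplier nondeterministic) {
        List<String> bag = new ArrayList<>();
        for (String element : elements) {
            bag.add(element + "#" + nondeterministic.getAsInt());
        }
        return bag;
    }

    // Stands in for a runner checkpoint: once the bag is copied here,
    // retries read this copy instead of re-running processBundle.
    static final class Checkpoint {
        private final List<String> persisted;

        Checkpoint(List<String> bag) {
            this.persisted = Collections.unmodifiableList(new ArrayList<>(bag));
        }

        List<String> read() {
            return persisted;
        }
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        Random random = new Random();
        IntSupplier source = () -> random.nextInt(1000);

        // No checkpoint between the write and the timer: a retry replays
        // processBundle, so the two attempts may see different bag contents.
        System.out.println("unpersisted, attempt 1: " + processBundle(input, source));
        System.out.println("unpersisted, attempt 2: " + processBundle(input, source));

        // Checkpoint first: every timer attempt reads the same contents.
        Checkpoint checkpoint = new Checkpoint(processBundle(input, source));
        System.out.println("persisted, attempt 1: " + checkpoint.read());
        System.out.println("persisted, attempt 2: " + checkpoint.read());
    }
}
```

The point of the sketch is only that stability comes from the persisted copy, not from the code that wrote it. This is why the cost looks the same as writing to a downstream stable-input transform.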
The thing timers add is a way to loop, which you can't do if it is an output. Adding @Pure annotations might help, if the input elements are stable and ProcessElement is pure.

Kenn

On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:

> The common use case for a timer is to read in data that was stored using
> the state API in processElement. There is no guarantee that is stable, and
> I believe no runner currently guarantees this. For example:
>
> class MyDoFn extends DoFn<ElementT, Void> {
>   @StateId("bag")
>   private final StateSpec<BagState<ElementT>> buffer =
>       StateSpecs.bag(ElementCoder.of());
>
>   @TimerId("timer")
>   private final TimerSpec timerSpec =
>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>   @ProcessElement
>   public void processElement(
>       @Element ElementT element,
>       @StateId("bag") BagState<ElementT> bag,
>       @TimerId("timer") Timer timer) {
>     bag.add(element);
>     timer.align(Duration.standardSeconds(30))
>         .offset(Duration.standardSeconds(3))
>         .setRelative();
>   }
>
>   @OnTimer("timer")
>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>     sendToExternalSystem(bag.read());
>   }
> }
>
> If you tagged onTimer with @RequiresStableInput, then you could guarantee
> that if the timer retried then it would read the same elements out of the
> bag. Today this is not guaranteed - the data written to the bag might not
> even be persisted yet when the timer fires (for example, both the
> processElement and the onTimer might be executed by the runner in the same
> bundle).
>
> This particular example is a simplistic one of course - you could
> accomplish the same thing with triggers. When Raghu worked on the
> exactly-once Kafka sink this was very problematic. The final solution used
> some specific details of Kafka to work, and is complicated and not portable
> to other sinks.
>
> BTW - you can of course just have OnTimer produce the output to another
> transform marked with RequiresStableInput. However this solution is very
> expensive - every element must be persisted to stable storage multiple
> times - and we tried hard to avoid doing this in the Kafka sink.
>
> Reuven
>
> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Could you give an example of such a use case? (I suppose I'm not quite
>> following what it means for a timer to be unstable...)
>>
>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>
>>> One issue: we definitely have some strong use cases where we want this
>>> on OnTimer but not on ProcessElement. Since both are on the same DoFn,
>>> I'm not sure how you would represent this as a separate transform.
>>>
>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Thanks for the writeup.
>>>>
>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>> DoFn methods that gets plumbed down through the portability
>>>> representation, it would make more sense to introduce a new, primitive
>>>> "EnsureStableInput" transform. Runners whose reshuffle provides stable
>>>> inputs could use that as an implementation, and other runners could
>>>> provide other suitable implementations.
>>>>
>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>> all of the comments. The major changes I have made are:
>>>>> * The problem description should be more general and accurate now.
>>>>> * I added more background information, such as details about
>>>>> Reshuffle, so it should be easier to understand now.
>>>>> * I made it clear what the scope of my current project is and what
>>>>> could be left to future work.
>>>>> * It now reflects the current progress of my work, and discusses how
>>>>> it should work with the portable pipeline representation (WIP).
>>>>>
>>>>> Also, I forgot to mention last time that this doc may be interesting
>>>>> to those of you interested in Reshuffle, because Reshuffle is used as
>>>>> a current workaround for the problem described in the doc.
>>>>>
>>>>> More comments are always welcome.
>>>>>
>>>>> Best,
>>>>> Robin
>>>>>
>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>> through.
>>>>>>
>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>> visibility: what about portability and other SDKs?
>>>>>>
>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>> probably best answered by folks working on them who can have
>>>>>> opinions about how it works in their SDK's idioms.
>>>>>>
>>>>>> Kenn