The common use case for a timer is to read data that was stored using
the state API in processElement. There is no guarantee that this data is
stable when the timer is retried, and I believe no runner currently
provides one. For example:

  class MyDoFn extends DoFn<ElementT, Void> {
    @StateId("bag")
    private final StateSpec<BagState<ElementT>> buffer =
        StateSpecs.bag(ElementCoder.of());

    @TimerId("timer")
    private final TimerSpec timer =
        TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(
        @Element ElementT element,
        @StateId("bag") BagState<ElementT> bag,
        @TimerId("timer") Timer timer) {
      // Buffer the element, then set a processing-time timer relative to
      // now, using a 3-second offset and 30-second alignment.
      bag.add(element);
      timer.align(Duration.standardSeconds(30))
          .offset(Duration.standardSeconds(3))
          .setRelative();
    }

    @OnTimer("timer")
    public void onTimer(@StateId("bag") BagState<ElementT> bag) {
      // Side effect: whatever the bag holds right now leaves the pipeline.
      sendToExternalSystem(bag.read());
    }
  }

If you tagged onTimer with @RequiresStableInput, then you could guarantee
that if the timer were retried, it would read the same elements out of the
bag. Today this is not guaranteed - the data written to the bag might not
even be persisted yet when the timer fires (for example, the runner might
execute both processElement and onTimer in the same bundle).
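
To make the failure mode concrete, here is a toy model in plain Java. It is only a sketch and does not use the Beam API: ToyBag, UnstableTimerDemo and the commit/rollback methods are made-up names, and the assumption that the retried timer fires before "b" is reprocessed is just one possible ordering.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for bag state (hypothetical, not Beam's BagState). */
class ToyBag {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    void add(String element) {
        pending.add(element);
    }

    /** Within a bundle, reads see committed data plus uncommitted writes. */
    List<String> read() {
        List<String> all = new ArrayList<>(committed);
        all.addAll(pending);
        return all;
    }

    /** Bundle succeeded: uncommitted writes become durable. */
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    /** Bundle failed: uncommitted writes are dropped. */
    void rollback() {
        pending.clear();
    }
}

class UnstableTimerDemo {
    /** Returns what onTimer would send on the first attempt and on the retry. */
    static List<List<String>> run() {
        ToyBag bag = new ToyBag();
        bag.add("a");
        bag.commit(); // an earlier bundle durably stored "a"

        // Attempt 1: processElement("b") and onTimer run in the same bundle.
        bag.add("b");
        List<String> firstAttempt = bag.read(); // sent to the external system
        bag.rollback(); // the bundle fails after the side effect happened

        // Retry: assume the timer is redelivered before "b" is reprocessed.
        List<String> retry = bag.read();

        List<List<String>> sends = new ArrayList<>();
        sends.add(firstAttempt);
        sends.add(retry);
        return sends;
    }

    public static void main(String[] args) {
        List<List<String>> sends = run();
        System.out.println("first attempt sent: " + sends.get(0));
        System.out.println("retry sent:         " + sends.get(1));
    }
}
```

In this model the first attempt sends [a, b] and the retry sends [a], so the external system sees two different payloads for what is logically the same timer firing.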
The MyDoFn example is a simplistic one, of course - you could accomplish
the same thing with triggers. But when Raghu worked on the exactly-once
Kafka sink, this was very problematic. The final solution relied on some
specific details of Kafka to work, and it is complicated and not portable
to other sinks.
BTW - you can of course just have OnTimer produce the output to another
transform marked with RequiresStableInput. However this solution is very
expensive - every element must be persisted to stable storage multiple
times - and we tried hard to avoid doing this in the Kafka sink.
Reuven
On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <[email protected]> wrote:
> Could you give an example of such a usecase? (I suppose I'm not quite
> following what it means for a timer to be unstable...)
>
> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <[email protected]> wrote:
>
>> One issue: we definitely have some strong use cases where we want this on
>> ProcessTimer but not on ProcessElement. Since both are on the same DoFn,
>> I'm not sure how you would represent this as a separate transform.
>>
>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Thanks for the writeup.
>>>
>>> I'm wondering whether, rather than phrasing this as an annotation on DoFn
>>> methods that gets plumbed down through the portability representation,
>>> it would make more sense to introduce a new, primitive "EnsureStableInput"
>>> transform. Those runners whose reshuffle provides stable inputs could
>>> use that as an implementation, and other runners could provide other
>>> suitable implementations.
>>>
>>>
>>>
>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <[email protected]> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>> all of the comments. The major changes I have made are:
>>>> * The problem description should be more general and accurate now.
>>>> * I added more background information, such as details about Reshuffle,
>>>> so it should be easier to understand now.
>>>> * I made it clear what is the scope of my current project and what
>>>> could be left to future work.
>>>> * It now reflects the current progress of my work, and discusses how it
>>>> should work with the portable pipeline representation (WIP)
>>>>
>>>> Also, I forgot to mention last time that this doc may be interesting to
>>>> those of you interested in Reshuffle, because Reshuffle is used as a
>>>> current workaround for the problem described in the doc.
>>>>
>>>> More comments are always welcome.
>>>>
>>>> Best,
>>>> Robin
>>>>
>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>> through.
>>>>>
>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>> visibility: what about portability and other SDKs?
>>>>>
>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>> probably best answered by folks working on them who can have opinions
>>>>> about how it works in their SDKs' idioms.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>