Since we always assume ProcessElement could have arbitrary side effects
(esp. randomization), the state and timers set up by a call to
ProcessElement cannot be considered stable until they are persisted. Making
them stable seems very similar in cost to outputting to a downstream
@RequiresStableInput transform, if not identical in implementation.
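
To make the point concrete, here is a toy model in plain Java. It is only a
sketch: none of these names are Beam APIs, and the IntSupplier merely stands
in for whatever randomization a real ProcessElement might perform. Re-running
the "bundle" yields different state on each attempt, while persisting once
and reading the persisted copy yields the same state on every retry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.IntSupplier;

// Toy model only: these classes are hypothetical and are not part of Beam.
class StableStateSketch {

    // Stand-in for a non-deterministic ProcessElement: each element is
    // combined with a value from `noise` before being buffered in "state".
    static List<Integer> processBundle(List<Integer> elements, IntSupplier noise) {
        List<Integer> bag = new ArrayList<>();
        for (int element : elements) {
            bag.add(element * 1000 + noise.getAsInt());
        }
        return bag;
    }

    // Stand-in for persisting state: the first call runs the bundle and
    // stores the result; later calls (retries) read the stored copy.
    static final class Checkpoint {
        private List<Integer> persisted;

        List<Integer> readForTimer(List<Integer> elements, IntSupplier noise) {
            if (persisted == null) {
                persisted = new ArrayList<>(processBundle(elements, noise));
            }
            return persisted;
        }
    }

    public static void main(String[] args) {
        Random rng = new Random();
        IntSupplier noise = () -> rng.nextInt(1000);
        List<Integer> input = Arrays.asList(1, 2, 3);

        // Without a checkpoint, every retry recomputes the state.
        System.out.println("retry 1, no checkpoint: " + processBundle(input, noise));
        System.out.println("retry 2, no checkpoint: " + processBundle(input, noise));

        // With a checkpoint, every retry sees the persisted state.
        Checkpoint checkpoint = new Checkpoint();
        System.out.println("retry 1, checkpointed:  " + checkpoint.readForTimer(input, noise));
        System.out.println("retry 2, checkpointed:  " + checkpoint.readForTimer(input, noise));
    }
}
```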

What timers add is a way to loop, which you can't do with an output.

Adding @Pure annotations might help, if the input elements are stable and
ProcessElement is pure.
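
As a rough illustration of how that might look, here is a plain-Java sketch.
@Pure is hypothetical (as far as I know no such annotation exists in Beam
today), and needsCheckpoint is an invented runner-side helper, not a real
API. The idea is only that a runner could skip the extra persist when the
input is already stable and the method is declared pure.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Random;

// Hypothetical sketch: @Pure and needsCheckpoint are illustrative names only.
class PureAnnotationSketch {

    // Marks a method as deterministic and free of side effects.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Pure {}

    static class DeterministicFn {
        @Pure
        public int processElement(int element) {
            return element * 2;
        }
    }

    static class RandomizedFn {
        public int processElement(int element) {
            return element + new Random().nextInt(10);
        }
    }

    // A pure method over stable input recomputes the same state on retry,
    // so only the other combinations would need state to be persisted first.
    static boolean needsCheckpoint(Class<?> fnClass, boolean inputIsStable) {
        try {
            Method m = fnClass.getDeclaredMethod("processElement", int.class);
            boolean pure = m.isAnnotationPresent(Pure.class);
            return !(pure && inputIsStable);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no processElement(int) on " + fnClass, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(needsCheckpoint(DeterministicFn.class, true));
        System.out.println(needsCheckpoint(RandomizedFn.class, true));
    }
}
```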

Kenn

On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:

> The common use case for a timer is to read in data that was stored using
> the state API in processElement. There is no guarantee that is stable, and
> I believe no runner currently guarantees this. For example:
>
> class MyDoFn extends DoFn<ElementT, Void> {
>   // Buffers incoming elements until the timer fires.
>   @StateId("bag")
>   private final StateSpec<BagState<ElementT>> buffer =
>       StateSpecs.bag(ElementCoder.of());
>
>   @TimerId("timer")
>   private final TimerSpec timerSpec =
>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>   @ProcessElement
>   public void processElement(
>       @Element ElementT element,
>       @StateId("bag") BagState<ElementT> bag,
>       @TimerId("timer") Timer timer) {
>     bag.add(element);
>     timer.align(Duration.standardSeconds(30))
>         .offset(Duration.standardSeconds(3))
>         .setRelative();
>   }
>
>   // Flushes whatever has been buffered so far to the external system.
>   @OnTimer("timer")
>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>     sendToExternalSystem(bag.read());
>   }
> }
>
> If you tagged onTimer with @RequiresStableInput, then you could guarantee
> that if the timer retried then it would read the same elements out of the
> bag. Today this is not guaranteed - the data written to the bag might not
> even be persisted yet when the timer fires (for example, both the
> processElement and the onTimer might be executed by the runner in the same
> bundle).
>
> This particular example is a simplistic one of course - you could
> accomplish the same thing with triggers. When Raghu worked on the
> exactly-once Kafka sink this was very problematic. The final solution used
> some specific details of Kafka to work, and is complicated and not portable
> to other sinks.
>
> BTW - you can of course just have OnTimer produce the output to another
> transform marked with RequiresStableInput. However this solution is very
> expensive - every element must be persisted to stable storage multiple
> times - and we tried hard to avoid doing this in the Kafka sink.
>
> Reuven
>
> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Could you give an example of such a use case? (I suppose I'm not quite
>> following what it means for a timer to be unstable...)
>>
>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>
>>> One issue: we definitely have some strong use cases where we want this
>>> on ProcessTimer but not on ProcessElement. Since both are on the same DoFn,
>>> I'm not sure how you would represent this as a separate transform.
>>>
>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Thanks for the writeup.
>>>>
>>>> I'm wondering whether, rather than phrasing this as an annotation on DoFn
>>>> methods that gets plumbed down through the portability representation,
>>>> it would make more sense to introduce a new, primitive "EnsureStableInput"
>>>> transform. Runners whose reshuffle provides stable inputs could use that
>>>> as an implementation, and other runners could provide other suitable
>>>> implementations.
>>>>
>>>>
>>>>
>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>> all of the comments. The major changes I have made are:
>>>>> * The problem description should be more general and accurate now.
>>>>> * I added more background information, such as details about
>>>>> Reshuffle, so it should be easier to understand now.
>>>>> * I made clear what the scope of my current project is and what
>>>>> could be left to future work.
>>>>> * It now reflects the current progress of my work, and discusses how
>>>>> it should work with the portable pipeline representation (WIP).
>>>>>
>>>>> Also, I forgot to mention last time that this doc may be interesting
>>>>> to those of you interested in Reshuffle, because Reshuffle is used as a
>>>>> current workaround for the problem described in the doc.
>>>>>
>>>>> More comments are always welcome.
>>>>>
>>>>> Best,
>>>>> Robin
>>>>>
>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>> through.
>>>>>>
>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>> visibility: what about portability and other SDKs?
>>>>>>
>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>> probably best answered by folks working on them, who can have opinions
>>>>>> about how it works in their SDKs' idioms.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>
