Does it make sense for only some of a transform's inputs to be stable, or
should the entire transform require stable inputs?

On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles <k...@google.com> wrote:

> Since we always assume ProcessElement could have arbitrary side effects
> (especially randomization), the state and timers set up by a call to
> ProcessElement cannot be considered stable until they are persisted. Making
> them stable seems to cost about the same as outputting to a downstream
> @RequiresStableInput transform, and may even use an identical
> implementation.
>
> What timers add is a way to loop, which you can't do with an output.
>
> Adding @Pure annotations might help if the input elements are stable and
> ProcessElement is pure.
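
One way to read the @Pure point, as a toy model in plain Java rather than Beam code (PureReplayModel, replay, pureAppend, and impureAppend are hypothetical names): if the inputs are stable and the per-element step is pure, replaying the same inputs after a failure rebuilds identical state, so that state would not have to be persisted before a timer reads it. The impure step's hidden call counter stands in for randomization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Toy model, not Beam code: state is built by folding a
// processElement-like step over a bundle's input elements.
final class PureReplayModel {
  private PureReplayModel() {}

  // Hidden mutable state that makes impureAppend nondeterministic across calls.
  private static int calls = 0;

  // Replays one bundle from empty state, as a runner might after a failure.
  static List<String> replay(
      List<String> stableInputs,
      BiFunction<List<String>, String, List<String>> processElement) {
    List<String> state = new ArrayList<>();
    for (String element : stableInputs) {
      state = processElement.apply(state, element);
    }
    return state;
  }

  // Pure step: the result depends only on the previous state and the element.
  static List<String> pureAppend(List<String> state, String element) {
    List<String> next = new ArrayList<>(state);
    next.add(element);
    return next;
  }

  // Impure step: the result also depends on how many times it has been called.
  static List<String> impureAppend(List<String> state, String element) {
    List<String> next = new ArrayList<>(state);
    next.add(element + "#" + calls++);
    return next;
  }
}
```

On a fresh JVM, two pure replays of ["a", "b"] both yield ["a", "b"], while two impure replays yield ["a#0", "b#1"] and then ["a#2", "b#3"].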
>
> Kenn
>
> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
>
>> A common use case for a timer is to read data that processElement stored
>> using the state API. Nothing guarantees that this data is stable, and I
>> believe no runner currently provides such a guarantee. For example:
>>
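>> import org.apache.beam.sdk.state.BagState;
>> import org.apache.beam.sdk.state.StateSpec;
>> import org.apache.beam.sdk.state.StateSpecs;
>> import org.apache.beam.sdk.state.TimeDomain;
>> import org.apache.beam.sdk.state.Timer;
>> import org.apache.beam.sdk.state.TimerSpec;
>> import org.apache.beam.sdk.state.TimerSpecs;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.joda.time.Duration;
>>
>> // ElementT, ElementCoder, and sendToExternalSystem are placeholders.
>> // Stateful DoFns require keyed (KV) input, so the element is keyed here.
>> class MyDoFn extends DoFn<KV<String, ElementT>, Void> {
>>   @StateId("bag")
>>   private final StateSpec<BagState<ElementT>> bagSpec =
>>       StateSpecs.bag(ElementCoder.of());
>>
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>
>>   @ProcessElement
>>   public void processElement(
>>       @Element KV<String, ElementT> element,
>>       @StateId("bag") BagState<ElementT> bag,
>>       @TimerId("timer") Timer timer) {
>>     bag.add(element.getValue());
>>     timer.align(Duration.standardSeconds(30)).offset(Duration.standardSeconds(3)).setRelative();
>>   }
>>
>>   @OnTimer("timer")
>>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>>     sendToExternalSystem(bag.read());
>>   }
>> }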
>>
>> If you tagged onTimer with @RequiresStableInput, you could guarantee that
>> a retried timer reads the same elements out of the bag. Today this is not
>> guaranteed: the data written to the bag might not even be persisted when
>> the timer fires (for example, the runner might execute both
>> processElement and onTimer in the same bundle).
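
The retry problem can be sketched as a toy model in plain Java, assuming a runner that retries processElement and onTimer together after a failure. TimerRetryModel and its methods are hypothetical, not Beam API, and an IntSupplier stands in for whatever nondeterminism processElement has. Without stable input, each attempt's onTimer sends whatever that attempt wrote into the bag; with a checkpoint taken before onTimer runs, every onTimer attempt sends the same data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Toy model, not Beam code: processElement buffers elements into a bag, and
// onTimer sends the bag's contents to an external system. The IntSupplier
// stands in for any nondeterminism in processElement.
final class TimerRetryModel {
  private TimerRetryModel() {}

  // One run of processElement over the input; the bag is not persisted.
  static List<Integer> processBundle(List<Integer> input, IntSupplier noise) {
    List<Integer> bag = new ArrayList<>();
    for (int element : input) {
      bag.add(element + noise.getAsInt());
    }
    return bag;
  }

  // No stable input: each retry re-runs processElement, so onTimer may
  // send different contents on each attempt.
  static List<List<Integer>> sentWithoutStableInput(
      List<Integer> input, IntSupplier noise, int attempts) {
    List<List<Integer>> sent = new ArrayList<>();
    for (int i = 0; i < attempts; i++) {
      sent.add(processBundle(input, noise));
    }
    return sent;
  }

  // Stable input: the bag is checkpointed once before onTimer runs, and
  // every onTimer attempt reads that checkpoint.
  static List<List<Integer>> sentWithStableInput(
      List<Integer> input, IntSupplier noise, int attempts) {
    List<Integer> checkpoint = List.copyOf(processBundle(input, noise));
    List<List<Integer>> sent = new ArrayList<>();
    for (int i = 0; i < attempts; i++) {
      sent.add(checkpoint);
    }
    return sent;
  }
}
```

With a counter as the noise source and input [1, 2], two unstable attempts send [1, 3] and then [3, 5], while the stable variant sends the same checkpointed list both times.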
>>
>> This particular example is simplistic, of course; you could accomplish
>> the same thing with triggers. When Raghu worked on the exactly-once Kafka
>> sink, this was very problematic. The final solution relies on specific
>> details of Kafka, and it is complicated and not portable to other sinks.
>>
>> BTW, you can of course just have OnTimer emit its output to another
>> transform marked with @RequiresStableInput. However, this solution is
>> very expensive (every element must be persisted to stable storage
>> multiple times), and we tried hard to avoid doing this in the Kafka sink.
>>
>> Reuven
>>
>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Could you give an example of such a use case? (I suppose I'm not quite
>>> following what it means for a timer to be unstable...)
>>>
>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> One issue: we definitely have some strong use cases where we want this
>>>> on the @OnTimer method but not on @ProcessElement. Since both are on the
>>>> same DoFn, I'm not sure how you would represent this as a separate
>>>> transform.
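
Per-method granularity is straightforward with a method-level annotation, whereas a separate transform placed in front of the DoFn stabilizes the whole input. A sketch in plain Java, assuming a hypothetical StableInputRequired annotation modeled on the proposed @RequiresStableInput (BufferingFn and StableInputScan are also hypothetical, not Beam classes): a runner-like reflective scan finds that only onTimer asks for stable input.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical method-level marker, modeled on the proposed @RequiresStableInput.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface StableInputRequired {}

// DoFn-shaped class (not a real Beam DoFn): only the timer callback asks
// for stable input.
class BufferingFn {
  void processElement(String element) {
    // Buffer the element into state; replaying this on retry is acceptable.
  }

  @StableInputRequired
  void onTimer() {
    // Flush buffered state externally; a retry must see the same data.
  }
}

final class StableInputScan {
  private StableInputScan() {}

  // Names of the methods a runner would have to feed stable input.
  static Set<String> methodsRequiringStableInput(Class<?> fnClass) {
    Set<String> names = new TreeSet<>();
    for (Method method : fnClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(StableInputRequired.class)) {
        names.add(method.getName());
      }
    }
    return names;
  }
}
```

Scanning BufferingFn returns {onTimer}, so a runner could pay the persistence cost only before timer callbacks.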
>>>>
>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for the writeup.
>>>>>
>>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>>> DoFn methods that gets plumbed down through the portability
>>>>> representation, it would make more sense to introduce a new, primitive
>>>>> "EnsureStableInput" transform. Runners whose Reshuffle provides stable
>>>>> inputs could use that as an implementation, and other runners could
>>>>> provide other suitable implementations.
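
A rough sketch of the primitive-transform alternative, in plain Java: the only contract is that every read of the stabilized input, including reads after a retry, sees the same elements, and each runner plugs in its own implementation. StableInputStrategy and CheckpointStrategy are hypothetical names, and the in-memory copy stands in for durable storage; a runner whose Reshuffle already persists its output could satisfy the same contract by reshuffling instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical contract for an "EnsureStableInput" primitive: every read of
// the returned supplier, including reads after a retry, yields the same list.
interface StableInputStrategy {
  <T> Supplier<List<T>> stabilize(Supplier<List<T>> upstream);
}

// One possible implementation: evaluate the upstream once and replay the
// stored copy. An in-memory copy stands in for durable storage here.
final class CheckpointStrategy implements StableInputStrategy {
  @Override
  public <T> Supplier<List<T>> stabilize(Supplier<List<T>> upstream) {
    List<T> stored = Collections.unmodifiableList(new ArrayList<>(upstream.get()));
    return () -> stored;
  }
}
```

Here a nondeterministic upstream is evaluated exactly once; downstream reads keep seeing that first result even though re-reading the upstream directly would produce something new.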
>>>>>
>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>>> all of the comments. The major changes I have made are:
>>>>>> * The problem description should now be more general and accurate.
>>>>>> * I added more background information, such as details about
>>>>>> Reshuffle, so it should be easier to understand now.
>>>>>> * I clarified the scope of my current project and what could be left
>>>>>> to future work.
>>>>>> * It now reflects the current progress of my work and discusses how it
>>>>>> should work with the portable pipeline representation (WIP).
>>>>>>
>>>>>> Also, I forgot to mention last time that this doc may interest those
>>>>>> of you who care about Reshuffle, because Reshuffle is currently used as
>>>>>> a workaround for the problem described in the doc.
>>>>>>
>>>>>> More comments are always welcome.
>>>>>>
>>>>>> Best,
>>>>>> Robin
>>>>>>
>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>>> through.
>>>>>>>
>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>
>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>> probably best answered by the folks working on those SDKs, who may
>>>>>>> have opinions about how it fits their SDKs' idioms.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>
