I think the entire transform should require stable input. There might be
some use case for having only some inputs stable, but I can't think of any
offhand.

BTW, with DataflowRunner all side inputs happen to be stable (though that's
more of a side effect of the implementation).

On Tue, Jul 3, 2018 at 9:46 AM Lukasz Cwik <[email protected]> wrote:

> Does it make sense to only have some inputs be stable for a transform or
> for the entire transform to require stable inputs?
>
> On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles <[email protected]> wrote:
>
>> Since we always assume ProcessElement could have arbitrary side effects
>> (esp. randomization), the state and timers set up by a call to
>> ProcessElement cannot be considered stable until they are persisted. It
>> seems very similar to the cost of outputting to a downstream
>> @RequiresStableInput transform, if not an identical implementation.
>>
>> What timers add is a way to loop, which you can't do if it is an
>> output.
>>
>> Adding @Pure annotations might help, if the input elements are stable and
>> ProcessElement is pure.
>>
>> Kenn
>>
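A toy Java model of Kenn's point (plain Java, not Beam's API; `ReplayDemo` and its methods are made-up names for illustration). A retry replays the same stable input, but only a pure processing function is guaranteed to rebuild the same state. The "impure" function stands in for randomness or wall-clock reads by depending on the attempt number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Toy model, not Beam code: "processing" an element yields the value that
// would be written to state, and a retry re-processes the same stable input.
final class ReplayDemo {
  // Pure: the result depends only on the element.
  static int pureProcess(int element, int attempt) {
    return element * 2;
  }

  // Impure: the result also depends on the attempt number, standing in for
  // randomness or wall-clock time that differs between retries.
  static int impureProcess(int element, int attempt) {
    return element * 2 + attempt;
  }

  // Replays `input` once per attempt and returns the state each attempt builds.
  static List<List<Integer>> replay(
      List<Integer> input, BiFunction<Integer, Integer, Integer> process, int attempts) {
    List<List<Integer>> statePerAttempt = new ArrayList<>();
    for (int attempt = 0; attempt < attempts; attempt++) {
      List<Integer> state = new ArrayList<>();
      for (int element : input) {
        state.add(process.apply(element, attempt));
      }
      statePerAttempt.add(state);
    }
    return statePerAttempt;
  }
}
```

Replaying `[1, 2, 3]` through `pureProcess` gives `[2, 4, 6]` on every attempt. Through `impureProcess`, attempt 0 gives `[2, 4, 6]` and attempt 1 gives `[3, 5, 7]`. So stable input alone does not make the resulting state stable unless ProcessElement is also pure.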
>> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <[email protected]> wrote:
>>
>>> The common use case for a timer is to read data that was stored using
>>> the state API in processElement. There is no guarantee that this data is
>>> stable, and I believe no runner currently guarantees this. For example:
>>>
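>>> // Stateful DoFns need keyed input, so elements arrive as KV<String, ElementT>.
>>> // ElementT, ElementCoder and sendToExternalSystem are placeholders.
>>> class MyDoFn extends DoFn<KV<String, ElementT>, Void> {
>>>   @StateId("bag")
>>>   private final StateSpec<BagState<ElementT>> buffer =
>>>       StateSpecs.bag(ElementCoder.of());
>>>
>>>   @TimerId("timer")
>>>   private final TimerSpec timerSpec =
>>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>
>>>   @ProcessElement
>>>   public void processElement(
>>>       @Element KV<String, ElementT> element,
>>>       @StateId("bag") BagState<ElementT> bag,
>>>       @TimerId("timer") Timer timer) {
>>>     // Buffer the element; nothing here is guaranteed to be persisted yet.
>>>     bag.add(element.getValue());
>>>     // Fire on a 30s-aligned processing-time boundary, shifted by 3s.
>>>     timer.align(Duration.standardSeconds(30))
>>>         .offset(Duration.standardSeconds(3))
>>>         .setRelative();
>>>   }
>>>
>>>   @OnTimer("timer")
>>>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>>>     sendToExternalSystem(bag.read());
>>>   }
>>> }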
>>>
>>> If you tagged onTimer with @RequiresStableInput, then you could
>>> guarantee that if the timer retried then it would read the same elements
>>> out of the bag. Today this is not guaranteed - the data written to the bag
>>> might not even be persisted yet when the timer fires (for example, both the
>>> processElement and the onTimer might be executed by the runner in the same
>>> bundle).
>>>
>>> This particular example is a simplistic one of course - you could
>>> accomplish the same thing with triggers. When Raghu worked on the
>>> exactly-once Kafka sink this was very problematic. The final solution used
>>> some specific details of Kafka to work, and is complicated and not portable
>>> to other sinks.
>>>
>>> BTW - you can of course just have OnTimer produce the output to another
>>> transform marked with @RequiresStableInput. However, this solution is very
>>> expensive - every element must be persisted to stable storage multiple
>>> times - and we tried hard to avoid doing this in the Kafka sink.
>>>
>>> Reuven
>>>
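A minimal simulation of the failure mode Reuven describes. It assumes a runner that runs processElement and onTimer in the same bundle and retries the whole bundle on failure. This is toy code, not a real runner, and `BundleRetryDemo` and its methods are hypothetical. Each entry of `inputPerAttempt` is what a non-deterministic upstream delivers on one attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not a real runner. inputPerAttempt.get(i) is what a
// non-deterministic upstream delivers on attempt i; every attempt except
// possibly the last fails after onTimer has already sent its batch.
final class BundleRetryDemo {
  // Unstable: processElement and onTimer share a bundle, so onTimer sends bag
  // contents that were never persisted. The failure discards them, and the
  // retry rebuilds the bag from whatever the upstream delivers this time.
  static List<List<String>> unstableSends(List<List<String>> inputPerAttempt) {
    List<List<String>> sends = new ArrayList<>();
    for (List<String> attemptInput : inputPerAttempt) {
      List<String> bag = new ArrayList<>(attemptInput); // processElement: bag.add(...)
      sends.add(List.copyOf(bag)); // onTimer: sendToExternalSystem(bag.read())
    }
    return sends;
  }

  // Stable: the bag from the first attempt is persisted before onTimer runs,
  // so only onTimer is retried (later upstream deliveries are never read)
  // and every retry sends the same contents.
  static List<List<String>> stableSends(List<List<String>> inputPerAttempt) {
    List<String> persistedBag = List.copyOf(inputPerAttempt.get(0));
    List<List<String>> sends = new ArrayList<>();
    for (int retry = 0; retry < inputPerAttempt.size(); retry++) {
      sends.add(persistedBag); // onTimer reads the persisted bag
    }
    return sends;
  }
}
```

With deliveries `["a", "b"]` then `["a", "c"]`, the unstable path sends two different batches to the external system. The stable path sends `["a", "b"]` on every retry.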
>>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> Could you give an example of such a use case? (I suppose I'm not quite
>>>> following what it means for a timer to be unstable...)
>>>>
>>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> One issue: we definitely have some strong use cases where we want this
>>>>> on the OnTimer method but not on ProcessElement. Since both are on the
>>>>> same DoFn, I'm not sure how you would represent this as a separate
>>>>> transform.
>>>>>
>>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the writeup.
>>>>>>
>>>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>>>> DoFn methods that gets plumbed down through the portability
>>>>>> representation, it would make more sense to introduce a new, primitive
>>>>>> "EnsureStableInput" transform. Runners whose Reshuffle provides stable
>>>>>> inputs could use that as an implementation, and other runners could
>>>>>> provide other suitable implementations.
>>>>>>
>>>>>>
>>>>>>
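A toy sketch of Robert's proposed primitive. It assumes the primitive's only job is to checkpoint upstream output once and replay it on every later read. The class name mirrors the proposal but is hypothetical. A real runner would back the checkpoint with Reshuffle or other durable storage rather than a field.

```java
import java.util.List;
import java.util.function.Supplier;

// Toy model of the hypothetical EnsureStableInput primitive: the first read
// evaluates the (possibly non-deterministic) upstream and checkpoints the
// result; every later read, such as a retried downstream bundle, replays the
// checkpoint instead of recomputing. A field stands in for durable storage.
final class EnsureStableInput<T> implements Supplier<List<T>> {
  private final Supplier<List<T>> upstream;
  private List<T> checkpoint;

  EnsureStableInput(Supplier<List<T>> upstream) {
    this.upstream = upstream;
  }

  @Override
  public synchronized List<T> get() {
    if (checkpoint == null) {
      checkpoint = List.copyOf(upstream.get()); // upstream evaluated at most once
    }
    return checkpoint;
  }
}
```

Downstream code sees only a `Supplier`, so swapping the field for a runner-specific mechanism would not change the callers. This mirrors the proposal's idea of letting each runner choose its own implementation.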
>>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <[email protected]> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> Thanks for your feedback on the doc. I have revamped it according to
>>>>>>> all of the comments. The major changes I have made are:
>>>>>>> * The problem description should be more general and accurate now.
>>>>>>> * I added more background information, such as details about
>>>>>>> Reshuffle, so it should be easier to understand now.
>>>>>>> * I made it clear what the scope of my current project is and what
>>>>>>> could be left to future work.
>>>>>>> * It now reflects the current progress of my work, and discusses how
>>>>>>> it should work with the portable pipeline representation (WIP).
>>>>>>> Also, I forgot to mention last time that this doc may be interesting
>>>>>>> to those of you interested in Reshuffle, because Reshuffle is used as a
>>>>>>> current workaround for the problem described in the doc.
>>>>>>>
>>>>>>> More comments are always welcome.
>>>>>>>
>>>>>>> Best,
>>>>>>> Robin
>>>>>>>
>>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>>>> through.
>>>>>>>>
>>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>>
>>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>>> probably best answered by folks working on them, who can have
>>>>>>>> opinions about how it should fit their SDKs' idioms.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>
