I'm also thinking that it would be best to apply it to the whole transform:
side inputs, main inputs, timers, and any future input constructs.

On Sat, Jul 7, 2018 at 2:00 PM Reuven Lax <re...@google.com> wrote:

> I think the entire transform. There might be some use case for having only
> some inputs stable, but I can't think of any offhand.
> BTW, with DataflowRunner all side inputs happen to be stable (though
> that's more of a side effect of the implementation).
> On Tue, Jul 3, 2018 at 9:46 AM Lukasz Cwik <lc...@google.com> wrote:
>> Does it make sense to only have some inputs be stable for a transform or
>> for the entire transform to require stable inputs?
>> On Tue, Jul 3, 2018 at 7:34 AM Kenneth Knowles <k...@google.com> wrote:
>>> Since we always assume ProcessElement could have arbitrary side effects
>>> (esp. randomization), the state and timers set up by a call to
>>> ProcessElement cannot be considered stable until they are persisted. It
>>> seems very similar to the cost of outputting to a downstream
>>> @RequiresStableInput transform, if not an identical implementation.
>>> The thing timers add is a way to loop which you can't do if it is an
>>> output.
>>> Adding @Pure annotations might help, if the input elements are stable
>>> and ProcessElement is pure.
>>> Kenn
>>> On Mon, Jul 2, 2018 at 7:05 PM Reuven Lax <re...@google.com> wrote:
>>>> The common use case for a timer is to read in data that was stored
>>>> using the state API in processElement. There is no guarantee that this
>>>> data is stable, and I believe no runner currently guarantees it. For
>>>> example:
>>>> class MyDoFn extends DoFn<ElementT, Void> {
>>>>   @StateId("bag")
>>>>   private final StateSpec<BagState<ElementT>> buffer =
>>>>       StateSpecs.bag(ElementCoder.of());
>>>>   @TimerId("timer")
>>>>   private final TimerSpec timerSpec =
>>>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>   @ProcessElement
>>>>   public void processElement(
>>>>       @Element ElementT element,
>>>>       @StateId("bag") BagState<ElementT> bag,
>>>>       @TimerId("timer") Timer timer) {
>>>>     // Buffer the element; it may not be persisted before the timer fires.
>>>>     bag.add(element);
>>>>     timer.align(Duration.standardSeconds(30))
>>>>         .offset(Duration.standardSeconds(3))
>>>>         .setRelative();
>>>>   }
>>>>   @OnTimer("timer")
>>>>   public void onTimer(@StateId("bag") BagState<ElementT> bag) {
>>>>     // A retry of this callback may read different bag contents.
>>>>     sendToExternalSystem(bag.read());
>>>>   }
>>>> }
>>>> If you tagged onTimer with @RequiresStableInput, then you could
>>>> guarantee that if the timer retried then it would read the same elements
>>>> out of the bag. Today this is not guaranteed - the data written to the bag
>>>> might not even be persisted yet when the timer fires (for example, both the
>>>> processElement and the onTimer might be executed by the runner in the same
>>>> bundle).
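
The retry behaviour described above can be sketched in plain Java, without any Beam dependency. This is only a toy model under stated assumptions: StableInputSketch, processBundle, unstableAttempts and stableAttempts are hypothetical names, not Beam APIs, and the counter is a deterministic stand-in for the arbitrary side effects ProcessElement may have. It shows that when a retry re-runs the bundle, the timer reads a rebuilt bag, whereas persisting the bag once lets every retry read the same elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

class StableInputSketch {
    // Stand-in for ProcessElement with side effects: each call tags the
    // element with a value that changes from one invocation to the next.
    static List<String> processBundle(List<String> elements, IntSupplier sideEffect) {
        List<String> bag = new ArrayList<>();
        for (String e : elements) {
            bag.add(e + "#" + sideEffect.getAsInt());
        }
        return bag;
    }

    // No stability guarantee: every timer attempt re-runs the bundle,
    // so the bag is rebuilt and may differ between attempts.
    static List<List<String>> unstableAttempts(
            List<String> elements, IntSupplier sideEffect, int attempts) {
        List<List<String>> seen = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            seen.add(processBundle(elements, sideEffect));
        }
        return seen;
    }

    // Stable input: the bag is persisted once (modelled as an immutable
    // copy) and every timer attempt reads that same snapshot.
    static List<List<String>> stableAttempts(
            List<String> elements, IntSupplier sideEffect, int attempts) {
        List<String> persisted = Collections.unmodifiableList(
                new ArrayList<>(processBundle(elements, sideEffect)));
        List<List<String>> seen = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            seen.add(persisted);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        AtomicInteger counter = new AtomicInteger();
        List<List<String>> unstable = unstableAttempts(input, counter::getAndIncrement, 2);
        List<List<String>> stable = stableAttempts(input, counter::getAndIncrement, 2);
        // prints "unstable retry identical: false"
        System.out.println("unstable retry identical: " + unstable.get(0).equals(unstable.get(1)));
        // prints "stable retry identical: true"
        System.out.println("stable retry identical: " + stable.get(0).equals(stable.get(1)));
    }
}
```

The model deliberately ignores the cost of persisting the bag, which is the expensive part in a real runner.
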
>>>> This particular example is a simplistic one of course - you could
>>>> accomplish the same thing with triggers. When Raghu worked on the
>>>> exactly-once Kafka sink this was very problematic. The final solution used
>>>> some specific details of Kafka to work, and is complicated and not portable
>>>> to other sinks.
>>>> BTW - you can of course just have OnTimer produce the output to another
>>>> transform marked with RequiresStableInput. However this solution is very
>>>> expensive - every element must be persisted to stable storage multiple
>>>> times - and we tried hard to avoid doing this in the Kafka sink.
>>>> Reuven
>>>> On Mon, Jul 2, 2018 at 6:24 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>> Could you give an example of such a use case? (I suppose I'm not quite
>>>>> following what it means for a timer to be unstable...)
>>>>> On Mon, Jul 2, 2018 at 6:20 PM Reuven Lax <re...@google.com> wrote:
>>>>>> One issue: we definitely have some strong use cases where we want
>>>>>> this on OnTimer but not on ProcessElement. Since both are on the same
>>>>>> DoFn, I'm not sure how you would represent this as a separate transform.
>>>>>> On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>> Thanks for the writeup.
>>>>>>> I'm wondering whether, rather than phrasing this as an annotation on
>>>>>>> DoFn methods that gets plumbed down through the portability
>>>>>>> representation, it would make more sense to introduce a new, primitive
>>>>>>> "EnsureStableInput" transform. Runners whose reshuffle provides stable
>>>>>>> inputs could use that as an implementation, and other runners could
>>>>>>> provide other suitable implementations.
>>>>>>> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu <robi...@google.com> wrote:
>>>>>>>> Hi everyone,
>>>>>>>> Thanks for your feedback on the doc. I have revamped it according
>>>>>>>> to all of the comments. The major changes I have made are:
>>>>>>>> * The problem description should be more general and accurate now.
>>>>>>>> * I added more background information, such as details about
>>>>>>>> Reshuffle, so it should be easier to understand now.
>>>>>>>> * I made clear what the scope of my current project is and what
>>>>>>>> could be left to future work.
>>>>>>>> * It now reflects the current progress of my work, and discusses
>>>>>>>> how it should work with the portable pipeline representation (WIP).
>>>>>>>> Also, I forgot to mention last time that this doc may be
>>>>>>>> interesting to those of you interested in Reshuffle, because Reshuffle
>>>>>>>> is used as a current workaround for the problem described in the doc.
>>>>>>>> More comments are always welcome.
>>>>>>>> Best,
>>>>>>>> Robin
>>>>>>>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>>> Thanks for the write up. It is great to see someone pushing this
>>>>>>>>> through.
>>>>>>>>> I wanted to bring Luke's high-level question back to the list for
>>>>>>>>> visibility: what about portability and other SDKs?
>>>>>>>>> Portability is probably trivial, but the "other SDKs" question is
>>>>>>>>> probably best answered by folks working on them who can have opinions
>>>>>>>>> about how it works in their SDK's idioms.
>>>>>>>>> Kenn