Reza - you're definitely not derailing, that's exactly what I was looking for!
I've actually recently encountered an additional use case where I'd like to use ValueState in the Python SDK. I'm experimenting with an ArrowBatchingDoFn that uses state and timers to batch up python dictionaries into arrow record batches (actually my entire purpose for jumping down this python state rabbit hole). At first blush it seems like the best way to do this would be to just replicate the batching approach in the timely processing post [1], but when the bag is full combine the elements into an arrow record batch, rather than enriching all of the elements and writing them out separately. However, if possible I'd like to pre-allocate buffers for each column and populate them as elements arrive (at least for columns with a fixed size type), so a bag state wouldn't be ideal. Also, a CombiningValueState is not ideal because I'd need to implement a merge_accumulators function that combines several in-progress batches. I could certainly implement that, but I'd prefer that it never be called unless absolutely necessary, which doesn't seem to be the case for CombiningValueState. (As an aside, maybe there's some room there for a middle ground between ValueState and CombiningValueState I suppose you could argue that this is a pretty low-level optimization we should be able to shield our users from, but right now I just wish I had ValueState in python so I didn't have to hack it up with a BagState :) Anyway, in light of this and all the other use-cases mentioned here, I think the resolution is to just implement ValueState in python, and document the danger with ValueState in both Python and Java. Just to be clear, the danger I'm referring to is that users might easily forget that data can be out of order, and use ValueState in a way that assumes it's been populated with data from the most recent element in event time, then in practice out of order data clobbers their state. I'm happy to write up a PR for this - are there any objections to that? [1] https://beam.apache.org/blog/2017/08/28/timely-processing.html On Mon, Apr 29, 2019 at 12:23 AM Robert Bradshaw <rober...@google.com> wrote: > On Mon, Apr 29, 2019 at 3:43 AM Reza Rokni <r...@google.com> wrote: > > > > @Robert Bradshaw Some examples, mostly built out from cases around > Timeseries data, don't want to derail this thread so at a hi level : > > Thanks. Perfectly on-topic for the thread. > > > Looping timers, a timer which allows for creation of a value within a > window when no external input has been seen. Requires metadata like "is > timer set". > > > > BiTemporalStream join, where we need to match leftCol.timestamp to a > value == (max(rightCol.timestamp) where rightCol.timestamp <= > leftCol.timestamp)) , this if for a application matching trades to quotes. > > I'd be interested in seeing the code here. The fact that you have a > max here makes me wonder if combining would be applicable. > > (FWIW, I've long thought it would be useful to do this kind of thing > with Windows. Basically, it'd be like session windows with one side > being the window from the timestamp forward into the future, and the > other side being from the timestamp back a certain amount in the past. > This seems a common join pattern.) > > > Metadata is used for > > > > Taking the Key from the KV for use within the OnTimer call. > > Knowing where we are in watermarks for GC of objects in state. > > More timer metadata (min timer ..) > > > > It could be argued that what we are using state for mostly workarounds > for things that could eventually end up in the API itself. For example > > > > There is a Jira for OnTimer Context to have Key. > > The GC needs are mostly due to not having a Map State object in all > runners yet. > > Yeah. GC could probably be done with a max combine. The Key (which > should be in the API) could be an AnyCombine for now (safe to > overwrite because it's always the same). > > > However I think as folks explore Beam there will always be little things > that require Metadata and so having access to something which gives us fine > grain control ( as Kenneth mentioned) is useful. > > Likely. I guess in line with making easy things easy, I'd like to make > dangerous things hard(er). As Kenn says, we'll probably need some kind > of lower-level thing, especially if we introduce OnMerge. > > > Cheers > > > > Reza > > > > On Sat, 27 Apr 2019 at 02:59, Kenneth Knowles <k...@apache.org> wrote: > >> > >> To be clear, the intent was always that ValueState would be not usable > in merging pipelines. So no danger of clobbering, but also limited > functionality. Is there a runner than accepts it and clobbers? The whole > idea of the new DoFn is that it is easy to do the construction-time > analysis and reject the invalid pipeline. It is actually runner independent > and I think already implemented in ParDo's validation, no? > >> > >> Kenn > >> > >> On Fri, Apr 26, 2019 at 10:14 AM Lukasz Cwik <lc...@google.com> wrote: > >>> > >>> I am in the camp where we should only support merging state (either > naturally via things like bags or via combiners). I believe that having the > wrapper that Brian suggests is useful for users. As for the @OnMerge > method, I believe combiners should have the ability to look at the window > information and we should treat @OnMerge as syntactic sugar over a combiner > if the combiner API is too cumbersome. > >>> > >>> I believe using combiners can also extend to side inputs and help us > deal with singleton and map like side inputs when multiple firings occur. I > also like treating everything like a combiner because it will give us a lot > reuse of combiner implementations across all the places they could be used > and will be especially useful when we start exposing APIs related to > retractions on combiners. > >>> > >>> On Fri, Apr 26, 2019 at 9:43 AM Brian Hulette <bhule...@google.com> > wrote: > >>>> > >>>> Yeah the danger with out of order processing concerns me more than > the merging as well. As a new Beam user, I immediately gravitated towards > ValueState since it was easy to think about and I just assumed there wasn't > anything to be concerned about. So it was shocking to learn that there is > this dangerous edge-case. > >>>> > >>>> What if ValueState were just implemented as a wrapper of > CombiningState with a LatestCombineFn and documented as such (and perhaps > we encourage users to consider using a CombiningState explicitly if at all > possible)? > >>>> > >>>> Brian > >>>> > >>>> > >>>> > >>>> On Fri, Apr 26, 2019 at 2:29 AM Robert Bradshaw <rober...@google.com> > wrote: > >>>>> > >>>>> On Fri, Apr 26, 2019 at 6:40 AM Kenneth Knowles <k...@apache.org> > wrote: > >>>>> > > >>>>> > You could use a CombiningState with a CombineFn that returns the > minimum for this case. > >>>>> > >>>>> We've also wanted to be able to set data when setting a timer that > >>>>> would be returned when the timer fires. (It's in the FnAPI, but not > >>>>> the SDKs yet.) > >>>>> > >>>>> The metadata is an interesting usecase, do you have some more > specific > >>>>> examples? Might boil down to not having a rich enough (single) state > >>>>> type. > >>>>> > >>>>> > But I've come to feel there is a mismatch. On the one hand, > ParDo(<stateful DoFn>) is a way to drop to a lower level and write logic > that does not fit a more general computational pattern, really taking fine > control. On the other hand, automatically merging state via CombiningState > or BagState is more of a no-knobs higher level of programming. To me there > seems to be a bit of a philosophical conflict. > >>>>> > > >>>>> > These days, I feel like an @OnMerge method would be more natural. > If you are using state and timers, you probably often want more direct > control over how state from windows gets merged. An of course we don't even > have a design for timers - you would need some kind of timestamp CombineFn > but I think setting/unsetting timers manually makes more sense. Especially > considering the trickiness around merging windows in the absence of > retractions, you really need this callback, so you can issue retractions > manually for any output your stateful DoFn emitted in windows that no > longer exist. > >>>>> > >>>>> I agree we'll probably need an @OnMerge. On the other hand, I like > >>>>> being able to have good defaults. The high/low level thing is a > >>>>> continuum (the indexing example falling towards the high end). > >>>>> > >>>>> Actually, the merging questions bother me less than how easy it is to > >>>>> accidentally clobber previous values. It looks so easy (like the > >>>>> easiest state to use) but is actually the most dangerous. If one > wants > >>>>> this behavior, I would rather an explicit AnyCombineFn or > >>>>> LatestCombineFn which makes you think about the semantics. > >>>>> > >>>>> - Robert > >>>>> > >>>>> > On Thu, Apr 25, 2019 at 5:49 PM Reza Rokni <r...@google.com> wrote: > >>>>> >> > >>>>> >> +1 on the metadata use case. > >>>>> >> > >>>>> >> For performance reasons the Timer API does not support a read() > operation, which for the vast majority of use cases is not a required > feature. In the small set of use cases where it is needed, for example when > you need to set a Timer in EventTime based on the smallest timestamp seen > in the elements within a DoFn, we can make use of a ValueState object to > keep track of the value. > >>>>> >> > >>>>> >> On Fri, 26 Apr 2019 at 00:38, Reuven Lax <re...@google.com> > wrote: > >>>>> >>> > >>>>> >>> I see examples of people using ValueState that I think are not > captured CombiningState. For example, one common one is users who set a > timer and then record the timestamp of that timer in a ValueState. In > general when you store state that is metadata about other state you store, > then ValueState will usually make more sense than CombiningState. > >>>>> >>> > >>>>> >>> On Thu, Apr 25, 2019 at 9:32 AM Brian Hulette < > bhule...@google.com> wrote: > >>>>> >>>> > >>>>> >>>> Currently the Python SDK does not make ValueState available to > users. My initial inclination was to go ahead and implement it there to be > consistent with Java, but Robert brings up a great point here that > ValueState has an inherent race condition for out of order data, and a lot > of it's use cases can actually be implemented with a CombiningState instead. > >>>>> >>>> > >>>>> >>>> It seems to me that at the very least we should discourage the > use of ValueState by noting the danger in the documentation and preferring > CombiningState in examples, and perhaps we should go further and deprecate > it in Java and not implement it in python. Either way I think we should be > consistent between Java and Python. > >>>>> >>>> > >>>>> >>>> I'm curious what people think about this, are there use cases > that we really need to keep ValueState around for? > >>>>> >>>> > >>>>> >>>> Brian > >>>>> >>>> > >>>>> >>>> ---------- Forwarded message --------- > >>>>> >>>> From: Robert Bradshaw <rober...@google.com> > >>>>> >>>> Date: Thu, Apr 25, 2019, 08:31 > >>>>> >>>> Subject: Re: [docs] Python State & Timers > >>>>> >>>> To: dev <dev@beam.apache.org> > >>>>> >>>> > >>>>> >>>> > >>>>> >>>> > >>>>> >>>> > >>>>> >>>> On Thu, Apr 25, 2019, 5:26 PM Maximilian Michels < > m...@apache.org> wrote: > >>>>> >>>>> > >>>>> >>>>> Completely agree that CombiningState is nicer in this example. > Users may > >>>>> >>>>> still want to use ValueState when there is nothing to combine. > >>>>> >>>> > >>>>> >>>> > >>>>> >>>> I've always had trouble coming up with any good examples of > this. > >>>>> >>>> > >>>>> >>>>> Also, > >>>>> >>>>> users already know ValueState from the Java SDK. > >>>>> >>>> > >>>>> >>>> > >>>>> >>>> Maybe we should deprecate that :) > >>>>> >>>> > >>>>> >>>> > >>>>> >>>>> On 25.04.19 17:12, Robert Bradshaw wrote: > >>>>> >>>>> > On Thu, Apr 25, 2019 at 4:58 PM Maximilian Michels < > m...@apache.org> wrote: > >>>>> >>>>> >> > >>>>> >>>>> >> I forgot to give an example, just to clarify for others: > >>>>> >>>>> >> > >>>>> >>>>> >>> What was the specific example that was less natural? > >>>>> >>>>> >> > >>>>> >>>>> >> Basically every time we use ListState to express > ValueState, e.g. > >>>>> >>>>> >> > >>>>> >>>>> >> next_index, = list(state.read()) or [0] > >>>>> >>>>> >> > >>>>> >>>>> >> Taken from: > >>>>> >>>>> >> > https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301 > >>>>> >>>>> > > >>>>> >>>>> > Yes, ListState is much less natural here. I think generally > >>>>> >>>>> > CombiningValue is often a better replacement. E.g. the Java > example > >>>>> >>>>> > reads > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > public void processElement( > >>>>> >>>>> > ProcessContext context, @StateId("index") > ValueState<Integer> index) { > >>>>> >>>>> > int current = firstNonNull(index.read(), 0); > >>>>> >>>>> > context.output(KV.of(current, context.element())); > >>>>> >>>>> > index.write(current+1); > >>>>> >>>>> > } > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > which is replaced with bag state > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > def process(self, element, > state=DoFn.StateParam(INDEX_STATE)): > >>>>> >>>>> > next_index, = list(state.read()) or [0] > >>>>> >>>>> > yield (element, next_index) > >>>>> >>>>> > state.clear() > >>>>> >>>>> > state.add(next_index + 1) > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > whereas CombiningState would be more natural (than > ListState, and > >>>>> >>>>> > arguably than even ValueState), giving > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > def process(self, element, > index=DoFn.StateParam(INDEX_STATE)): > >>>>> >>>>> > yield element, index.read() > >>>>> >>>>> > index.add(1) > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> >> > >>>>> >>>>> >> -Max > >>>>> >>>>> >> > >>>>> >>>>> >> On 25.04.19 16:40, Robert Bradshaw wrote: > >>>>> >>>>> >>> https://github.com/apache/beam/pull/8402 > >>>>> >>>>> >>> > >>>>> >>>>> >>> On Thu, Apr 25, 2019 at 4:26 PM Robert Bradshaw < > rober...@google.com> wrote: > >>>>> >>>>> >>>> > >>>>> >>>>> >>>> Oh, this is for the indexing example. > >>>>> >>>>> >>>> > >>>>> >>>>> >>>> I actually think using CombiningState is more cleaner > than ValueState. > >>>>> >>>>> >>>> > >>>>> >>>>> >>>> > https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262 > >>>>> >>>>> >>>> > >>>>> >>>>> >>>> (The fact that one must specify the accumulator coder is, > however, > >>>>> >>>>> >>>> unfortunate. We should probably infer that if we can.) > >>>>> >>>>> >>>> > >>>>> >>>>> >>>> On Thu, Apr 25, 2019 at 4:19 PM Robert Bradshaw < > rober...@google.com> wrote: > >>>>> >>>>> >>>>> > >>>>> >>>>> >>>>> The desire was to avoid the implicit disallowed > combination wart in > >>>>> >>>>> >>>>> Python (until we could make sense of it), and also > ValueState could be > >>>>> >>>>> >>>>> surprising with respect to older values overwriting > newer ones. What > >>>>> >>>>> >>>>> was the specific example that was less natural? > >>>>> >>>>> >>>>> > >>>>> >>>>> >>>>> On Thu, Apr 25, 2019 at 3:01 PM Maximilian Michels < > m...@apache.org> wrote: > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> @Pablo: Thanks for following up with the PR! :) > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> @Brian: I was wondering about this as well. It makes > the Python state > >>>>> >>>>> >>>>>> code a bit unnatural. I'd suggest to add a ValueState > wrapper around > >>>>> >>>>> >>>>>> ListState/CombiningState. > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> @Robert: Like Reuven pointed out, we can disallow > ValueState for merging > >>>>> >>>>> >>>>>> windows with state. > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> @Reza: Great. Let's make sure it has Python examples > out of the box. > >>>>> >>>>> >>>>>> Either Pablo or me could help there. > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> Thanks, > >>>>> >>>>> >>>>>> Max > >>>>> >>>>> >>>>>> > >>>>> >>>>> >>>>>> On 25.04.19 04:14, Reza Ardeshir Rokni wrote: > >>>>> >>>>> >>>>>>> Pablo, Kenneth and I have a new blog ready for > publication which covers > >>>>> >>>>> >>>>>>> how to create a "looping timer" it allows for default > values to be > >>>>> >>>>> >>>>>>> created in a window when no incoming elements exists. > We just need to > >>>>> >>>>> >>>>>>> clear a few bits before publication, but would be > great to have that > >>>>> >>>>> >>>>>>> also include a python example, I wrote it in java... > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> Cheers > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> Reza > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> On Thu, 25 Apr 2019 at 04:34, Reuven Lax < > re...@google.com > >>>>> >>>>> >>>>>>> <mailto:re...@google.com>> wrote: > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> Well state is still not implemented for merging > windows even for > >>>>> >>>>> >>>>>>> Java (though I believe the idea was to disallow > ValueState there). > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 1:11 PM Robert Bradshaw < > rober...@google.com > >>>>> >>>>> >>>>>>> <mailto:rober...@google.com>> wrote: > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> It was unclear what the semantics were for > ValueState for merging > >>>>> >>>>> >>>>>>> windows. (It's also a bit weird as it's > inherently a race condition > >>>>> >>>>> >>>>>>> wrt element ordering, unlike Bag and > CombineState, though you can > >>>>> >>>>> >>>>>>> always implement it as a CombineState that > always returns the latest > >>>>> >>>>> >>>>>>> value which is a bit more explicit about the > dangers here.) > >>>>> >>>>> >>>>>>> > >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 10:08 PM Brian > Hulette > >>>>> >>>>> >>>>>>> <bhule...@google.com <mailto: > bhule...@google.com>> wrote: > >>>>> >>>>> >>>>>>> > > >>>>> >>>>> >>>>>>> > That's a great idea! I thought about this > too after those > >>>>> >>>>> >>>>>>> posts came up on the list recently. I > started to look into it, > >>>>> >>>>> >>>>>>> but I noticed that there's actually no > implementation of > >>>>> >>>>> >>>>>>> ValueState in userstate. Is there a reason > for that? I started > >>>>> >>>>> >>>>>>> to work on a patch to add it but I was just > curious if there was > >>>>> >>>>> >>>>>>> some reason it was omitted that I should be > aware of. > >>>>> >>>>> >>>>>>> > > >>>>> >>>>> >>>>>>> > We could certainly replicate the example > without ValueState > >>>>> >>>>> >>>>>>> by using BagState and clearing it before > each write, but it > >>>>> >>>>> >>>>>>> would be nice if we could draw a direct > parallel. > >>>>> >>>>> >>>>>>> > > >>>>> >>>>> >>>>>>> > Brian > >>>>> >>>>> >>>>>>> > > >>>>> >>>>> >>>>>>> > On Fri, Apr 12, 2019 at 7:05 AM > Maximilian Michels > >>>>> >>>>> >>>>>>> <m...@apache.org <mailto:m...@apache.org>> > wrote: > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> > It would probably be pretty easy to > add the corresponding > >>>>> >>>>> >>>>>>> code snippets to the docs as well. > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> It's probably a bit more work because > there is no section > >>>>> >>>>> >>>>>>> dedicated to > >>>>> >>>>> >>>>>>> >> state/timer yet in the documentation. > Tracked here: > >>>>> >>>>> >>>>>>> >> > https://jira.apache.org/jira/browse/BEAM-2472 > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> > I've been going over this topic a bit. > I'll add the > >>>>> >>>>> >>>>>>> snippets next week, if that's fine by y'all. > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> That would be great. The blog posts are > a great way to get > >>>>> >>>>> >>>>>>> started with > >>>>> >>>>> >>>>>>> >> state/timers. > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> Thanks, > >>>>> >>>>> >>>>>>> >> Max > >>>>> >>>>> >>>>>>> >> > >>>>> >>>>> >>>>>>> >> On 11.04.19 20:21, Pablo Estrada wrote: > >>>>> >>>>> >>>>>>> >> > I've been going over this topic a bit. > I'll add the > >>>>> >>>>> >>>>>>> snippets next week, > >>>>> >>>>> >>>>>>> >> > if that's fine by y'all. > >>>>> >>>>> >>>>>>> >> > Best > >>>>> >>>>> >>>>>>> >> > -P. > >>>>> >>>>> >>>>>>> >> > > >>>>> >>>>> >>>>>>> >> > On Thu, Apr 11, 2019 at 5:27 AM Robert > Bradshaw > >>>>> >>>>> >>>>>>> <rober...@google.com <mailto: > rober...@google.com> > >>>>> >>>>> >>>>>>> >> > <mailto:rober...@google.com <mailto: > rober...@google.com>>> > >>>>> >>>>> >>>>>>> wrote: > >>>>> >>>>> >>>>>>> >> > > >>>>> >>>>> >>>>>>> >> > That's a great idea! It would > probably be pretty easy > >>>>> >>>>> >>>>>>> to add the > >>>>> >>>>> >>>>>>> >> > corresponding code snippets to the > docs as well. > >>>>> >>>>> >>>>>>> >> > > >>>>> >>>>> >>>>>>> >> > On Thu, Apr 11, 2019 at 2:00 PM > Maximilian Michels > >>>>> >>>>> >>>>>>> <m...@apache.org <mailto:m...@apache.org> > >>>>> >>>>> >>>>>>> >> > <mailto:m...@apache.org <mailto: > m...@apache.org>>> wrote: > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> >> > > Hi everyone, > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> >> > > The Python SDK still lacks > documentation on state > >>>>> >>>>> >>>>>>> and timers. > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> >> > > As a first step, what do you > think about updating > >>>>> >>>>> >>>>>>> these two blog > >>>>> >>>>> >>>>>>> >> > posts > >>>>> >>>>> >>>>>>> >> > > with the corresponding Python > code? > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> > https://beam.apache.org/blog/2017/08/28/timely-processing.html > >>>>> >>>>> >>>>>>> >> > > > >>>>> >>>>> >>>>>>> >> > > Thanks, > >>>>> >>>>> >>>>>>> >> > > Max > >>>>> >>>>> >>>>>>> >> > > >>>>> >>>>> >>>>>>> > >>>>> >> > >>>>> >> > >>>>> >> > >>>>> >> -- > >>>>> >> > >>>>> >> This email may be confidential and privileged. If you received > this communication by mistake, please don't forward it to anyone else, > please erase all copies and attachments, and please let me know that it has > gone to the wrong person. > >>>>> >> > >>>>> >> The above terms reflect a potential business arrangement, are > provided solely as a basis for further discussion, and are not intended to > be and do not constitute a legally binding obligation. No legally binding > obligations will be created, implied, or inferred until an agreement in > final form is executed in writing by all parties involved. > > > > > > > > -- > > > > This email may be confidential and privileged. If you received this > communication by mistake, please don't forward it to anyone else, please > erase all copies and attachments, and please let me know that it has gone > to the wrong person. > > > > The above terms reflect a potential business arrangement, are provided > solely as a basis for further discussion, and are not intended to be and do > not constitute a legally binding obligation. No legally binding obligations > will be created, implied, or inferred until an agreement in final form is > executed in writing by all parties involved. >