I believe the intent is to add a new state API call telling the runner that it is blocked waiting for a response (BEAM-7000).
This should allow the runner to wait till it sees one of these I'm blocked requests and then merge + batch any state calls it may have at that point in time allowing it to convert clear + appends into set calls and do any other optimizations as well. By default, the runner would have a time and space based limit on how many outstanding state calls there are before choosing to resolve them. On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote: > Now I see what you mean. > > On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote: > >> Hi Luke, >> >> I guess the answer is that it depends on the state backend. If a set >> operation in the state backend is available that is more efficient than >> clear+append, then it would be beneficial to have a dedicated fn api >> operation to allow for such optimization. That's something that needs to be >> determined with a profiler :) >> >> But the low hanging fruit is cross-bundle caching. >> >> Thomas >> >> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote: >> >>> Thomas, why do you think a single round trip is needed? >>> >>> clear + append can be done blindly from the SDK side and it has total >>> knowledge of the state at that point in time till the end of the bundle at >>> which point you want to wait to get the cache token back from the runner >>> for the append call so that for the next bundle you can reuse the state if >>> the key wasn't processed elsewhere. >>> >>> Also, all state calls are "streamed" over gRPC so you don't need to wait >>> for clear to complete before being able to send append. >>> >>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> >>> wrote: >>> >>>> Hi Rakesh, >>>> >>>> Glad to see you pointer this problem out! >>>> +1 for add this implementation. Manage State by write-through-cache is >>>> pretty important for Streaming job! >>>> >>>> Best, Jincheng >>>> >>>> Thomas Weise <t...@apache.org> 于2019年7月29日周一 下午8:54写道: >>>> >>>>> FYI a basic test appears to confirm the importance of the cross-bundle >>>>> caching: I found that the throughput can be increased by playing with the >>>>> bundle size in the Flink runner. Default caps at 1000 elements (or 1 >>>>> second). So on a high throughput stream the bundles would be capped by the >>>>> count limit. Bumping the count limit increases the throughput by reducing >>>>> the chatter over the state plane (more cache hits due to larger bundle). >>>>> >>>>> The next level of investigation would involve profiling. But just by >>>>> looking at metrics, the CPU utilization on the Python worker side dropped >>>>> significantly while on the Flink side it remains nearly same. There are no >>>>> metrics for state operations on either side, I think it would be very >>>>> helpful to get these in place also. >>>>> >>>>> Below the stateful processing code for reference. >>>>> >>>>> Thomas >>>>> >>>>> >>>>> class StatefulFn(beam.DoFn): >>>>> count_state_spec = userstate.CombiningValueStateSpec( >>>>> 'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), >>>>> sum) >>>>> timer_spec = userstate.TimerSpec('timer', >>>>> userstate.TimeDomain.WATERMARK) >>>>> >>>>> def process(self, kv, >>>>> count=beam.DoFn.StateParam(count_state_spec), >>>>> timer=beam.DoFn.TimerParam(timer_spec), window=beam.DoFn.WindowParam): >>>>> count.add(1) >>>>> timer_seconds = (window.end.micros // 1000000) - 1 >>>>> timer.set(timer_seconds) >>>>> >>>>> @userstate.on_timer(timer_spec) >>>>> def process_timer(self, >>>>> count=beam.DoFn.StateParam(count_state_spec), >>>>> window=beam.DoFn.WindowParam): >>>>> if count.read() == 0: >>>>> logging.warning("###timer fired with count %d, window %s" >>>>> % (count.read(), window)) >>>>> >>>>> >>>>> >>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> >>>>> wrote: >>>>> >>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> >>>>>> wrote: >>>>>> > >>>>>> > Thanks Robert, >>>>>> > >>>>>> > I stumble on the jira that you have created some time ago >>>>>> > https://jira.apache.org/jira/browse/BEAM-5428 >>>>>> > >>>>>> > You also marked code where code changes are required: >>>>>> > >>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291 >>>>>> > >>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349 >>>>>> > >>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465 >>>>>> > >>>>>> > I am willing to provide help to implement this. Let me know how I >>>>>> can help. >>>>>> >>>>>> As far as I'm aware, no one is actively working on it right now. >>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy >>>>>> to answer any questions you might have if (well probably when) these >>>>>> pointers are insufficient. >>>>>> >>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw < >>>>>> rober...@google.com> wrote: >>>>>> >> >>>>>> >> This is documented at >>>>>> >> >>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m >>>>>> >> . Note that it requires participation of both the runner and the >>>>>> SDK >>>>>> >> (though there are no correctness issues if one or the other side >>>>>> does >>>>>> >> not understand the protocol, caching just won't be used). >>>>>> >> >>>>>> >> I don't think it's been implemented anywhere, but could be very >>>>>> >> beneficial for performance. >>>>>> >> >>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> >>>>>> wrote: >>>>>> >> > >>>>>> >> > I checked the python sdk[1] and it has similar implementation as >>>>>> Java SDK. >>>>>> >> > >>>>>> >> > I would agree with Thomas. In case of high volume event stream >>>>>> and bigger cluster size, network call can potentially cause a bottleneck. >>>>>> >> > >>>>>> >> > @Robert >>>>>> >> > I am interested to see the proposal. Can you provide me the link >>>>>> of the proposal? >>>>>> >> > >>>>>> >> > [1]: >>>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295 >>>>>> >> > >>>>>> >> > >>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> >>>>>> wrote: >>>>>> >> >> >>>>>> >> >> Thanks for the pointer. For streaming, it will be important to >>>>>> support caching across bundles. It appears that even the Java SDK doesn't >>>>>> support that yet? >>>>>> >> >> >>>>>> >> >> >>>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221 >>>>>> >> >> >>>>>> >> >> Regarding clear/append: It would be nice if both could occur >>>>>> within a single Fn Api roundtrip when the state is persisted. >>>>>> >> >> >>>>>> >> >> Thanks, >>>>>> >> >> Thomas >>>>>> >> >> >>>>>> >> >> >>>>>> >> >> >>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> >>>>>> wrote: >>>>>> >> >>> >>>>>> >> >>> User state is built on top of read, append and clear and not >>>>>> off a read and write paradigm to allow for blind appends. >>>>>> >> >>> >>>>>> >> >>> The optimization you speak of can be done completely inside >>>>>> the SDK without any additional protocol being required as long as you >>>>>> clear >>>>>> the state first and then append all your new data. The Beam Java SDK does >>>>>> this for all runners when executed portably[1]. You could port the same >>>>>> logic to the Beam Python SDK as well. >>>>>> >> >>> >>>>>> >> >>> 1: >>>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84 >>>>>> >> >>> >>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw < >>>>>> rober...@google.com> wrote: >>>>>> >> >>>> >>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A >>>>>> protocol has >>>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or >>>>>> runners. >>>>>> >> >>>> >>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> >>>>>> wrote: >>>>>> >> >>>> > >>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow >>>>>> runner) do have such a cache, though I think it's currently has a cap for >>>>>> large bags. >>>>>> >> >>>> > >>>>>> >> >>>> > Reuven >>>>>> >> >>>> > >>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar < >>>>>> rakeshku...@lyft.com> wrote: >>>>>> >> >>>> >> >>>>>> >> >>>> >> Hi, >>>>>> >> >>>> >> >>>>>> >> >>>> >> I have been using python sdk for the application and also >>>>>> using BagState in production. I was wondering whether state logic has any >>>>>> write-through-cache implemented or not. If we are sending every read and >>>>>> write request through network then it comes with a performance cost. We >>>>>> can >>>>>> avoid network call for a read operation if we have write-through-cache. >>>>>> >> >>>> >> I have superficially looked into the implementation and I >>>>>> didn't see any cache implementation. >>>>>> >> >>>> >> >>>>>> >> >>>> >> is it possible to have this cache? would it cause any >>>>>> issue if we have the caching layer? >>>>>> >> >>>> >> >>>>>> >>>>>