Hi Rakesh, Glad to see you pointer this problem out! +1 for add this implementation. Manage State by write-through-cache is pretty important for Streaming job!
Best, Jincheng Thomas Weise <t...@apache.org> 于2019年7月29日周一 下午8:54写道: > FYI a basic test appears to confirm the importance of the cross-bundle > caching: I found that the throughput can be increased by playing with the > bundle size in the Flink runner. Default caps at 1000 elements (or 1 > second). So on a high throughput stream the bundles would be capped by the > count limit. Bumping the count limit increases the throughput by reducing > the chatter over the state plane (more cache hits due to larger bundle). > > The next level of investigation would involve profiling. But just by > looking at metrics, the CPU utilization on the Python worker side dropped > significantly while on the Flink side it remains nearly same. There are no > metrics for state operations on either side, I think it would be very > helpful to get these in place also. > > Below the stateful processing code for reference. > > Thomas > > > class StatefulFn(beam.DoFn): > count_state_spec = userstate.CombiningValueStateSpec( > 'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum) > timer_spec = userstate.TimerSpec('timer', > userstate.TimeDomain.WATERMARK) > > def process(self, kv, count=beam.DoFn.StateParam(count_state_spec), > timer=beam.DoFn.TimerParam(timer_spec), window=beam.DoFn.WindowParam): > count.add(1) > timer_seconds = (window.end.micros // 1000000) - 1 > timer.set(timer_seconds) > > @userstate.on_timer(timer_spec) > def process_timer(self, count=beam.DoFn.StateParam(count_state_spec), > window=beam.DoFn.WindowParam): > if count.read() == 0: > logging.warning("###timer fired with count %d, window %s" % > (count.read(), window)) > > > > On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> > wrote: > >> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> >> wrote: >> > >> > Thanks Robert, >> > >> > I stumble on the jira that you have created some time ago >> > https://jira.apache.org/jira/browse/BEAM-5428 >> > >> > You also marked code where code changes are required: >> > >> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291 >> > >> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349 >> > >> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465 >> > >> > I am willing to provide help to implement this. Let me know how I can >> help. >> >> As far as I'm aware, no one is actively working on it right now. >> Please feel free to assign yourself the JIRA entry and I'll be happy >> to answer any questions you might have if (well probably when) these >> pointers are insufficient. >> >> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> >> wrote: >> >> >> >> This is documented at >> >> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m >> >> . Note that it requires participation of both the runner and the SDK >> >> (though there are no correctness issues if one or the other side does >> >> not understand the protocol, caching just won't be used). >> >> >> >> I don't think it's been implemented anywhere, but could be very >> >> beneficial for performance. >> >> >> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> >> wrote: >> >> > >> >> > I checked the python sdk[1] and it has similar implementation as >> Java SDK. >> >> > >> >> > I would agree with Thomas. In case of high volume event stream and >> bigger cluster size, network call can potentially cause a bottleneck. >> >> > >> >> > @Robert >> >> > I am interested to see the proposal. Can you provide me the link of >> the proposal? >> >> > >> >> > [1]: >> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295 >> >> > >> >> > >> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote: >> >> >> >> >> >> Thanks for the pointer. For streaming, it will be important to >> support caching across bundles. It appears that even the Java SDK doesn't >> support that yet? >> >> >> >> >> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221 >> >> >> >> >> >> Regarding clear/append: It would be nice if both could occur within >> a single Fn Api roundtrip when the state is persisted. >> >> >> >> >> >> Thanks, >> >> >> Thomas >> >> >> >> >> >> >> >> >> >> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> >> wrote: >> >> >>> >> >> >>> User state is built on top of read, append and clear and not off a >> read and write paradigm to allow for blind appends. >> >> >>> >> >> >>> The optimization you speak of can be done completely inside the >> SDK without any additional protocol being required as long as you clear the >> state first and then append all your new data. The Beam Java SDK does this >> for all runners when executed portably[1]. You could port the same logic to >> the Beam Python SDK as well. >> >> >>> >> >> >>> 1: >> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84 >> >> >>> >> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw < >> rober...@google.com> wrote: >> >> >>>> >> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol >> has >> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or >> runners. >> >> >>>> >> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> >> wrote: >> >> >>>> > >> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) >> do have such a cache, though I think it's currently has a cap for large >> bags. >> >> >>>> > >> >> >>>> > Reuven >> >> >>>> > >> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar < >> rakeshku...@lyft.com> wrote: >> >> >>>> >> >> >> >>>> >> Hi, >> >> >>>> >> >> >> >>>> >> I have been using python sdk for the application and also >> using BagState in production. I was wondering whether state logic has any >> write-through-cache implemented or not. If we are sending every read and >> write request through network then it comes with a performance cost. We can >> avoid network call for a read operation if we have write-through-cache. >> >> >>>> >> I have superficially looked into the implementation and I >> didn't see any cache implementation. >> >> >>>> >> >> >> >>>> >> is it possible to have this cache? would it cause any issue if >> we have the caching layer? >> >> >>>> >> >> >