Now I see what you mean.

On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
> Hi Luke,
>
> I guess the answer is that it depends on the state backend. If a set
> operation in the state backend is available that is more efficient than
> clear+append, then it would be beneficial to have a dedicated Fn API
> operation to allow for such optimization. That's something that needs to be
> determined with a profiler :)
>
> But the low-hanging fruit is cross-bundle caching.
>
> Thomas
>
> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Thomas, why do you think a single round trip is needed?
>>
>> clear + append can be done blindly from the SDK side, and the SDK then has
>> total knowledge of the state from that point until the end of the bundle.
>> At the end of the bundle you want to wait to get the cache token back from
>> the runner for the append call, so that for the next bundle you can reuse
>> the state if the key wasn't processed elsewhere.
>>
>> Also, all state calls are "streamed" over gRPC so you don't need to wait
>> for clear to complete before being able to send append.
>>
>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Hi Rakesh,
>>>
>>> Glad to see you point this problem out!
>>> +1 for adding this implementation. Managing state with a write-through
>>> cache is pretty important for streaming jobs!
>>>
>>> Best, Jincheng
>>>
>>> Thomas Weise <t...@apache.org> wrote on Mon, Jul 29, 2019 at 8:54 PM:
>>>
>>>> FYI a basic test appears to confirm the importance of the cross-bundle
>>>> caching: I found that the throughput can be increased by playing with the
>>>> bundle size in the Flink runner. The default caps at 1000 elements (or 1
>>>> second). So on a high throughput stream the bundles would be capped by the
>>>> count limit. Bumping the count limit increases the throughput by reducing
>>>> the chatter over the state plane (more cache hits due to larger bundle).
>>>>
>>>> The next level of investigation would involve profiling.
>>>> But just by looking at metrics, the CPU utilization on the Python worker
>>>> side dropped significantly while on the Flink side it remains nearly the
>>>> same. There are no metrics for state operations on either side; I think it
>>>> would be very helpful to get these in place also.
>>>>
>>>> Below is the stateful processing code for reference.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> import logging
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.transforms import userstate
>>>>
>>>>
>>>> class StatefulFn(beam.DoFn):
>>>>   # Per-key running count; `sum` combines the accumulated values.
>>>>   count_state_spec = userstate.CombiningValueStateSpec(
>>>>       'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>>>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>>>
>>>>   def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
>>>>               timer=beam.DoFn.TimerParam(timer_spec),
>>>>               window=beam.DoFn.WindowParam):
>>>>     count.add(1)
>>>>     # Fire one second before the end of the window.
>>>>     timer_seconds = (window.end.micros // 1000000) - 1
>>>>     timer.set(timer_seconds)
>>>>
>>>>   @userstate.on_timer(timer_spec)
>>>>   def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
>>>>                     window=beam.DoFn.WindowParam):
>>>>     if count.read() == 0:
>>>>       logging.warning("###timer fired with count %d, window %s" %
>>>>                       (count.read(), window))
>>>>
>>>>
>>>>
>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com>
>>>>> wrote:
>>>>> >
>>>>> > Thanks Robert,
>>>>> >
>>>>> > I stumbled on the JIRA that you created some time ago:
>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>>>> >
>>>>> > You also marked the places where code changes are required:
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>> >
>>>>> > I am willing to help implement this. Let me know how I can help.
>>>>>
>>>>> As far as I'm aware, no one is actively working on it right now.
>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>>>> to answer any questions you might have if (well probably when) these
>>>>> pointers are insufficient.
>>>>>
>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >>
>>>>> >> This is documented at:
>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>> >> Note that it requires participation of both the runner and the SDK
>>>>> >> (though there are no correctness issues if one or the other side does
>>>>> >> not understand the protocol, caching just won't be used).
>>>>> >>
>>>>> >> I don't think it's been implemented anywhere, but could be very
>>>>> >> beneficial for performance.
>>>>> >>
>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>> >> >
>>>>> >> > I checked the Python SDK [1] and it has a similar implementation to the Java SDK.
>>>>> >> >
>>>>> >> > I would agree with Thomas. With a high-volume event stream and a bigger cluster, network calls can potentially become a bottleneck.
>>>>> >> >
>>>>> >> > @Robert
>>>>> >> > I am interested in seeing the proposal. Can you send me the link to it?
>>>>> >> >
>>>>> >> > [1]: https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>> >> >
>>>>> >> >
>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
>>>>> >> >>
>>>>> >> >> Thanks for the pointer.
>>>>> >> >> For streaming, it will be important to support caching across bundles. It appears that even the Java SDK doesn't support that yet?
>>>>> >> >>
>>>>> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>> >> >>
>>>>> >> >> Regarding clear/append: It would be nice if both could occur within a single Fn API round trip when the state is persisted.
>>>>> >> >>
>>>>> >> >> Thanks,
>>>>> >> >> Thomas
>>>>> >> >>
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>> >> >>>
>>>>> >> >>> User state is built on top of read, append and clear, rather than on a read and write paradigm, to allow for blind appends.
>>>>> >> >>>
>>>>> >> >>> The optimization you speak of can be done completely inside the SDK without any additional protocol being required, as long as you clear the state first and then append all your new data. The Beam Java SDK does this for all runners when executed portably [1]. You could port the same logic to the Beam Python SDK as well.
>>>>> >> >>>
>>>>> >> >>> [1]: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>> >> >>>
>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >> >>>>
>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>>>>> >> >>>>
>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
>>>>> >> >>>> >
>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a cache, though I think it currently has a cap for large bags.
>>>>> >> >>>> >
>>>>> >> >>>> > Reuven
>>>>> >> >>>> >
>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>> >> >>>> >>
>>>>> >> >>>> >> Hi,
>>>>> >> >>>> >>
>>>>> >> >>>> >> I have been using the Python SDK for my application and am also using BagState in production. I was wondering whether the state logic has any write-through cache implemented. If we send every read and write request over the network, it comes with a performance cost. We can avoid the network call for a read operation if we have a write-through cache.
>>>>> >> >>>> >> I have looked superficially into the implementation and didn't see any cache implementation.
>>>>> >> >>>> >>
>>>>> >> >>>> >> Is it possible to have this cache? Would it cause any issues if we had a caching layer?
>>>>> >> >>>> >>
>>>>> >>>>
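
To make the clear + append idea from the thread concrete, here is a minimal sketch of an SDK-side write-through cache for bag state. Everything in it is hypothetical: `FakeStateBackend` stands in for the runner, its integer version tokens are a simplification and not the cache-token format from the design doc, and `CachingBagState` is not an existing Beam class. It only illustrates that a blind clear + append leaves the SDK with full knowledge of the bag, and that a token check decides whether that knowledge can be reused in a later bundle.

```python
from typing import Dict, List, Tuple


class FakeStateBackend:
    """Hypothetical stand-in for the runner side of the state API."""

    def __init__(self) -> None:
        self.bags: Dict[str, List[int]] = {}
        self.tokens: Dict[str, int] = {}  # assumed: one version counter per key
        self.calls: List[Tuple[str, str]] = []  # log of simulated network calls

    def _bump(self, key: str) -> int:
        # Every write invalidates older tokens for this key.
        self.tokens[key] = self.tokens.get(key, 0) + 1
        return self.tokens[key]

    def read(self, key: str) -> Tuple[List[int], int]:
        self.calls.append(('read', key))
        return list(self.bags.get(key, [])), self.tokens.get(key, 0)

    def clear(self, key: str) -> int:
        self.calls.append(('clear', key))
        self.bags[key] = []
        return self._bump(key)

    def append(self, key: str, values: List[int]) -> int:
        self.calls.append(('append', key))
        self.bags.setdefault(key, []).extend(values)
        return self._bump(key)


class CachingBagState:
    """Sketch of an SDK-side write-through cache keyed by state key."""

    def __init__(self, backend: FakeStateBackend) -> None:
        self._backend = backend
        self._cache: Dict[str, Tuple[int, List[int]]] = {}  # key -> (token, values)

    def read(self, key: str, runner_token: int) -> List[int]:
        entry = self._cache.get(key)
        if entry is not None and entry[0] == runner_token:
            return list(entry[1])  # cache hit: no backend call
        # Miss or stale token: fall back to the backend and refresh the cache.
        values, token = self._backend.read(key)
        self._cache[key] = (token, values)
        return list(values)

    def replace(self, key: str, values: List[int]) -> int:
        # Blind clear + append: no read is needed, and afterwards the SDK
        # knows the full contents of the bag.
        self._backend.clear(key)
        token = self._backend.append(key, values)
        self._cache[key] = (token, list(values))
        return token
```

In this sketch the token returned by `replace` is what the next bundle would present to `read`. If the key was written elsewhere in the meantime, the tokens differ and the read goes back to the backend; otherwise it is served locally.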