On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org> wrote:
>
> On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Thanks for starting this discussion Rakesh. An efficient cache layer is
>> one of the missing pieces for good performance in stateful pipelines.
>> The good news is that there is a level of caching already present in
>> Python, which batches append requests until the bundle is finished.
>>
>> Thomas, in your example we would indeed have to profile to see why CPU
>> utilization is high on the Flink side but not in the Python SDK harness.
>> For example, older versions of Flink (<=1.5) have a high cost of
>> deleting existing instances of a timer when setting a timer.
>> Nevertheless, cross-bundle caching would likely result in increased
>> performance.
>
> CPU on the Flink side was unchanged, and that's important. The throughput
> improvement comes from the extended bundle caching on the SDK side. That's
> what tells me that cross-bundle caching is needed. Of course, it will also
> require a good solution for writes, and I like your idea of using the
> checkpoint boundary for that, especially since that already aligns with
> the bundle boundary and is under runner control. Of course, we also want
> to be careful not to cause overly bursty writes.
>
> Profiling will be useful for the timer processing; that is also on my
> list of suspects.
>
>> Luke, I think the idea to merge pending state requests could be
>> complementary to caching across bundles.
>>
>> Question: Couldn't we defer flushing state back from the SDK to the
>> Runner indefinitely, provided that we add a way to flush the state in
>> case of a checkpoint?

Flushing is needed to prevent the SDK from running out of memory. With a
fixed budget for state inside the SDK, flushing would happen whenever that
budget is exceeded. I could also see that flushing only at checkpoints may
lead to slow checkpoint performance, so we may want to flush state that
hasn't been used in a while as well.
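To make that concrete, here is a minimal sketch of the bookkeeping I have in
mind: a cross-bundle cache with a fixed byte budget that evicts
least-recently-used entries (flushing them if dirty) under memory pressure,
and flushes all dirty entries at a checkpoint. All names here (flush_fn,
fetch_fn, CrossBundleStateCache) are hypothetical stand-ins for the Fn API
state channel, not actual Beam APIs:

from collections import OrderedDict

class CrossBundleStateCache:
    # Sketch only: SDK-side state cache with a fixed byte budget.
    # flush_fn(key, value) stands in for writing state back over the
    # state channel; fetch_fn(key) stands in for a state read.
    def __init__(self, flush_fn, max_bytes=64 << 20):
        self._flush_fn = flush_fn
        self._max_bytes = max_bytes
        self._entries = OrderedDict()  # key -> [value_bytes, dirty]
        self._size = 0

    def write(self, key, value):
        self._drop(key)
        self._entries[key] = [value, True]  # dirty until flushed
        self._size += len(value)
        self._shrink()

    def read(self, key, fetch_fn):
        if key in self._entries:
            self._entries.move_to_end(key)  # LRU touch, no network call
            return self._entries[key][0]
        value = fetch_fn(key)  # cache miss: read through to the runner
        self._entries[key] = [value, False]
        self._size += len(value)
        self._shrink()
        return value

    def checkpoint(self):
        # Flush every dirty entry so the runner sees a consistent
        # snapshot at the checkpoint barrier.
        for key, entry in self._entries.items():
            if entry[1]:
                self._flush_fn(key, entry[0])
                entry[1] = False

    def _drop(self, key):
        entry = self._entries.pop(key, None)
        if entry is not None:
            self._size -= len(entry[0])

    def _shrink(self):
        # Over budget: evict least-recently-used entries, flushing the
        # dirty ones, until we fit again.
        while self._size > self._max_bytes and self._entries:
            key, (value, dirty) = self._entries.popitem(last=False)
            if dirty:
                self._flush_fn(key, value)
            self._size -= len(value)

The LRU order doubles as the "flush state that hasn't been used in a while"
policy: cold dirty entries are written back first under memory pressure,
which also spreads writes out instead of bursting them all at the checkpoint.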
>> Another performance improvement would be caching read requests, because
>> these first go to the Runner regardless of already cached appends.
>>
>> -Max
>>
>> On 09.08.19 17:12, Lukasz Cwik wrote:
>>> On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> The question is whether the SDK needs to wait for the StateResponse to
>>>> come back before declaring the bundle done. The proposal was to not
>>>> send the cache token back as part of an append StateResponse [1], but
>>>> pre-provide it as part of the bundle request.
>>>
>>> Agreed, the purpose of the "I'm Blocked" message is to occur during
>>> bundle processing.
>>>
>>>> Thinking about this some more, if we assume the state response was
>>>> successfully applied, there's no reason for the SDK to block the
>>>> bundle until it has its hands on the cache token--we can update the
>>>> cache once the StateResponse comes back, whether or not the bundle is
>>>> still active. On the other hand, the runner needs a way to assert that
>>>> it has received and processed all StateRequests from the SDK
>>>> associated with a bundle before it can declare the bundle complete
>>>> (regardless of the cache tokens), so this might not be safe without
>>>> some extra coordination (e.g. the ProcessBundleResponse indicating the
>>>> number of state requests associated with a bundle).
>>>
>>> Since the state request stream is ordered, we can add the id of the
>>> last state request as part of the ProcessBundleResponse.
>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
>>>>
>>>> On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The purpose of the new state API call in BEAM-7000 is to tell the
>>>>> runner that the SDK is now blocked waiting for the result of a
>>>>> specific state request; it should be used for fetches (not updates),
>>>>> and is there to allow SDKs to differentiate readLater (I will need
>>>>> this data at some point in time in the future) from read (I need
>>>>> this data now). This comes up commonly where the user prefetches
>>>>> multiple state cells and then looks at their content, allowing the
>>>>> runner to batch up those calls on its end.
>>>>>
>>>>> The way it can be used for clear+append is that the runner can store
>>>>> requests in memory up until some time/memory limit or until it gets
>>>>> its first "blocked" call, and then issue all the requests together.
>>>>>
>>>>> On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> That would add a synchronization point that forces extra latency,
>>>>>>> especially in streaming mode.
>>>>>>>
>>>>>>> Wouldn't it be possible for the runner to assign the token when
>>>>>>> starting the bundle and for the SDK to pass it along the state
>>>>>>> requests? That way, there would be no need to batch and wait for
>>>>>>> a flush.
>>>>>>
>>>>>> I think it makes sense to let the runner pre-assign these state
>>>>>> update tokens rather than forcing a synchronization point.
>>>>>>
>>>>>> Here are some pointers for the Python implementation:
>>>>>>
>>>>>> Currently, when a DoFn needs UserState, a StateContext object is
>>>>>> used that converts from a StateSpec to the actual value. When
>>>>>> running portably, this is FnApiUserStateContext [1]. The state
>>>>>> handles themselves are cached at [2], but this context only lives
>>>>>> for the lifetime of a single bundle. Logic could be added here to
>>>>>> use the token to share these across bundles.
>>>>>>
>>>>>> Each of these handles in turn invokes state_handler.get* methods
>>>>>> when its read is called (here state_handler is a thin wrapper
>>>>>> around the service itself) and constructs the appropriate result
>>>>>> from the StateResponse. We would need to implement caching at this
>>>>>> level as well, including the deserialization. This will probably
>>>>>> require some restructuring of how _StateBackedIterable is
>>>>>> implemented (or, possibly, making that class itself cache aware).
>>>>>> Hopefully that's enough to get started.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
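As a rough illustration of caching at the state_handler level, including the
deserialization: the class below wraps the thin gRPC wrapper and keys the
cache by (cache token, state key), where the token is assumed to be
pre-assigned by the runner for the bundle. CachingStateHandler and the
underlying handler's method names are stand-ins, not the actual
bundle_processor internals:

class CachingStateHandler:
    # Sketch only: wraps the thin state-service client so reads hit a
    # process-wide cache of *decoded* values, keyed by the runner-issued
    # cache token plus the state key. Entries survive across bundles
    # until the runner rotates the token.
    def __init__(self, underlying, cache):
        self._underlying = underlying  # thin wrapper around the service
        self._cache = cache            # dict-like: (token, key) -> values

    def blocking_get(self, state_key, coder, cache_token):
        cache_key = (cache_token, state_key)
        if cache_key in self._cache:
            return self._cache[cache_key]  # no round trip, no decoding
        raw_values = self._underlying.blocking_get(state_key)
        values = [coder.decode(v) for v in raw_values]  # decode once
        self._cache[cache_key] = values
        return values

    def append(self, state_key, coder, element, cache_token):
        cache_key = (cache_token, state_key)
        if cache_key in self._cache:
            self._cache[cache_key].append(element)  # keep cache in sync
        self._underlying.append(state_key, coder.encode(element))

    def clear(self, state_key, cache_token):
        self._cache[(cache_token, state_key)] = []
        self._underlying.clear(state_key)

_StateBackedIterable could then consult this cache before paging values in
from the service.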
>>>>>>>
>>>>>>> On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I believe the intent is to add a new state API call telling the
>>>>>>>> runner that it is blocked waiting for a response (BEAM-7000).
>>>>>>>>
>>>>>>>> This should allow the runner to wait till it sees one of these I'm
>>>>>>>> blocked requests and then merge + batch any state calls it may
>>>>>>>> have at that point in time, allowing it to convert clear + appends
>>>>>>>> into set calls and do any other optimizations as well. By default,
>>>>>>>> the runner would have a time- and space-based limit on how many
>>>>>>>> outstanding state calls there are before choosing to resolve them.
>>>>>>>>
>>>>>>>> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Now I see what you mean.
>>>>>>>>>
>>>>>>>>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Luke,
>>>>>>>>>>
>>>>>>>>>> I guess the answer is that it depends on the state backend. If a
>>>>>>>>>> set operation in the state backend is available that is more
>>>>>>>>>> efficient than clear+append, then it would be beneficial to have
>>>>>>>>>> a dedicated Fn API operation to allow for such optimization.
>>>>>>>>>> That's something that needs to be determined with a profiler :)
>>>>>>>>>>
>>>>>>>>>> But the low hanging fruit is cross-bundle caching.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas, why do you think a single round trip is needed?
>>>>>>>>>>>
>>>>>>>>>>> clear + append can be done blindly from the SDK side, and it
>>>>>>>>>>> has total knowledge of the state at that point in time till the
>>>>>>>>>>> end of the bundle, at which point you want to wait to get the
>>>>>>>>>>> cache token back from the runner for the append call so that
>>>>>>>>>>> for the next bundle you can reuse the state if the key wasn't
>>>>>>>>>>> processed elsewhere.
>>>>>>>>>>>
>>>>>>>>>>> Also, all state calls are "streamed" over gRPC, so you don't
>>>>>>>>>>> need to wait for clear to complete before being able to send
>>>>>>>>>>> append.
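For illustration, the runner-side merge could look roughly like this: walk
the ordered state requests buffered for a bundle and collapse a clear
followed by appends on the same key into a single logical set request (the
tuple representation is invented for the sketch; the real requests are
protos):

def merge_state_requests(pending):
    # Sketch only: pending is the ordered list of buffered requests for
    # one bundle, as (op, key, data) tuples with op in {'clear', 'append'}.
    out = []
    pending_set = {}  # key -> list backing a pending 'set' request
    for op, key, data in pending:
        if op == 'clear':
            # Earlier buffered writes for this key are moot; start a
            # fresh 'set' in place of clear + later appends.
            out = [r for r in out if r[1] != key]
            pending_set[key] = []
            out.append(('set', key, pending_set[key]))
        elif op == 'append' and key in pending_set:
            pending_set[key].append(data)  # folds into the pending set
        else:
            out.append((op, key, data))
    return out

# e.g. [('clear', k, None), ('append', k, b'1'), ('append', k, b'2')]
# collapses to [('set', k, [b'1', b'2'])]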
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Rakesh,
>>>>>>>>>>>>
>>>>>>>>>>>> Glad to see you pointed this problem out!
>>>>>>>>>>>> +1 for adding this implementation. Managing state via a
>>>>>>>>>>>> write-through cache is pretty important for streaming jobs!
>>>>>>>>>>>>
>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas Weise <t...@apache.org> wrote on Mon, Jul 29, 2019 at 8:54 PM:
>>>>>>>>>>>>
>>>>>>>>>>>>> FYI a basic test appears to confirm the importance of the
>>>>>>>>>>>>> cross-bundle caching: I found that the throughput can be
>>>>>>>>>>>>> increased by playing with the bundle size in the Flink
>>>>>>>>>>>>> runner. The default caps at 1000 elements (or 1 second). So
>>>>>>>>>>>>> on a high-throughput stream the bundles would be capped by
>>>>>>>>>>>>> the count limit. Bumping the count limit increases the
>>>>>>>>>>>>> throughput by reducing the chatter over the state plane
>>>>>>>>>>>>> (more cache hits due to larger bundles).
>>>>>>>>>>>>>
>>>>>>>>>>>>> The next level of investigation would involve profiling. But
>>>>>>>>>>>>> just by looking at metrics, the CPU utilization on the
>>>>>>>>>>>>> Python worker side dropped significantly while on the Flink
>>>>>>>>>>>>> side it remained nearly the same. There are no metrics for
>>>>>>>>>>>>> state operations on either side; I think it would be very
>>>>>>>>>>>>> helpful to get these in place as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Below is the stateful processing code for reference.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>> class StatefulFn(beam.DoFn):
>>>>>>>>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
>>>>>>>>>>>>>         'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>>>>>>>>>>>>     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def process(self, kv,
>>>>>>>>>>>>>                 count=beam.DoFn.StateParam(count_state_spec),
>>>>>>>>>>>>>                 timer=beam.DoFn.TimerParam(timer_spec),
>>>>>>>>>>>>>                 window=beam.DoFn.WindowParam):
>>>>>>>>>>>>>         count.add(1)
>>>>>>>>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
>>>>>>>>>>>>>         timer.set(timer_seconds)
>>>>>>>>>>>>>
>>>>>>>>>>>>>     @userstate.on_timer(timer_spec)
>>>>>>>>>>>>>     def process_timer(self,
>>>>>>>>>>>>>                       count=beam.DoFn.StateParam(count_state_spec),
>>>>>>>>>>>>>                       window=beam.DoFn.WindowParam):
>>>>>>>>>>>>>         if count.read() == 0:
>>>>>>>>>>>>>             logging.warning("###timer fired with count %d, window %s"
>>>>>>>>>>>>>                             % (count.read(), window))
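For anyone who wants to reproduce this: the knobs referred to above are, to
the best of my knowledge, the Flink runner's maxBundleSize /
maxBundleTimeMills options ("Mills" is the actual spelling in
FlinkPipelineOptions); I am assuming they pass through the portable job
server as the flags below, so verify against your runner version:

from apache_beam.options.pipeline_options import PipelineOptions

# Assumed flag names mirroring the Java FlinkPipelineOptions; unknown
# flags are passed through to the portable job server.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--max_bundle_size=10000',       # default: 1000 elements
    '--max_bundle_time_mills=1000',  # default: 1000 ms
])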
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Robert,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I stumbled on the jira that you created some time ago:
>>>>>>>>>>>>>>> https://jira.apache.org/jira/browse/BEAM-5428
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You also marked code where code changes are required:
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am willing to provide help to implement this. Let me know
>>>>>>>>>>>>>>> how I can help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as I'm aware, no one is actively working on it right
>>>>>>>>>>>>>> now. Please feel free to assign yourself the JIRA entry and
>>>>>>>>>>>>>> I'll be happy to answer any questions you might have if
>>>>>>>>>>>>>> (well, probably when) these pointers are insufficient.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is documented at
>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>>>>>>>>>>>>> Note that it requires participation of both the runner and
>>>>>>>>>>>>>>>> the SDK (though there are no correctness issues if one or
>>>>>>>>>>>>>>>> the other side does not understand the protocol; caching
>>>>>>>>>>>>>>>> just won't be used).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think it's been implemented anywhere, but it could
>>>>>>>>>>>>>>>> be very beneficial for performance.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I checked the python sdk [1] and it has a similar
>>>>>>>>>>>>>>>>> implementation to the Java SDK.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would agree with Thomas. In the case of a high-volume
>>>>>>>>>>>>>>>>> event stream and bigger cluster size, network calls can
>>>>>>>>>>>>>>>>> potentially cause a bottleneck.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Robert
>>>>>>>>>>>>>>>>> I am interested to see the proposal. Can you provide me
>>>>>>>>>>>>>>>>> the link to the proposal?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]:
>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the pointer. For streaming, it will be
>>>>>>>>>>>>>>>>>> important to support caching across bundles. It appears
>>>>>>>>>>>>>>>>>> that even the Java SDK doesn't support that yet?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding clear/append: It would be nice if both could
>>>>>>>>>>>>>>>>>> occur within a single Fn API roundtrip when the state is
>>>>>>>>>>>>>>>>>> persisted.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> User state is built on top of read, append and clear,
>>>>>>>>>>>>>>>>>>> rather than a read and write paradigm, to allow for
>>>>>>>>>>>>>>>>>>> blind appends.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The optimization you speak of can be done completely
>>>>>>>>>>>>>>>>>>> inside the SDK without any additional protocol being
>>>>>>>>>>>>>>>>>>> required, as long as you clear the state first and then
>>>>>>>>>>>>>>>>>>> append all your new data. The Beam Java SDK does this
>>>>>>>>>>>>>>>>>>> for all runners when executed portably [1]. You could
>>>>>>>>>>>>>>>>>>> port the same logic to the Beam Python SDK as well.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1:
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Python workers also have a per-bundle SDK-side cache.
>>>>>>>>>>>>>>>>>>>> A protocol has been proposed, but hasn't yet been
>>>>>>>>>>>>>>>>>>>> implemented in any SDKs or runners.
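To make the porting suggestion concrete, a rough Python rendering of the
BagUserState logic might look like the following; the state_handler method
names are stand-ins for the actual state channel wrapper:

import itertools

class CachingBagUserState:
    # Sketch only: appends are buffered locally, reads concatenate the
    # persisted value with the local buffer, and everything goes out as
    # blind clear + append when the bundle's state is persisted.
    def __init__(self, state_handler, state_key):
        self._state_handler = state_handler
        self._state_key = state_key
        self._buffer = []
        self._cleared = False

    def read(self):
        if self._cleared:
            return iter(self._buffer)  # full contents are known locally
        persisted = self._state_handler.blocking_get(self._state_key)
        return itertools.chain(persisted, self._buffer)

    def add(self, value):
        self._buffer.append(value)

    def clear(self):
        self._cleared = True
        self._buffer = []

    def commit(self):
        # Blind writes: clear first (if requested), then append the
        # buffered values; no response round trip needed in between.
        if self._cleared:
            self._state_handler.clear(self._state_key)
        if self._buffer:
            self._state_handler.append(self._state_key, self._buffer)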
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It's runner dependent. Some runners (e.g. the Dataflow
>>>>>>>>>>>>>>>>>>>>> runner) do have such a cache, though I think it
>>>>>>>>>>>>>>>>>>>>> currently has a cap for large bags.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Reuven
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have been using the python sdk for the application
>>>>>>>>>>>>>>>>>>>>>> and also using BagState in production. I was
>>>>>>>>>>>>>>>>>>>>>> wondering whether the state logic has any
>>>>>>>>>>>>>>>>>>>>>> write-through cache implemented or not. If we are
>>>>>>>>>>>>>>>>>>>>>> sending every read and write request over the
>>>>>>>>>>>>>>>>>>>>>> network, then it comes with a performance cost. We
>>>>>>>>>>>>>>>>>>>>>> can avoid the network call for a read operation if
>>>>>>>>>>>>>>>>>>>>>> we have a write-through cache.
>>>>>>>>>>>>>>>>>>>>>> I have superficially looked into the implementation
>>>>>>>>>>>>>>>>>>>>>> and I didn't see any cache implementation.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is it possible to have this cache? Would it cause
>>>>>>>>>>>>>>>>>>>>>> any issues if we have the caching layer?
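For reference, "write-through cache" here means a cache where every write
goes to both the cache and the backing store, so the store stays
authoritative while reads of previously written keys are served locally. A
minimal sketch (get_fn/put_fn are hypothetical stand-ins for the state
service calls):

class WriteThroughCache:
    # Minimal sketch of a write-through cache: writes update both the
    # local cache and the backing store, so reads of previously written
    # keys never need a network call.
    def __init__(self, get_fn, put_fn):
        self._get_fn = get_fn  # reads a value from the state service
        self._put_fn = put_fn  # writes a value to the state service
        self._cache = {}

    def put(self, key, value):
        self._cache[key] = value  # keep the cache in sync...
        self._put_fn(key, value)  # ...while the store stays authoritative

    def get(self, key):
        if key in self._cache:
            return self._cache[key]  # served locally, no network call
        value = self._get_fn(key)    # cold read goes to the store
        self._cache[key] = value
        return value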