On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:
> The question is whether the SDK needs to wait for the StateResponse to come back before declaring the bundle done. The proposal was to not send the cache token back as part of an append StateResponse [1], but pre-provide it as part of the bundle request.
>

Agree, the purpose of the I'm Blocked message is to occur during bundle processing.

> Thinking about this some more, if we assume the state response was successfully applied, there's no reason for the SDK to block the bundle until it has its hands on the cache token--we can update the cache once the StateResponse comes back whether or not the bundle is still active. On the other hand, the runner needs a way to assert it has received and processed all StateRequests from the SDK associated with a bundle before it can declare the bundle complete (regardless of the cache tokens), so this might not be safe without some extra coordination (e.g. the ProcessBundleResponse indicating the number of state requests associated with a bundle).
>

Since the state request stream is ordered, we can add the id of the last state request as part of the ProcessBundleResponse.

> [1]
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
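A minimal sketch of the runner-side bookkeeping this last-request-id idea implies. Everything here is hypothetical: ProcessBundleResponse has no such field today, and the class below is illustrative rather than actual Beam runner code.

    import threading

    class StateRequestTracker(object):
        # Tracks the ids observed on the (ordered) state request stream.

        def __init__(self):
            self._cond = threading.Condition()
            self._last_seen_id = -1

        def on_state_request(self, request_id):
            # Called by the runner as each StateRequest from the SDK is handled.
            with self._cond:
                self._last_seen_id = max(self._last_seen_id, request_id)
                self._cond.notify_all()

        def await_bundle_state(self, last_request_id):
            # Called with the id the SDK reported in its ProcessBundleResponse
            # (a hypothetical last_state_request_id field). Once this returns,
            # every state request belonging to the bundle has been processed
            # and the bundle can safely be declared complete.
            with self._cond:
                while self._last_seen_id < last_request_id:
                    self._cond.wait()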
>
> On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > The purpose of the new state API call in BEAM-7000 is to tell the runner that the SDK is now blocked waiting for the result of a specific state request. It should be used for fetches (not updates), and is there to allow SDKs to differentiate readLater (I will need this data at some point in time in the future) from read (I need this data now). This comes up commonly where the user prefetches multiple state cells and then looks at their content, allowing the runner to batch up those calls on its end.
> >
> > The way it can be used for clear+append is that the runner can store requests in memory up until some time/memory limit, or until it gets its first "blocked" call, and then issue all the requests together.
> >
> > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:
> >> >
> >> > That would add a synchronization point that forces extra latency especially in streaming mode.
> >> >
> >> > Wouldn't it be possible for the runner to assign the token when starting the bundle and for the SDK to pass it along the state requests? That way, there would be no need to batch and wait for a flush.
> >>
> >> I think it makes sense to let the runner pre-assign these state update tokens rather than forcing a synchronization point.
> >>
> >> Here are some pointers for the Python implementation:
> >>
> >> Currently, when a DoFn needs UserState, a StateContext object is used that converts from a StateSpec to the actual value. When running portably, this is FnApiUserStateContext [1]. The state handles themselves are cached at [2], but this context only lives for the lifetime of a single bundle. Logic could be added here to use the token to share these across bundles.
> >>
> >> Each of these handles in turn invokes state_handler.get* methods when its read is called. (Here state_handler is a thin wrapper around the service itself.) It constructs the appropriate result from the StateResponse. We would need to implement caching at this level as well, including the deserialization. This will probably require some restructuring of how _StateBackedIterable is implemented (or, possibly, making that class itself cache aware). Hopefully that's enough to get started.
> >>
> >> [1] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
> >> [2] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
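To sketch what that second level might look like: a cache of decoded values keyed by the runner-supplied cache token plus the state key, wrapped around the thin state handler. The class and method names below are illustrative assumptions, not the SDK's actual API.

    class CrossBundleStateCache(object):
        # Decoded state values, keyed by (cache_token, state_key).

        def __init__(self):
            self._values = {}

        def get(self, cache_token, state_key):
            return self._values.get((cache_token, state_key))

        def put(self, cache_token, state_key, value):
            self._values[(cache_token, state_key)] = value

        def invalidate(self, cache_token):
            # Drop all entries for a token the runner has retired.
            self._values = {
                k: v for k, v in self._values.items() if k[0] != cache_token}

    class CachingStateHandler(object):
        # Wraps the thin gRPC state handler; blocking_get here only
        # approximates the shape of the real method.

        def __init__(self, underlying, cache):
            self._underlying = underlying
            self._cache = cache

        def blocking_get(self, state_key, coder, cache_token):
            value = self._cache.get(cache_token, state_key)
            if value is None:
                # Miss: one round trip and one deserialization, after which
                # any later bundle carrying the same token reuses the
                # decoded value.
                value = self._underlying.blocking_get(state_key, coder)
                self._cache.put(cache_token, state_key, value)
            return value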
> >>
> >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >>
> >> >> I believe the intent is to add a new state API call telling the runner that it is blocked waiting for a response (BEAM-7000).
> >> >>
> >> >> This should allow the runner to wait till it sees one of these I'm blocked requests and then merge + batch any state calls it may have at that point in time, allowing it to convert clear + appends into set calls and do any other optimizations as well. By default, the runner would have a time and space based limit on how many outstanding state calls there are before choosing to resolve them.
> >> >>
> >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>
> >> >>> Now I see what you mean.
> >> >>>
> >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
> >> >>>>
> >> >>>> Hi Luke,
> >> >>>>
> >> >>>> I guess the answer is that it depends on the state backend. If a set operation in the state backend is available that is more efficient than clear+append, then it would be beneficial to have a dedicated Fn API operation to allow for such optimization. That's something that needs to be determined with a profiler :)
> >> >>>>
> >> >>>> But the low hanging fruit is cross-bundle caching.
> >> >>>>
> >> >>>> Thomas
> >> >>>>
> >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>>>
> >> >>>>> Thomas, why do you think a single round trip is needed?
> >> >>>>>
> >> >>>>> clear + append can be done blindly from the SDK side, and it has total knowledge of the state at that point in time till the end of the bundle, at which point you want to wait to get the cache token back from the runner for the append call so that for the next bundle you can reuse the state if the key wasn't processed elsewhere.
> >> >>>>>
> >> >>>>> Also, all state calls are "streamed" over gRPC so you don't need to wait for clear to complete before being able to send append.
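As an illustration of the merge + batch behavior described above, a hypothetical runner-side buffer might look like the following. The backend object with set/clear/append methods is an assumption, not any particular runner's state backend.

    class BufferedStateWrites(object):
        # Buffers state writes per key so that a clear followed by appends
        # can be resolved as a single set call on the state backend.

        def __init__(self, backend):
            self._backend = backend  # assumed to expose set/clear/append
            self._pending = {}       # state_key -> ('set' or 'append', [values])

        def on_clear(self, key):
            # A clear supersedes anything buffered so far; subsequent
            # appends for this key can later be issued as one set.
            self._pending[key] = ('set', [])

        def on_append(self, key, value):
            op, values = self._pending.get(key, ('append', []))
            values.append(value)
            self._pending[key] = (op, values)

        def flush(self):
            # Called when the SDK signals it is blocked, or when the
            # time/space limit on outstanding calls is reached.
            for key, (op, values) in self._pending.items():
                if op == 'set':
                    self._backend.set(key, values)
                else:
                    for value in values:
                        self._backend.append(key, value)
            self._pending.clear()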
> >> >>>>>
> >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Rakesh,
> >> >>>>>>
> >> >>>>>> Glad to see you pointed this problem out!
> >> >>>>>> +1 for adding this implementation. Managing state with a write-through cache is pretty important for streaming jobs!
> >> >>>>>>
> >> >>>>>> Best, Jincheng
> >> >>>>>>
> >> >>>>>> Thomas Weise <t...@apache.org> wrote on Mon, Jul 29, 2019 at 8:54 PM:
> >> >>>>>>>
> >> >>>>>>> FYI a basic test appears to confirm the importance of the cross-bundle caching: I found that the throughput can be increased by playing with the bundle size in the Flink runner. The default caps at 1000 elements (or 1 second), so on a high-throughput stream the bundles would be capped by the count limit. Bumping the count limit increases the throughput by reducing the chatter over the state plane (more cache hits due to larger bundles).
> >> >>>>>>>
> >> >>>>>>> The next level of investigation would involve profiling. But just by looking at metrics, the CPU utilization on the Python worker side dropped significantly while on the Flink side it remains nearly the same. There are no metrics for state operations on either side; I think it would be very helpful to get these in place as well.
> >> >>>>>>>
> >> >>>>>>> Below is the stateful processing code for reference.
> >> >>>>>>>
> >> >>>>>>> Thomas
> >> >>>>>>>
> >> >>>>>>> class StatefulFn(beam.DoFn):
> >> >>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
> >> >>>>>>>         'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
> >> >>>>>>>     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
> >> >>>>>>>
> >> >>>>>>>     def process(self, kv, count=beam.DoFn.StateParam(count_state_spec), timer=beam.DoFn.TimerParam(timer_spec), window=beam.DoFn.WindowParam):
> >> >>>>>>>         count.add(1)
> >> >>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
> >> >>>>>>>         timer.set(timer_seconds)
> >> >>>>>>>
> >> >>>>>>>     @userstate.on_timer(timer_spec)
> >> >>>>>>>     def process_timer(self, count=beam.DoFn.StateParam(count_state_spec), window=beam.DoFn.WindowParam):
> >> >>>>>>>         if count.read() == 0:
> >> >>>>>>>             logging.warning("###timer fired with count %d, window %s" % (count.read(), window))
> >> >>>>>>>
> >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >
> >> >>>>>>>> > Thanks Robert,
> >> >>>>>>>> >
> >> >>>>>>>> > I stumbled on the jira that you created some time ago:
> >> >>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
> >> >>>>>>>> >
> >> >>>>>>>> > You also marked code where changes are required:
> >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >> >>>>>>>> >
> >> >>>>>>>> > I am willing to provide help to implement this. Let me know how I can help.
> >> >>>>>>>>
> >> >>>>>>>> As far as I'm aware, no one is actively working on it right now. Please feel free to assign yourself the JIRA entry and I'll be happy to answer any questions you might have if (well, probably when) these pointers are insufficient.
> >> >>>>>>>>
> >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>>>>>>> >>
> >> >>>>>>>> >> This is documented at
> >> >>>>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >> >>>>>>>> >> Note that it requires participation of both the runner and the SDK (though there are no correctness issues if one or the other side does not understand the protocol; caching just won't be used).
> >> >>>>>>>> >>
> >> >>>>>>>> >> I don't think it's been implemented anywhere, but it could be very beneficial for performance.
> >> >>>>>>>> >>
> >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > I checked the python sdk [1] and it has a similar implementation to the Java SDK.
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > I would agree with Thomas. In the case of a high-volume event stream and a bigger cluster size, network calls can potentially cause a bottleneck.
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > @Robert
> >> >>>>>>>> >> > I am interested to see the proposal. Can you provide me the link to the proposal?
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > [1]: https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Thanks for the pointer. For streaming, it will be important to support caching across bundles. It appears that even the Java SDK doesn't support that yet?
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Regarding clear/append: It would be nice if both could occur within a single Fn API roundtrip when the state is persisted.
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Thanks,
> >> >>>>>>>> >> >> Thomas
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> User state is built on top of read, append and clear, and not off a read and write paradigm, to allow for blind appends.
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> The optimization you speak of can be done completely inside the SDK without any additional protocol being required, as long as you clear the state first and then append all your new data. The Beam Java SDK does this for all runners when executed portably [1]. You could port the same logic to the Beam Python SDK as well.
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> 1: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
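A rough sketch of how that Java logic might translate to Python. The state_handler calls below approximate the SDK's thin gRPC wrapper rather than matching its real signatures; the point is that mutations stay local and are replayed as a blind clear + append at commit time.

    class CachedBagState(object):
        # Mirrors the BagUserState idea: reads combine persisted and local
        # values; adds and clears are buffered and replayed on commit.

        def __init__(self, state_handler, state_key, coder):
            self._state_handler = state_handler
            self._state_key = state_key
            self._coder = coder
            self._cleared = False
            self._new_values = []

        def read(self):
            if self._cleared:
                persisted = []
            else:
                persisted = list(
                    self._state_handler.blocking_get(self._state_key, self._coder))
            return persisted + self._new_values

        def add(self, value):
            # Blind append: no read of the existing contents is needed.
            self._new_values.append(value)

        def clear(self):
            self._cleared = True
            self._new_values = []

        def commit(self):
            # At bundle finalization: clear first (if requested), then
            # append the buffered values, with no read-modify-write.
            if self._cleared:
                self._state_handler.clear(self._state_key)
            for value in self._new_values:
                self._state_handler.append(self._state_key, self._coder, value)
            self._cleared = False
            self._new_values = []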
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>>>>>>> >> >>>>
> >> >>>>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has been proposed, but hasn't yet been implemented in any SDKs or runners.
> >> >>>>>>>> >> >>>>
> >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a cache, though I think it currently has a cap for large bags.
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > Reuven
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> Hi,
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> I have been using the python sdk for the application and also using BagState in production. I was wondering whether the state logic has any write-through cache implemented or not. If we are sending every read and write request over the network, then it comes with a performance cost. We can avoid the network call for a read operation if we have a write-through cache.
> >> >>>>>>>> >> >>>> >> I have superficially looked into the implementation and I didn't see any cache implementation.
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> Is it possible to have this cache? Would it cause any issue if we have the caching layer?
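For reference, the write-through idea being asked about reduces, in its simplest form, to something like the sketch below; the backend object is a stand-in for the state service, not a real Beam API.

    class WriteThroughCache(object):
        # Writes go to both the cache and the backing store, so reads of a
        # previously written key never need a network round trip.

        def __init__(self, backend):
            self._backend = backend
            self._cache = {}

        def write(self, key, value):
            self._backend.write(key, value)  # keep the source of truth updated
            self._cache[key] = value         # and mirror it locally

        def read(self, key):
            if key in self._cache:
                return self._cache[key]      # served locally, no network call
            value = self._backend.read(key)
            self._cache[key] = value
            return value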