On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:

> The question is whether the SDK needs to wait for the StateResponse to
> come back before declaring the bundle done. The proposal was to not
> send the cache token back as part of an append StateResponse [1], but
> pre-provide it as part of the bundle request.
>

Agreed, the "I'm blocked" message is meant to be sent during bundle
processing.


> Thinking about this some more, if we assume the state response was
> successfully applied, there's no reason for the SDK to block the
> bundle until it has its hands on the cache token--we can update the
> cache once the StateResponse comes back whether or not the bundle is
> still active. On the other hand, the runner needs a way to assert it
> has received and processed all StateRequests from the SDK associated
> with a bundle before it can declare the bundle complete (regardless of
> the cache tokens), so this might not be safe without some extra
> coordination (e.g. the ProcessBundleResponse indicating the number of
> state requests associated with a bundle).
>

Since the state request stream is ordered, we can add the id of the last
state request as part of the ProcessBundleResponse.
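
A minimal sketch of that check on the runner side, in Python. The class
BundleStateTracker and the field last_state_request_id are hypothetical
names and are not part of the Fn API; the sketch only assumes that the
state requests of one bundle are processed in stream order.

```python
from typing import Optional


class BundleStateTracker:
    """Runner-side bookkeeping for one bundle (hypothetical, not Beam API)."""

    def __init__(self) -> None:
        self._last_processed_id: Optional[str] = None
        self._expected_last_id: Optional[str] = None
        self._response_seen = False

    def on_state_request_processed(self, request_id: str) -> None:
        # The state stream is ordered, so having processed this id implies
        # that every earlier request of the bundle was processed as well.
        self._last_processed_id = request_id

    def on_process_bundle_response(
            self, last_state_request_id: Optional[str]) -> None:
        # `last_state_request_id` is the assumed new response field; None
        # means the bundle issued no state requests at all.
        self._response_seen = True
        self._expected_last_id = last_state_request_id

    def bundle_complete(self) -> bool:
        # The control and state channels are separate streams, so the
        # response may arrive before the last state request was handled.
        if not self._response_seen:
            return False
        return (self._expected_last_id is None
                or self._last_processed_id == self._expected_last_id)
```

The runner would hold the bundle open until bundle_complete() returns
True, independent of any cache tokens.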


> [1]
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
>
> On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:
> >
> > The purpose of the new state API call in BEAM-7000 is to tell the runner
> that the SDK is now blocked waiting for the result of a specific state
> request. It should be used for fetches (not updates), and it lets SDKs
> differentiate readLater (I will need this data at some point in the
> future) from read (I need this data now). This commonly comes up when the
> user prefetches multiple state cells and then looks at their content,
> which allows the runner to batch up those calls on its end.
> >
> > The way it can be used for clear+append is that the runner can store
> requests in memory up until some time/memory limit or until it gets its
> first "blocked" call and then issue all the requests together.
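
A rough Python sketch of the buffering described above. StateRequestBuffer
and its parameters are hypothetical names, not part of any runner; the age
limit is only checked when a new request arrives, and a real runner would
presumably also use a timer.

```python
import time
from typing import Callable, List, Tuple

# A pending request is (kind, state_key, payload); kind is "clear" or "append".
Request = Tuple[str, str, bytes]


class StateRequestBuffer:
    """Buffers updates until a 'blocked' signal or a limit (hypothetical)."""

    def __init__(self, flush: Callable[[List[Request]], None],
                 max_bytes: int = 1 << 20, max_age_secs: float = 1.0) -> None:
        self._flush = flush
        self._max_bytes = max_bytes
        self._max_age_secs = max_age_secs
        self._pending: List[Request] = []
        self._bytes = 0
        self._oldest = 0.0

    def add(self, request: Request) -> None:
        if not self._pending:
            # Remember when the oldest buffered request arrived.
            self._oldest = time.monotonic()
        self._pending.append(request)
        self._bytes += len(request[2])
        too_big = self._bytes >= self._max_bytes
        too_old = time.monotonic() - self._oldest >= self._max_age_secs
        if too_big or too_old:
            self.flush()

    def on_blocked(self) -> None:
        # The SDK needs an answer now, so stop waiting and issue everything.
        self.flush()

    def flush(self) -> None:
        if self._pending:
            batch, self._pending, self._bytes = self._pending, [], 0
            self._flush(batch)
```

The flush callback receives the whole batch in arrival order, which is
where a runner could merge a clear followed by appends into a single set.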
> >
> >
> > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:
> >> >
> >> > That would add a synchronization point that forces extra latency
> especially in streaming mode.
> >> >
> >> > Wouldn't it be possible for the runner to assign the token when
> starting the bundle and for the SDK to pass it along the state requests?
> That way, there would be no need to batch and wait for a flush.
> >>
> >> I think it makes sense to let the runner pre-assign these state update
> >> tokens rather than forcing a synchronization point.
> >>
> >> Here are some pointers for the Python implementation:
> >>
> >> Currently, when a DoFn needs UserState, a StateContext object is used
> >> that converts from a StateSpec to the actual value. When running
> >> portably, this is FnApiUserStateContext [1]. The state handles
> >> themselves are cached at [2] but this context only lives for the
> >> lifetime of a single bundle. Logic could be added here to use the
> >> token to share these across bundles.
> >>
> >> Each of these handles in turn invokes state_handler.get* methods when
> >> its read is called (here state_handler is a thin wrapper around the
> >> service itself) and constructs the appropriate result from the
> >> StateResponse. We would need to implement caching at this level as
> >> well, including the deserialization. This will probably require some
> >> restructuring of how _StateBackedIterable is implemented (or,
> >> possibly, making that class itself cache aware). Hopefully that's
> >> possibly, making that class itself cache aware). Hopefully that's
> >> enough to get started.
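
A minimal sketch of caching at the handler level, assuming the runner
supplies a cache token with each bundle. CachingStateReader, fetch and
decode are hypothetical stand-ins for the state_handler round trip and
the element coder; none of them are names from the Beam code base.

```python
from typing import Callable, Dict, List, Tuple


class CachingStateReader:
    """Caches decoded bag contents across bundles (hypothetical sketch)."""

    def __init__(self, fetch: Callable[[str], List[bytes]],
                 decode: Callable[[bytes], object]) -> None:
        self._fetch = fetch      # stands in for a state_handler round trip
        self._decode = decode    # stands in for the element coder
        self._cache: Dict[str, Tuple[str, list]] = {}

    def read(self, state_key: str, cache_token: str) -> list:
        entry = self._cache.get(state_key)
        if entry is not None and entry[0] == cache_token:
            # Hit: no round trip and no deserialization. The cached list is
            # shared, so callers must not mutate it.
            return entry[1]
        # Miss or stale token: fetch, decode once, and remember the result.
        values = [self._decode(b) for b in self._fetch(state_key)]
        self._cache[state_key] = (cache_token, values)
        return values
```

Because the cache lives outside the per-bundle context, an unchanged token
in the next bundle lets the SDK skip both the fetch and the decode.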
> >>
> >> [1]
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
> >> [2]
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
> >> .
> >>
> >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >>
> >> >> I believe the intent is to add a new state API call telling the
> runner that it is blocked waiting for a response (BEAM-7000).
> >> >>
> >> >> This should allow the runner to wait until it sees one of these "I'm
> blocked" requests and then merge + batch any state calls it may have at that
> point in time, allowing it to convert clear + appends into set calls and do
> any other optimizations as well. By default, the runner would have a time
> and space based limit on how many outstanding state calls there are before
> choosing to resolve them.
> >> >>
> >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>
> >> >>> Now I see what you mean.
> >> >>>
> >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
> >> >>>>
> >> >>>> Hi Luke,
> >> >>>>
> >> >>>> I guess the answer is that it depends on the state backend. If a
> set operation in the state backend is available that is more efficient than
> clear+append, then it would be beneficial to have a dedicated fn api
> operation to allow for such optimization. That's something that needs to be
> determined with a profiler :)
> >> >>>>
> >> >>>> But the low hanging fruit is cross-bundle caching.
> >> >>>>
> >> >>>> Thomas
> >> >>>>
> >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>>
> >> >>>>> Thomas, why do you think a single round trip is needed?
> >> >>>>>
> >> >>>>> clear + append can be done blindly from the SDK side, and the SDK has
> total knowledge of the state from that point until the end of the bundle.
> At the end of the bundle you want to wait to get the cache token back from
> the runner for the append call, so that for the next bundle you can reuse
> the state if the key wasn't processed elsewhere.
> >> >>>>>
> >> >>>>> Also, all state calls are "streamed" over gRPC so you don't need
> to wait for clear to complete before being able to send append.
> >> >>>>>
> >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <
> sunjincheng...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Rakesh,
> >> >>>>>>
> >> >>>>>> Glad to see you point this problem out!
> >> >>>>>> +1 for adding this implementation. Managing state with a
> write-through cache is pretty important for streaming jobs!
> >> >>>>>>
> >> >>>>>> Best, Jincheng
> >> >>>>>>
> >> >>>>>> On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org> wrote:
> >> >>>>>>>
> >> >>>>>>> FYI a basic test appears to confirm the importance of the
> cross-bundle caching: I found that the throughput can be increased by
> playing with the bundle size in the Flink runner. Default caps at 1000
> elements (or 1 second). So on a high throughput stream the bundles would be
> capped by the count limit. Bumping the count limit increases the throughput
> by reducing the chatter over the state plane (more cache hits due to larger
> bundle).
> >> >>>>>>>
> >> >>>>>>> The next level of investigation would involve profiling. But
> just by looking at metrics, the CPU utilization on the Python worker side
> dropped significantly while on the Flink side it remains nearly the same.
> There are no metrics for state operations on either side; I think it would
> be very helpful to get these in place as well.
> >> >>>>>>>
> >> >>>>>>> Below is the stateful processing code for reference.
> >> >>>>>>>
> >> >>>>>>> Thomas
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> class StatefulFn(beam.DoFn):
> >> >>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
> >> >>>>>>>         'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
> >> >>>>>>>     timer_spec = userstate.TimerSpec(
> >> >>>>>>>         'timer', userstate.TimeDomain.WATERMARK)
> >> >>>>>>>
> >> >>>>>>>     def process(self, kv,
> >> >>>>>>>                 count=beam.DoFn.StateParam(count_state_spec),
> >> >>>>>>>                 timer=beam.DoFn.TimerParam(timer_spec),
> >> >>>>>>>                 window=beam.DoFn.WindowParam):
> >> >>>>>>>         count.add(1)
> >> >>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
> >> >>>>>>>         timer.set(timer_seconds)
> >> >>>>>>>
> >> >>>>>>>     @userstate.on_timer(timer_spec)
> >> >>>>>>>     def process_timer(self,
> >> >>>>>>>                       count=beam.DoFn.StateParam(count_state_spec),
> >> >>>>>>>                       window=beam.DoFn.WindowParam):
> >> >>>>>>>         if count.read() == 0:
> >> >>>>>>>             logging.warning("###timer fired with count %d, window %s"
> >> >>>>>>>                             % (count.read(), window))
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <
> rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >
> >> >>>>>>>> > Thanks Robert,
> >> >>>>>>>> >
> >> >>>>>>>> > I stumbled on the JIRA that you created some time ago:
> >> >>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
> >> >>>>>>>> >
> >> >>>>>>>> > You also marked where code changes are required:
> >> >>>>>>>> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> >> >>>>>>>> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >> >>>>>>>> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >> >>>>>>>> >
> >> >>>>>>>> > I am willing to provide help to implement this. Let me know
> how I can help.
> >> >>>>>>>>
> >> >>>>>>>> As far as I'm aware, no one is actively working on it right
> now.
> >> >>>>>>>> Please feel free to assign yourself the JIRA entry and I'll be
> happy
> >> >>>>>>>> to answer any questions you might have if (well probably when)
> these
> >> >>>>>>>> pointers are insufficient.
> >> >>>>>>>>
> >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>>>>> >>
> >> >>>>>>>> >> This is documented at
> >> >>>>>>>> >>
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >> >>>>>>>> >> . Note that it requires participation of both the runner
> and the SDK
> >> >>>>>>>> >> (though there are no correctness issues if one or the other
> side does
> >> >>>>>>>> >> not understand the protocol, caching just won't be used).
> >> >>>>>>>> >>
> >> >>>>>>>> >> I don't think it's been implemented anywhere, but could be
> very
> >> >>>>>>>> >> beneficial for performance.
> >> >>>>>>>> >>
> >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <
> rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > I checked the Python SDK [1] and it has a similar
> implementation to the Java SDK.
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > I would agree with Thomas. With a high-volume event
> stream and a bigger cluster, network calls can potentially become a
> bottleneck.
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > @Robert
> >> >>>>>>>> >> > I am interested in seeing the proposal. Can you provide
> a link to it?
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > [1]:
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >> >>>>>>>> >> >
> >> >>>>>>>> >> >
> >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <
> t...@apache.org> wrote:
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Thanks for the pointer. For streaming, it will be
> important to support caching across bundles. It appears that even the Java
> SDK doesn't support that yet?
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >>
> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Regarding clear/append: It would be nice if both could
> occur within a single Fn Api roundtrip when the state is persisted.
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> Thanks,
> >> >>>>>>>> >> >> Thomas
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >>
> >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <
> lc...@google.com> wrote:
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> User state is built on top of read, append and clear
> and not off a read and write paradigm to allow for blind appends.
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> The optimization you speak of can be done completely
> inside the SDK without any additional protocol being required as long as
> you clear the state first and then append all your new data. The Beam Java
> SDK does this for all runners when executed portably[1]. You could port the
> same logic to the Beam Python SDK as well.
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> 1:
> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
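
A Python sketch of the clear-then-append pattern described above. It is
written only in the spirit of that description and is not a port of the
Java class; BufferedBagState and its three callbacks are hypothetical
names standing in for the actual state calls.

```python
from typing import Callable, Iterable, List


class BufferedBagState:
    """Buffers bag writes locally and flushes clear-then-append (sketch)."""

    def __init__(self, read_remote: Callable[[], Iterable[object]],
                 send_clear: Callable[[], None],
                 send_append: Callable[[List[object]], None]) -> None:
        self._read_remote = read_remote
        self._send_clear = send_clear
        self._send_append = send_append
        self._cleared = False
        self._new_values: List[object] = []

    def read(self) -> List[object]:
        # After a local clear the remote contents are irrelevant.
        old = [] if self._cleared else list(self._read_remote())
        return old + self._new_values

    def add(self, value: object) -> None:
        # Blind append: nothing is sent until the bundle finishes.
        self._new_values.append(value)

    def clear(self) -> None:
        self._cleared = True
        self._new_values = []

    def flush(self) -> None:
        # Called once at the end of the bundle; neither call needs a read.
        if self._cleared:
            self._send_clear()
        if self._new_values:
            self._send_append(self._new_values)
        self._cleared = False
        self._new_values = []
```

Any number of add and clear calls within a bundle collapse into at most
one clear and one append on the wire.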
> >> >>>>>>>> >> >>>
> >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>>>>> >> >>>>
> >> >>>>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache.
> A protocol has
> >> >>>>>>>> >> >>>> been proposed, but hasn't yet been implemented in any
> SDKs or runners.
> >> >>>>>>>> >> >>>>
> >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <
> re...@google.com> wrote:
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the
> Dataflow runner) do have such a cache, though I think it currently has a
> cap for large bags.
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > Reuven
> >> >>>>>>>> >> >>>> >
> >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <
> rakeshku...@lyft.com> wrote:
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> Hi,
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> I have been using the Python SDK for my application
> and am also using BagState in production. I was wondering whether the state
> logic has a write-through cache implemented. If we send every read and
> write request over the network, it comes with a performance cost. We can
> avoid the network call for a read operation if we have a write-through
> cache.
> >> >>>>>>>> >> >>>> >> I have superficially looked into the implementation
> and I didn't see any cache implementation.
> >> >>>>>>>> >> >>>> >>
> >> >>>>>>>> >> >>>> >> Is it possible to have this cache? Would it cause
> any issues if we have the caching layer?
> >> >>>>>>>> >> >>>> >>
>
