On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:

> Thanks for starting this discussion Rakesh. An efficient cache layer is
> one of the missing pieces for good performance in stateful pipelines.
> The good news is that there is a level of caching already present in
> Python which batches append requests until the bundle is finished.
>
> Thomas, in your example indeed we would have to profile to see why CPU
> utilization is high on the Flink side but not in the Python SDK harness.
> For example, older versions of Flink (<=1.5) have a high cost of
> deleting existing instances of a timer when setting a timer.
> Nevertheless, cross-bundle caching would likely result in increased
> performance.
>

CPU on the Flink side was unchanged, and that's important. The throughput
improvement comes from the extended bundle caching on the SDK side. That's
what tells me that cross-bundle caching is needed. Of course, it will
require a good solution for the write also and I like your idea of using
the checkpoint boundary for that, especially since that already aligns with
the bundle boundary and is under runner control. Of course we also want to
be careful to not cause overly bursty writes.
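
To make the write side concrete, here is a minimal sketch of what deferring
the flush to the checkpoint boundary could look like on the SDK side. It is
not Beam code: `InMemoryBackend` stands in for the runner's state service, and
`CrossBundleBagCache` with its `read`, `append`, and `flush` methods are
hypothetical names. It also ignores cache tokens and invalidation, which a
real implementation would need whenever a key may be processed elsewhere.

```python
class InMemoryBackend:
    """Hypothetical stand-in for the runner's state service; counts round trips."""

    def __init__(self):
        self.store = {}
        self.calls = 0

    def read(self, key):
        self.calls += 1
        return list(self.store.get(key, []))

    def append(self, key, values):
        self.calls += 1
        self.store.setdefault(key, []).extend(values)


class CrossBundleBagCache:
    """Hypothetical SDK-side bag cache that outlives a single bundle."""

    def __init__(self, backend):
        self._backend = backend
        self._cached = {}   # key -> full bag contents as known to the SDK
        self._pending = {}  # key -> appends not yet sent to the runner

    def read(self, key):
        if key not in self._cached:
            # One round trip, then combine with appends that are still local.
            persisted = self._backend.read(key)
            self._cached[key] = persisted + self._pending.get(key, [])
        return list(self._cached[key])

    def append(self, key, value):
        # Blind append: no round trip, just remember it for the next flush.
        self._pending.setdefault(key, []).append(value)
        if key in self._cached:
            self._cached[key].append(value)

    def flush(self):
        """Send all deferred appends, e.g. when the runner takes a checkpoint."""
        for key, values in self._pending.items():
            self._backend.append(key, values)
        self._pending.clear()
```

A runner-controlled hook would call `flush()` when a checkpoint is taken.
Flushing everything at once is exactly where bursty writes could show up, so
a real version would probably also flush on a size or time limit.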

Profiling will be useful for the timer processing; that is also on my list
of suspects.


> Luke, I think the idea to merge pending state requests could be
> complementary to caching across bundles.
>
> Question: Couldn't we defer flushing back state from the SDK to the
> Runner indefinitely, provided that we add a way to flush the state in
> case of a checkpoint?
>
> Another performance improvement would be caching read requests because
> these first go to the Runner regardless of already cached appends.
>
> -Max
>
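
On merging pending state requests: the kind of rewrite discussed further down
the thread (turning clear + appends into a single set) could look roughly like
the sketch below. The tuple format and the function name `merge_pending` are
assumptions made for illustration, not part of the Fn API.

```python
def merge_pending(requests):
    """Collapse queued state requests per key.

    `requests` is a list of ('clear', key) or ('append', key, values) tuples
    in the order they were issued. The result has one entry per key:
    ('set', key, values) if the key was cleared at some point, otherwise
    ('append', key, values).
    """
    merged = {}  # key -> [op, values]
    for request in requests:
        op, key = request[0], request[1]
        if op == "clear":
            # Everything queued before the clear for this key is irrelevant.
            merged[key] = ["set", []]
        elif op == "append":
            entry = merged.setdefault(key, ["append", []])
            entry[1].extend(request[2])
        else:
            raise ValueError("unknown state operation: %r" % (op,))
    return [(op, key, values) for key, (op, values) in merged.items()]
```

A runner could apply something like this to whatever it has buffered when it
sees the first blocked request or hits its time or space limit.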
> On 09.08.19 17:12, Lukasz Cwik wrote:
> >
> >
> > On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com
> > <mailto:rober...@google.com>> wrote:
> >
> >     The question is whether the SDK needs to wait for the StateResponse to
> >     come back before declaring the bundle done. The proposal was to not
> >     send the cache token back as part of an append StateResponse [1], but
> >     pre-provide it as part of the bundle request.
> >
> >
> > Agree, the purpose of the I'm Blocked message is to occur during bundle
> > processing.
> >
> >
> >     Thinking about this some more, if we assume the state response was
> >     successfully applied, there's no reason for the SDK to block the
> >     bundle until it has its hands on the cache token--we can update the
> >     cache once the StateResponse comes back whether or not the bundle is
> >     still active. On the other hand, the runner needs a way to assert it
> >     has received and processed all StateRequests from the SDK associated
> >     with a bundle before it can declare the bundle complete (regardless of
> >     the cache tokens), so this might not be safe without some extra
> >     coordination (e.g. the ProcessBundleResponse indicating the number of
> >     state requests associated with a bundle).
> >
> >
> > Since the state request stream is ordered, we can add the id of the last
> > state request as part of the ProcessBundleResponse.
> >
> >
> >     [1] https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
> >
> >     On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com
> >     <mailto:lc...@google.com>> wrote:
> >     >
> >     > The purpose of the new state API call in BEAM-7000 is to tell the
> >     runner that the SDK is now blocked waiting for the result of a
> >     specific state request and it should be used for fetches (not
> >     updates) and is there to allow for SDKs to differentiate readLater
> >     (I will need this data at some point in time in the future) from
> >     read (I need this data now). This comes up commonly where the user
> >     prefetches multiple state cells and then looks at their content
> >     allowing the runner to batch up those calls on its end.
> >     >
> >     > The way it can be used for clear+append is that the runner can
> >     store requests in memory up until some time/memory limit or until it
> >     gets its first "blocked" call and then issue all the requests together.
> >     >
> >     >
> >     > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw
> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
> >     >>
> >     >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org
> >     <mailto:t...@apache.org>> wrote:
> >     >> >
> >     >> > That would add a synchronization point that forces extra
> >     latency especially in streaming mode.
> >     >> >
> >     >> > Wouldn't it be possible for the runner to assign the token when
> >     starting the bundle and for the SDK to pass it along the state
> >     requests? That way, there would be no need to batch and wait for a
> >     flush.
> >     >>
> >     >> I think it makes sense to let the runner pre-assign these state
> >     update
> >     >> tokens rather than forcing a synchronization point.
> >     >>
> >     >> Here are some pointers for the Python implementation:
> >     >>
> >     >> Currently, when a DoFn needs UserState, a StateContext object is used
> >     >> that converts from a StateSpec to the actual value. When running
> >     >> portably, this is FnApiUserStateContext [1]. The state handles
> >     >> themselves are cached at [2] but this context only lives for the
> >     >> lifetime of a single bundle. Logic could be added here to use the
> >     >> token to share these across bundles.
> >     >>
> >     >> Each of these handles in turn invokes state_handler.get* methods when
> >     >> its read is called (here state_handler is a thin wrapper around the
> >     >> service itself) and constructs the appropriate result from the
> >     >> StateResponse. We would need to implement caching at this level as
> >     >> well, including the deserialization. This will probably require some
> >     >> restructuring of how _StateBackedIterable is implemented (or,
> >     >> possibly, making that class itself cache aware). Hopefully that's
> >     >> enough to get started.
> >     >>
> >     >> [1] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
> >     >> [2] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
> >     >>
> >     >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com
> >     <mailto:lc...@google.com>> wrote:
> >     >> >>
> >     >> >> I believe the intent is to add a new state API call telling
> >     the runner that it is blocked waiting for a response (BEAM-7000).
> >     >> >>
> >     >> >> This should allow the runner to wait till it sees one of these
> >     I'm blocked requests and then merge + batch any state calls it may
> >     have at that point in time allowing it to convert clear + appends
> >     into set calls and do any other optimizations as well. By default,
> >     the runner would have a time and space based limit on how many
> >     outstanding state calls there are before choosing to resolve them.
> >     >> >>
> >     >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com
> >     <mailto:lc...@google.com>> wrote:
> >     >> >>>
> >     >> >>> Now I see what you mean.
> >     >> >>>
> >     >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org
> >     <mailto:t...@apache.org>> wrote:
> >     >> >>>>
> >     >> >>>> Hi Luke,
> >     >> >>>>
> >     >> >>>> I guess the answer is that it depends on the state backend.
> >     If a set operation in the state backend is available that is more
> >     efficient than clear+append, then it would be beneficial to have a
> >     dedicated fn api operation to allow for such optimization. That's
> >     something that needs to be determined with a profiler :)
> >     >> >>>>
> >     >> >>>> But the low hanging fruit is cross-bundle caching.
> >     >> >>>>
> >     >> >>>> Thomas
> >     >> >>>>
> >     >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com
> >     <mailto:lc...@google.com>> wrote:
> >     >> >>>>>
> >     >> >>>>> Thomas, why do you think a single round trip is needed?
> >     >> >>>>>
> >     >> >>>>> clear + append can be done blindly from the SDK side and it
> >     has total knowledge of the state at that point in time till the end
> >     of the bundle at which point you want to wait to get the cache token
> >     back from the runner for the append call so that for the next bundle
> >     you can reuse the state if the key wasn't processed elsewhere.
> >     >> >>>>>
> >     >> >>>>> Also, all state calls are "streamed" over gRPC so you don't
> >     need to wait for clear to complete before being able to send append.
> >     >> >>>>>
> >     >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun
> >     <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>> wrote:
> >     >> >>>>>>
> >     >> >>>>>> Hi Rakesh,
> >     >> >>>>>>
> >     >> >>>>>> Glad to see you point this problem out!
> >     >> >>>>>> +1 for adding this implementation. Managing state with a
> >     write-through cache is pretty important for streaming jobs!
> >     >> >>>>>>
> >     >> >>>>>> Best, Jincheng
> >     >> >>>>>>
> >     >> >>>>>> On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org
> >     <mailto:t...@apache.org>> wrote:
> >     >> >>>>>>>
> >     >> >>>>>>> FYI a basic test appears to confirm the importance of the
> >     cross-bundle caching: I found that the throughput can be increased
> >     by playing with the bundle size in the Flink runner. Default caps at
> >     1000 elements (or 1 second). So on a high throughput stream the
> >     bundles would be capped by the count limit. Bumping the count limit
> >     increases the throughput by reducing the chatter over the state
> >     plane (more cache hits due to larger bundle).
> >     >> >>>>>>>
> >     >> >>>>>>> The next level of investigation would involve profiling.
> >     But just by looking at metrics, the CPU utilization on the Python
> >     worker side dropped significantly while on the Flink side it remains
> >     nearly same. There are no metrics for state operations on either
> >     side, I think it would be very helpful to get these in place also.
> >     >> >>>>>>>
> >     >> >>>>>>> Below the stateful processing code for reference.
> >     >> >>>>>>>
> >     >> >>>>>>> Thomas
> >     >> >>>>>>>
> >     >> >>>>>>>
> >     >> >>>>>>> import logging
> >     >> >>>>>>>
> >     >> >>>>>>> import apache_beam as beam
> >     >> >>>>>>> from apache_beam.transforms import userstate
> >     >> >>>>>>>
> >     >> >>>>>>>
> >     >> >>>>>>> class StatefulFn(beam.DoFn):
> >     >> >>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
> >     >> >>>>>>>         'count',
> >     >> >>>>>>>         beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
> >     >> >>>>>>>     timer_spec = userstate.TimerSpec(
> >     >> >>>>>>>         'timer', userstate.TimeDomain.WATERMARK)
> >     >> >>>>>>>
> >     >> >>>>>>>     def process(self, kv,
> >     >> >>>>>>>                 count=beam.DoFn.StateParam(count_state_spec),
> >     >> >>>>>>>                 timer=beam.DoFn.TimerParam(timer_spec),
> >     >> >>>>>>>                 window=beam.DoFn.WindowParam):
> >     >> >>>>>>>         count.add(1)
> >     >> >>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
> >     >> >>>>>>>         timer.set(timer_seconds)
> >     >> >>>>>>>
> >     >> >>>>>>>     @userstate.on_timer(timer_spec)
> >     >> >>>>>>>     def process_timer(self,
> >     >> >>>>>>>                       count=beam.DoFn.StateParam(count_state_spec),
> >     >> >>>>>>>                       window=beam.DoFn.WindowParam):
> >     >> >>>>>>>         if count.read() == 0:
> >     >> >>>>>>>             logging.warning("###timer fired with count %d, window %s"
> >     >> >>>>>>>                             % (count.read(), window))
> >     >> >>>>>>>
> >     >> >>>>>>>
> >     >> >>>>>>>
> >     >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw
> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
> >     >> >>>>>>>>
> >     >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar
> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
> >     >> >>>>>>>> >
> >     >> >>>>>>>> > Thanks Robert,
> >     >> >>>>>>>> >
> >     >> >>>>>>>> > I stumbled on the JIRA that you created some time ago:
> >     >> >>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
> >     >> >>>>>>>> >
> >     >> >>>>>>>> > You also marked code where code changes are required:
> >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >     >> >>>>>>>> >
> >     >> >>>>>>>> > I am willing to provide help to implement this. Let me
> >     know how I can help.
> >     >> >>>>>>>>
> >     >> >>>>>>>> As far as I'm aware, no one is actively working on it
> >     right now.
> >     >> >>>>>>>> Please feel free to assign yourself the JIRA entry and
> >     I'll be happy
> >     >> >>>>>>>> to answer any questions you might have if (well probably
> >     when) these
> >     >> >>>>>>>> pointers are insufficient.
> >     >> >>>>>>>>
> >     >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw
> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
> >     >> >>>>>>>> >>
> >     >> >>>>>>>> >> This is documented at
> >     >> >>>>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >     >> >>>>>>>> >> Note that it requires participation of both the runner and the SDK
> >     >> >>>>>>>> >> (though there are no correctness issues if one or the other side does
> >     >> >>>>>>>> >> not understand the protocol, caching just won't be used).
> >     >> >>>>>>>> >>
> >     >> >>>>>>>> >> I don't think it's been implemented anywhere, but
> >     could be very
> >     >> >>>>>>>> >> beneficial for performance.
> >     >> >>>>>>>> >>
> >     >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar
> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> > I checked the Python SDK [1] and it has a similar
> >     implementation to the Java SDK.
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> > I would agree with Thomas. With a high-volume
> >     event stream and a larger cluster, network calls can potentially
> >     become a bottleneck.
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> > @Robert
> >     >> >>>>>>>> >> > I am interested in seeing the proposal. Can you
> >     share the link to it?
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> > [1]: https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> >
> >     >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise
> >     <t...@apache.org <mailto:t...@apache.org>> wrote:
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >> Thanks for the pointer. For streaming, it will be
> >     important to support caching across bundles. It appears that even
> >     the Java SDK doesn't support that yet?
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >> Regarding clear/append: It would be nice if both
> >     could occur within a single Fn Api roundtrip when the state is
> >     persisted.
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >> Thanks,
> >     >> >>>>>>>> >> >> Thomas
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >>
> >     >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik
> >     <lc...@google.com <mailto:lc...@google.com>> wrote:
> >     >> >>>>>>>> >> >>>
> >     >> >>>>>>>> >> >>> User state is built on top of read, append and
> >     clear, rather than a read and write paradigm, to allow for blind
> >     appends.
> >     >> >>>>>>>> >> >>>
> >     >> >>>>>>>> >> >>> The optimization you speak of can be done
> >     completely inside the SDK without any additional protocol being
> >     required as long as you clear the state first and then append all
> >     your new data. The Beam Java SDK does this for all runners when
> >     executed portably[1]. You could port the same logic to the Beam
> >     Python SDK as well.
> >     >> >>>>>>>> >> >>>
> >     >> >>>>>>>> >> >>> [1]: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
> >     >> >>>>>>>> >> >>>
> >     >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw
> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
> >     >> >>>>>>>> >> >>>>
> >     >> >>>>>>>> >> >>>> Python workers also have a per-bundle SDK-side
> >     cache. A protocol has
> >     >> >>>>>>>> >> >>>> been proposed, but hasn't yet been implemented
> >     in any SDKs or runners.
> >     >> >>>>>>>> >> >>>>
> >     >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax
> >     <re...@google.com <mailto:re...@google.com>> wrote:
> >     >> >>>>>>>> >> >>>> >
> >     >> >>>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the
> >     Dataflow runner) do have such a cache, though I think it's currently
> >     has a cap for large bags.
> >     >> >>>>>>>> >> >>>> >
> >     >> >>>>>>>> >> >>>> > Reuven
> >     >> >>>>>>>> >> >>>> >
> >     >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar
> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
> >     >> >>>>>>>> >> >>>> >>
> >     >> >>>>>>>> >> >>>> >> Hi,
> >     >> >>>>>>>> >> >>>> >>
> >     >> >>>>>>>> >> >>>> >> I have been using the Python SDK for the
> >     application, including BagState in production. I was wondering
> >     whether the state logic has a write-through cache implemented.
> >     If every read and write request goes over the network, it
> >     comes with a performance cost. With a write-through cache we could
> >     avoid the network call for read operations.
> >     >> >>>>>>>> >> >>>> >> I looked briefly at the implementation and
> >     didn't see any caching.
> >     >> >>>>>>>> >> >>>> >>
> >     >> >>>>>>>> >> >>>> >> Is it possible to have this cache? Would a
> >     caching layer cause any issues?
> >     >> >>>>>>>> >> >>>> >>
> >
>
