On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org> wrote:

>
> On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Thanks for starting this discussion Rakesh. An efficient cache layer is
>> one of the missing pieces for good performance in stateful pipelines.
>> The good news is that there is a level of caching already present in
>> Python which batches append requests until the bundle is finished.
>>
>> Thomas, in your example indeed we would have to profile to see why CPU
>> utilization is high on the Flink side but not in the Python SDK harness.
>> For example, older versions of Flink (<=1.5) have a high cost of
>> deleting existing instances of a timer when setting a timer.
>> Nevertheless, cross-bundle caching would likely result in increased
>> performance.
>>
>
> CPU on the Flink side was unchanged, and that's important. The throughput
> improvement comes from the extended bundle caching on the SDK side. That's
> what tells me that cross-bundle caching is needed. Of course, it will
> require a good solution for the write also and I like your idea of using
> the checkpoint boundary for that, especially since that already aligns with
> the bundle boundary and is under runner control. Of course we also want to
> be careful to not cause overly bursty writes.
>
> Profiling will be useful for the timer processing, which is also on my list
> of suspects.
>
>
>> Luke, I think the idea to merge pending state requests could be
>> complementary to caching across bundles.
>>
>> Question: Couldn't we defer flushing back state from the SDK to the
>> Runner indefinitely, provided that we add a way to flush the state in
>> case of a checkpoint?
>>
>
Flushing is needed to prevent the SDK from running out of memory. With a
fixed memory budget for state inside the SDK, flushing would be triggered
whenever state usage exceeds that budget.
I could also see that only flushing at checkpoint may lead to slow
checkpoint performance so we may want to flush state that hasn't been used
in a while as well.
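
To make the budget-plus-idle-flush idea concrete, here is a minimal sketch of
what such a buffer could look like on the SDK side. It is only an illustration
under stated assumptions: PendingAppendBuffer, flush_fn, max_bytes and
max_idle_secs are hypothetical names and not part of Beam or the Fn API, and
the caller is assumed to supply the encoded size of each value.

```python
import time
from collections import OrderedDict


class PendingAppendBuffer:
    """Hypothetical SDK-side buffer of unflushed appends (not a Beam API).

    Pending appends are kept per state key in least-recently-used order.
    They are written back through flush_fn when the byte budget is
    exceeded, when a key has been idle too long, or at a checkpoint.
    """

    def __init__(self, flush_fn, max_bytes, max_idle_secs,
                 clock=time.monotonic):
        self._flush_fn = flush_fn      # assumed callback: flush_fn(key, values)
        self._max_bytes = max_bytes
        self._max_idle_secs = max_idle_secs
        self._clock = clock
        # key -> (values, size_in_bytes, last_used); oldest entry first.
        self._pending = OrderedDict()
        self._bytes = 0

    def append(self, key, value, size):
        values, old_size, _ = self._pending.pop(key, ([], 0, None))
        values.append(value)
        # Re-inserting moves the key to the most-recently-used end.
        self._pending[key] = (values, old_size + size, self._clock())
        self._bytes += size
        # Enforce the fixed memory budget by flushing the oldest keys.
        while self._bytes > self._max_bytes and self._pending:
            self._flush_oldest()

    def _flush_oldest(self):
        key, (values, size, _) = self._pending.popitem(last=False)
        self._bytes -= size
        self._flush_fn(key, values)

    def flush_idle(self):
        """Flush keys not touched within max_idle_secs."""
        cutoff = self._clock() - self._max_idle_secs
        while self._pending:
            oldest_key = next(iter(self._pending))
            if self._pending[oldest_key][2] > cutoff:
                break
            self._flush_oldest()

    def flush_all(self):
        """Flush everything, e.g. when the runner signals a checkpoint."""
        while self._pending:
            self._flush_oldest()
```

In this sketch the runner-driven checkpoint would call flush_all(), while a
periodic call to flush_idle() keeps the amount of work left for the checkpoint
small. Caching of reads is deliberately left out.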


>> Another performance improvement would be caching read requests because
>> these first go to the Runner regardless of already cached appends.
>>
>> -Max
>>
>> On 09.08.19 17:12, Lukasz Cwik wrote:
>> >
>> >
>> > On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com
>> > <mailto:rober...@google.com>> wrote:
>> >
>> >     The question is whether the SDK needs to wait for the StateResponse to
>> >     come back before declaring the bundle done. The proposal was to not
>> >     send the cache token back as part of an append StateResponse [1], but
>> >     pre-provide it as part of the bundle request.
>> >
>> >
>> > Agree, the purpose of the I'm Blocked message is to occur during bundle
>> > processing.
>> >
>> >
>> >     Thinking about this some more, if we assume the state response was
>> >     successfully applied, there's no reason for the SDK to block the
>> >     bundle until it has its hands on the cache token--we can update the
>> >     cache once the StateResponse comes back whether or not the bundle is
>> >     still active. On the other hand, the runner needs a way to assert it
>> >     has received and processed all StateRequests from the SDK associated
>> >     with a bundle before it can declare the bundle complete (regardless of
>> >     the cache tokens), so this might not be safe without some extra
>> >     coordination (e.g. the ProcessBundleResponse indicating the number of
>> >     state requests associated with a bundle).
>> >
>> >
>> > Since the state request stream is ordered, we can add the id of the last
>> > state request as part of the ProcessBundleResponse.
>> >
>> >
>> >     [1]
>> >
>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
>> >
>> >     On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com
>> >     <mailto:lc...@google.com>> wrote:
>> >     >
>> >     > The purpose of the new state API call in BEAM-7000 is to tell the
>> >     runner that the SDK is now blocked waiting for the result of a
>> >     specific state request and it should be used for fetches (not
>> >     updates) and is there to allow for SDKs to differentiate readLater
>> >     (I will need this data at some point in time in the future) from
>> >     read (I need this data now). This comes up commonly where the user
>> >     prefetches multiple state cells and then looks at their content
>> >     allowing the runner to batch up those calls on its end.
>> >     >
>> >     > The way it can be used for clear+append is that the runner can
>> >     store requests in memory up until some time/memory limit or until it
>> >     gets its first "blocked" call and then issue all the requests together.
>> >     >
>> >     >
>> >     > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw
>> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
>> >     >>
>> >     >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org
>> >     <mailto:t...@apache.org>> wrote:
>> >     >> >
>> >     >> > That would add a synchronization point that forces extra
>> >     latency especially in streaming mode.
>> >     >> >
>> >     >> > Wouldn't it be possible for the runner to assign the token when
>> >     starting the bundle and for the SDK to pass it along the state
>> >     requests? That way, there would be no need to batch and wait for a
>> >     flush.
>> >     >>
>> >     >> I think it makes sense to let the runner pre-assign these state update
>> >     >> tokens rather than forcing a synchronization point.
>> >     >>
>> >     >> Here are some pointers for the Python implementation:
>> >     >>
>> >     >> Currently, when a DoFn needs UserState, a StateContext object is used
>> >     >> that converts from a StateSpec to the actual value. When running
>> >     >> portably, this is FnApiUserStateContext [1]. The state handles
>> >     >> themselves are cached at [2] but this context only lives for the
>> >     >> lifetime of a single bundle. Logic could be added here to use the
>> >     >> token to share these across bundles.
>> >     >>
>> >     >> Each of these handles in turn invokes state_handler.get* methods when
>> >     >> its read is called (here state_handler is a thin wrapper around the
>> >     >> service itself) and constructs the appropriate result from the
>> >     >> StateResponse. We would need to implement caching at this level as
>> >     >> well, including the deserialization. This will probably require some
>> >     >> restructuring of how _StateBackedIterable is implemented (or,
>> >     >> possibly, making that class itself cache aware). Hopefully that's
>> >     >> enough to get started.
>> >     >>
>> >     >> [1]
>> >
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>> >     >> [2]
>> >
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
>> >     >> .
>> >     >>
>> >     >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com
>> >     <mailto:lc...@google.com>> wrote:
>> >     >> >>
>> >     >> >> I believe the intent is to add a new state API call telling
>> >     the runner that it is blocked waiting for a response (BEAM-7000).
>> >     >> >>
>> >     >> >> This should allow the runner to wait till it sees one of these
>> >     I'm blocked requests and then merge + batch any state calls it may
>> >     have at that point in time allowing it to convert clear + appends
>> >     into set calls and do any other optimizations as well. By default,
>> >     the runner would have a time and space based limit on how many
>> >     outstanding state calls there are before choosing to resolve them.
>> >     >> >>
>> >     >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com
>> >     <mailto:lc...@google.com>> wrote:
>> >     >> >>>
>> >     >> >>> Now I see what you mean.
>> >     >> >>>
>> >     >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org
>> >     <mailto:t...@apache.org>> wrote:
>> >     >> >>>>
>> >     >> >>>> Hi Luke,
>> >     >> >>>>
>> >     >> >>>> I guess the answer is that it depends on the state backend.
>> >     If a set operation in the state backend is available that is more
>> >     efficient than clear+append, then it would be beneficial to have a
>> >     dedicated fn api operation to allow for such optimization. That's
>> >     something that needs to be determined with a profiler :)
>> >     >> >>>>
>> >     >> >>>> But the low hanging fruit is cross-bundle caching.
>> >     >> >>>>
>> >     >> >>>> Thomas
>> >     >> >>>>
>> >     >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com
>> >     <mailto:lc...@google.com>> wrote:
>> >     >> >>>>>
>> >     >> >>>>> Thomas, why do you think a single round trip is needed?
>> >     >> >>>>>
>> >     >> >>>>> clear + append can be done blindly from the SDK side and it
>> >     has total knowledge of the state at that point in time till the end
>> >     of the bundle at which point you want to wait to get the cache token
>> >     back from the runner for the append call so that for the next bundle
>> >     you can reuse the state if the key wasn't processed elsewhere.
>> >     >> >>>>>
>> >     >> >>>>> Also, all state calls are "streamed" over gRPC so you don't
>> >     need to wait for clear to complete before being able to send append.
>> >     >> >>>>>
>> >     >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun
>> >     <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>> wrote:
>> >     >> >>>>>>
>> >     >> >>>>>> Hi Rakesh,
>> >     >> >>>>>>
>> >     >> >>>>>> Glad to see you point this problem out!
>> >     >> >>>>>> +1 for adding this implementation. Managing state with a
>> >     write-through cache is pretty important for streaming jobs!
>> >     >> >>>>>>
>> >     >> >>>>>> Best, Jincheng
>> >     >> >>>>>>
>> >     >> >>>>>> On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org
>> >     <mailto:t...@apache.org>> wrote:
>> >     >> >>>>>>>
>> >     >> >>>>>>> FYI a basic test appears to confirm the importance of the
>> >     cross-bundle caching: I found that the throughput can be increased
>> >     by playing with the bundle size in the Flink runner. Default caps at
>> >     1000 elements (or 1 second). So on a high throughput stream the
>> >     bundles would be capped by the count limit. Bumping the count limit
>> >     increases the throughput by reducing the chatter over the state
>> >     plane (more cache hits due to larger bundle).
>> >     >> >>>>>>>
>> >     >> >>>>>>> The next level of investigation would involve profiling.
>> >     But just by looking at metrics, the CPU utilization on the Python
>> >     worker side dropped significantly while on the Flink side it remains
>> >     nearly the same. There are no metrics for state operations on either
>> >     side; I think it would be very helpful to get these in place also.
>> >     >> >>>>>>>
>> >     >> >>>>>>> Below is the stateful processing code for reference.
>> >     >> >>>>>>>
>> >     >> >>>>>>> Thomas
>> >     >> >>>>>>>
>> >     >> >>>>>>>
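>> >     >> >>>>>>> import logging
>> >     >> >>>>>>>
>> >     >> >>>>>>> import apache_beam as beam
>> >     >> >>>>>>> from apache_beam.transforms import userstate
>> >     >> >>>>>>>
>> >     >> >>>>>>>
>> >     >> >>>>>>> class StatefulFn(beam.DoFn):
>> >     >> >>>>>>>     # Per-key running count, combined with sum.
>> >     >> >>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
>> >     >> >>>>>>>         'count',
>> >     >> >>>>>>>         beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>> >     >> >>>>>>>     timer_spec = userstate.TimerSpec(
>> >     >> >>>>>>>         'timer', userstate.TimeDomain.WATERMARK)
>> >     >> >>>>>>>
>> >     >> >>>>>>>     def process(
>> >     >> >>>>>>>         self, kv,
>> >     >> >>>>>>>         count=beam.DoFn.StateParam(count_state_spec),
>> >     >> >>>>>>>         timer=beam.DoFn.TimerParam(timer_spec),
>> >     >> >>>>>>>         window=beam.DoFn.WindowParam):
>> >     >> >>>>>>>         count.add(1)
>> >     >> >>>>>>>         # Fire one second before the end of the window.
>> >     >> >>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
>> >     >> >>>>>>>         timer.set(timer_seconds)
>> >     >> >>>>>>>
>> >     >> >>>>>>>     @userstate.on_timer(timer_spec)
>> >     >> >>>>>>>     def process_timer(
>> >     >> >>>>>>>         self,
>> >     >> >>>>>>>         count=beam.DoFn.StateParam(count_state_spec),
>> >     >> >>>>>>>         window=beam.DoFn.WindowParam):
>> >     >> >>>>>>>         # Unexpected: process() adds 1 before setting the timer.
>> >     >> >>>>>>>         if count.read() == 0:
>> >     >> >>>>>>>             logging.warning(
>> >     >> >>>>>>>                 "###timer fired with count %d, window %s",
>> >     >> >>>>>>>                 count.read(), window)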
>> >     >> >>>>>>>
>> >     >> >>>>>>>
>> >     >> >>>>>>>
>> >     >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw
>> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
>> >     >> >>>>>>>>
>> >     >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar
>> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
>> >     >> >>>>>>>> >
>> >     >> >>>>>>>> > Thanks Robert,
>> >     >> >>>>>>>> >
>> >     >> >>>>>>>> >  I stumbled on the JIRA that you created some time ago:
>> >     >> >>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>> >     >> >>>>>>>> >
>> >     >> >>>>>>>> > You also marked code where code changes are required:
>> >     >> >>>>>>>> >
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>> >     >> >>>>>>>> >
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> >     >> >>>>>>>> >
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>> >     >> >>>>>>>> >
>> >     >> >>>>>>>> > I am willing to provide help to implement this. Let me
>> >     know how I can help.
>> >     >> >>>>>>>>
>> >     >> >>>>>>>> As far as I'm aware, no one is actively working on it
>> >     right now.
>> >     >> >>>>>>>> Please feel free to assign yourself the JIRA entry and
>> >     I'll be happy
>> >     >> >>>>>>>> to answer any questions you might have if (well probably
>> >     when) these
>> >     >> >>>>>>>> pointers are insufficient.
>> >     >> >>>>>>>>
>> >     >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw
>> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
>> >     >> >>>>>>>> >>
>> >     >> >>>>>>>> >> This is documented at
>> >     >> >>>>>>>> >>
>> >
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>> >     >> >>>>>>>> >> . Note that it requires participation of both the
>> >     runner and the SDK
>> >     >> >>>>>>>> >> (though there are no correctness issues if one or the
>> >     other side does
>> >     >> >>>>>>>> >> not understand the protocol, caching just won't be used).
>> >     >> >>>>>>>> >>
>> >     >> >>>>>>>> >> I don't think it's been implemented anywhere, but
>> >     could be very
>> >     >> >>>>>>>> >> beneficial for performance.
>> >     >> >>>>>>>> >>
>> >     >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar
>> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> > I checked the Python SDK [1] and it has a similar
>> >     implementation to the Java SDK.
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> > I agree with Thomas. With a high-volume event
>> >     stream and a bigger cluster, the network calls can potentially
>> >     become a bottleneck.
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> > @Robert
>> >     >> >>>>>>>> >> > I am interested in seeing the proposal. Can you
>> >     provide me a link to it?
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> > [1]:
>> >
>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> >
>> >     >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise
>> >     <t...@apache.org <mailto:t...@apache.org>> wrote:
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >> Thanks for the pointer. For streaming, it will be
>> >     important to support caching across bundles. It appears that even
>> >     the Java SDK doesn't support that yet?
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >>
>> >
>> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >> Regarding clear/append: It would be nice if both
>> >     could occur within a single Fn Api roundtrip when the state is
>> >     persisted.
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >> Thanks,
>> >     >> >>>>>>>> >> >> Thomas
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >>
>> >     >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik
>> >     <lc...@google.com <mailto:lc...@google.com>> wrote:
>> >     >> >>>>>>>> >> >>>
>> >     >> >>>>>>>> >> >>> User state is built on top of read, append and
>> >     clear and not on a read and write paradigm to allow for blind appends.
>> >     >> >>>>>>>> >> >>>
>> >     >> >>>>>>>> >> >>> The optimization you speak of can be done
>> >     completely inside the SDK without any additional protocol being
>> >     required as long as you clear the state first and then append all
>> >     your new data. The Beam Java SDK does this for all runners when
>> >     executed portably[1]. You could port the same logic to the Beam
>> >     Python SDK as well.
>> >     >> >>>>>>>> >> >>>
>> >     >> >>>>>>>> >> >>> 1:
>> >
>> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>> >     >> >>>>>>>> >> >>>
>> >     >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw
>> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
>> >     >> >>>>>>>> >> >>>>
>> >     >> >>>>>>>> >> >>>> Python workers also have a per-bundle SDK-side
>> >     cache. A protocol has
>> >     >> >>>>>>>> >> >>>> been proposed, but hasn't yet been implemented
>> >     in any SDKs or runners.
>> >     >> >>>>>>>> >> >>>>
>> >     >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax
>> >     <re...@google.com <mailto:re...@google.com>> wrote:
>> >     >> >>>>>>>> >> >>>> >
>> >     >> >>>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the
>> >     Dataflow runner) do have such a cache, though I think it currently
>> >     has a cap for large bags.
>> >     >> >>>>>>>> >> >>>> >
>> >     >> >>>>>>>> >> >>>> > Reuven
>> >     >> >>>>>>>> >> >>>> >
>> >     >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar
>> >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>> wrote:
>> >     >> >>>>>>>> >> >>>> >>
>> >     >> >>>>>>>> >> >>>> >> Hi,
>> >     >> >>>>>>>> >> >>>> >>
>> >     >> >>>>>>>> >> >>>> >> I have been using the Python SDK for our
>> >     application, with BagState in production. I was wondering whether
>> >     the state logic has a write-through cache implemented. If every read
>> >     and write request goes over the network, that comes with a
>> >     performance cost. With a write-through cache we could avoid the
>> >     network call for a read operation.
>> >     >> >>>>>>>> >> >>>> >> I have looked superficially into the
>> >     implementation and didn't see any cache implementation.
>> >     >> >>>>>>>> >> >>>> >>
>> >     >> >>>>>>>> >> >>>> >> Is it possible to have this cache? Would it
>> >     cause any issues if we had the caching layer?
>> >     >> >>>>>>>> >> >>>> >>
>> >
>>
>
