For the purpose of my own understanding of the matter, I've created a document: https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/
It could make sense to clarify and specify things in there for now. I'm more than willing to consolidate this document with the caching section in the Fn API document.

-Max

On 14.08.19 17:13, Lukasz Cwik wrote:

Instead of starting a new doc, could we add/update the caching segment of https://s.apache.org/beam-fn-state-api-and-bundle-processing?

Everyone has comment access, and all Apache Beam PMC members can add themselves as editors since the doc is owned by the Apache Beam PMC Gmail account.

On Wed, Aug 14, 2019 at 7:01 AM Maximilian Michels <m...@apache.org> wrote:

> Yes, that makes sense. What do you think about creating a document to summarize the ideas presented here? Also, it would be good to capture the status quo regarding caching in the Python SDK.
>
> -Max

On 13.08.19 22:44, Thomas Weise wrote:

The token would be needed in general to invalidate the cache when bundles are processed by different workers.

In the case of the Flink runner, we don't have a scenario of the SDK worker surviving the runner in the case of a failure, so there is no possibility of inconsistent state as a result of a checkpoint failure.

-- sent from mobile

On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks for clarifying. Cache invalidation for side inputs makes sense.
>
> In case the Runner fails to checkpoint, could it not re-attempt the checkpoint? At least in the case of Flink, the cache would still be valid until another checkpoint is attempted. For other Runners that may not be the case. Also, rolling back state while keeping the SDK Harness running requires invalidating the cache.
> -Max

On 13.08.19 18:09, Lukasz Cwik wrote:

On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels <m...@apache.org> wrote:

> Agree that we have to be able to flush before a checkpoint to avoid caching too many elements. Also a good point about checkpoint costs increasing with flushing the cache on checkpoints. An LRU cache policy in the SDK seems desirable.
>
> What is the role of the cache token in the design document [1]? It looks to me like the token is used to give the Runner control over which and how many elements can be cached by the SDK. Why is that necessary? Shouldn't this be up to the SDK?

We want to be able to handle the case where the SDK completes the bundle successfully but the runner fails to checkpoint the information. We also want the runner to be able to pass in cache tokens for things like side inputs, which may change over time (and the SDK would not know that this happened).
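The token mechanism discussed above can be sketched in a few lines of plain Python (hypothetical names, not the actual Beam Fn API classes): the SDK keeps its cache only as long as the runner keeps presenting the same token, so the runner can invalidate stale state, e.g. after a failed checkpoint or a key moving to another worker, simply by issuing a new token.

```python
class StateCache:
    """Hypothetical sketch of an SDK-side state cache guarded by a
    runner-issued cache token. Entries survive across bundles only as
    long as the runner keeps handing out the same token."""

    def __init__(self):
        self._token = None
        self._entries = {}  # state_key -> cached value

    def start_bundle(self, runner_token):
        # A new token means the runner can no longer vouch for the cached
        # state (e.g. a checkpoint failed, or the key moved to another
        # worker), so everything cached so far must be dropped.
        if runner_token != self._token:
            self._entries.clear()
            self._token = runner_token

    def get(self, state_key):
        return self._entries.get(state_key)

    def put(self, state_key, value):
        self._entries[state_key] = value


cache = StateCache()
cache.start_bundle("token-1")
cache.put("counts", [1, 2, 3])

cache.start_bundle("token-1")      # same token: cache still valid
assert cache.get("counts") == [1, 2, 3]

cache.start_bundle("token-2")      # new token: cache invalidated
assert cache.get("counts") is None
```

This also shows why the token is not purely an SDK concern: only the runner knows whether the previous bundle's state was durably committed.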
-Max

[1] https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m

On 12.08.19 19:55, Lukasz Cwik wrote:

On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org> wrote:

On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:

> Thanks for starting this discussion, Rakesh. An efficient cache layer is one of the missing pieces for good performance in stateful pipelines. The good news is that there is a level of caching already present in Python, which batches append requests until the bundle is finished.
>
> Thomas, in your example we would indeed have to profile to see why CPU utilization is high on the Flink side but not in the Python SDK harness. For example, older versions of Flink (<=1.5) have a high cost of deleting existing instances of a timer when setting a timer.
> Nevertheless, cross-bundle caching would likely result in increased performance.

CPU on the Flink side was unchanged, and that's important. The throughput improvement comes from the extended bundle caching on the SDK side. That's what tells me that cross-bundle caching is needed. Of course, it will require a good solution for the write side as well, and I like your idea of using the checkpoint boundary for that, especially since it already aligns with the bundle boundary and is under runner control. Of course we also want to be careful not to cause overly bursty writes.

Profiling will be useful for the timer processing; that is also on my list of suspects.

> Luke, I think the idea to merge pending state requests could be complementary to caching across bundles.
>
> Question: Couldn't we defer flushing state back from the SDK to the Runner indefinitely, provided that we add a way to flush the state in case of a checkpoint?

Flushing is needed to prevent the SDK from running out of memory. Having a fixed budget for state inside the SDK would trigger flushing under certain state usage scenarios. I could also see that flushing only at checkpoints may lead to slow checkpoint performance, so we may want to flush state that hasn't been used in a while as well.

> Another performance improvement would be caching read requests, because these first go to the Runner regardless of already cached appends.
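The two ideas above, deferring writes until a checkpoint while keeping a fixed memory budget, can be combined in a small sketch (plain Python with hypothetical names; the real SDK harness works differently): a write-back cache that evicts its least-recently-written dirty entry to the runner under budget pressure, and flushes all dirty state when a checkpoint barrier arrives.

```python
from collections import OrderedDict


class WriteBackStateCache:
    """Hypothetical sketch: SDK-side write-back cache with a fixed entry
    budget. Writes stay local until either the LRU entry is evicted
    (budget pressure) or a checkpoint forces a full flush."""

    def __init__(self, runner_write, max_entries=2):
        self._runner_write = runner_write  # callback flushing one entry
        self._max_entries = max_entries
        self._cache = OrderedDict()        # state_key -> (value, dirty)

    def write(self, key, value):
        # Re-insert so the key becomes the most recently used entry.
        self._cache.pop(key, None)
        self._cache[key] = (value, True)
        if len(self._cache) > self._max_entries:
            old_key, (old_value, dirty) = self._cache.popitem(last=False)
            if dirty:
                self._runner_write(old_key, old_value)

    def flush(self):
        """Called at a checkpoint barrier: push all dirty state."""
        for key, (value, dirty) in self._cache.items():
            if dirty:
                self._runner_write(key, value)
            self._cache[key] = (value, False)  # now clean


flushed = []
cache = WriteBackStateCache(lambda k, v: flushed.append((k, v)), max_entries=2)
cache.write("a", 1)
cache.write("b", 2)
cache.write("c", 3)            # budget exceeded: LRU entry "a" is flushed
assert flushed == [("a", 1)]
cache.flush()                  # checkpoint: remaining dirty entries go out
assert sorted(flushed) == [("a", 1), ("b", 2), ("c", 3)]
```

The budget-driven eviction bounds SDK memory between checkpoints, while the checkpoint flush keeps the amount of state written at the barrier small, addressing the bursty-write concern raised above.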
> -Max

On 09.08.19 17:12, Lukasz Cwik wrote:

On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:

> The question is whether the SDK needs to wait for the StateResponse to come back before declaring the bundle done. The proposal was to not send the cache token back as part of an append StateResponse [1], but to pre-provide it as part of the bundle request.

Agree, the purpose of the "I'm blocked" message is to occur during bundle processing.
> Thinking about this some more: if we assume the state response was successfully applied, there's no reason for the SDK to block the bundle until it has its hands on the cache token; we can update the cache once the StateResponse comes back, whether or not the bundle is still active. On the other hand, the runner needs a way to assert that it has received and processed all StateRequests from the SDK associated with a bundle before it can declare the bundle complete (regardless of the cache tokens), so this might not be safe without some extra coordination (e.g. the ProcessBundleResponse indicating the number of state requests associated with a bundle).

Since the state request stream is ordered, we can add the id of the last state request as part of the ProcessBundleResponse.
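The ordered-stream handshake proposed above can be modeled in a few lines of plain Python (hypothetical names, not the actual Fn API protos): the SDK numbers its state requests, reports the id of the last one in its bundle response, and the runner only declares the bundle complete once it has processed up to that id.

```python
class RunnerStatePlane:
    """Hypothetical sketch of the runner side of the state plane.
    Because requests arrive on an ordered stream, knowing the id of the
    last request the SDK sent for a bundle is enough to know when all of
    that bundle's state traffic has been handled."""

    def __init__(self):
        self.last_processed_id = 0

    def handle_state_request(self, request_id):
        # Requests arrive in order, so ids are monotonically increasing.
        self.last_processed_id = request_id

    def can_complete_bundle(self, last_request_id_from_sdk):
        # Safe to declare the bundle done only once every state request
        # the SDK issued for it has been received and processed.
        return self.last_processed_id >= last_request_id_from_sdk


runner = RunnerStatePlane()
runner.handle_state_request(1)
runner.handle_state_request(2)
assert not runner.can_complete_bundle(3)  # request 3 still in flight
runner.handle_state_request(3)
assert runner.can_complete_bundle(3)
```

A single integer replaces per-request acknowledgements precisely because the gRPC state stream preserves ordering; with an unordered transport the runner would need a full set of outstanding ids instead.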
[1] https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627

On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:

The purpose of the new state API call in BEAM-7000 is to tell the runner that the SDK is now blocked waiting for the result of a specific state request. It should be used for fetches (not updates), and it is there to allow SDKs to differentiate readLater (I will need this data at some point in time in the future) from read (I need this data now). This comes up commonly where the user prefetches multiple state cells and then looks at their content, allowing the runner to batch up those calls on its end.
The way it can be used for clear+append is that the runner can store requests in memory up until some time/memory limit, or until it gets its first "blocked" call, and then issue all the requests together.

On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com> wrote:

On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:

> That would add a synchronization point that forces extra latency, especially in streaming mode.
>
> Wouldn't it be possible for the runner to assign the token when starting the bundle and for the SDK to pass it along with the state requests? That way, there would be no need to batch and wait for a flush.

I think it makes sense to let the runner pre-assign these state update tokens rather than forcing a synchronization point.

Here are some pointers for the Python implementation:

Currently, when a DoFn needs UserState, a StateContext object is used that converts from a StateSpec to the actual value. When running portably, this is FnApiUserStateContext [1]. The state handles themselves are cached at [2], but this context only lives for the lifetime of a single bundle. Logic could be added here to use the token to share these across bundles.

Each of these handles in turn invokes state_handler.get* methods when its read is called (here state_handler is a thin wrapper around the service itself) and constructs the appropriate result from the StateResponse. We would need to implement caching at this level as well, including the deserialization. This will probably require some restructuring of how _StateBackedIterable is implemented (or, possibly, making that class itself cache aware). Hopefully that's enough to get started.

[1] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
[2] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436

On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:

I believe the intent is to add a new state API call telling the runner that it is blocked waiting for a response (BEAM-7000).
This should allow the runner to wait until it sees one of these "I'm blocked" requests and then merge + batch any state calls it may have at that point in time, allowing it to convert clear + appends into set calls and do any other optimizations as well. By default, the runner would have a time- and space-based limit on how many outstanding state calls there are before choosing to resolve them.

On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:

Now I see what you mean.
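The batch-until-blocked behavior described above can be sketched in plain Python (hypothetical names; not the actual runner implementation): mutations are buffered and only resolved against the backend when the SDK signals a blocking read, or a size limit is reached.

```python
class BatchingStateHandler:
    """Hypothetical sketch: the runner buffers state mutations and only
    resolves them when the SDK signals it is blocked on a read (or a
    size limit is hit), allowing merging before any backend call."""

    def __init__(self, backend, max_pending=100):
        self._backend = backend        # dict-like state backend
        self._pending = []             # buffered (op, key, value) calls
        self._max_pending = max_pending

    def clear(self, key):
        self._pending.append(("clear", key, None))
        self._maybe_flush()

    def append(self, key, value):
        self._pending.append(("append", key, value))
        self._maybe_flush()

    def blocked_read(self, key):
        # SDK says "I need this now": resolve everything, then read.
        self._flush()
        return self._backend.get(key, [])

    def _maybe_flush(self):
        # Space-based limit; a real runner would add a time limit too.
        if len(self._pending) >= self._max_pending:
            self._flush()

    def _flush(self):
        for op, key, value in self._pending:
            if op == "clear":
                self._backend[key] = []
            else:
                self._backend.setdefault(key, []).append(value)
        self._pending = []


backend = {}
handler = BatchingStateHandler(backend)
handler.clear("k")
handler.append("k", 1)
handler.append("k", 2)
assert backend == {}                        # nothing sent to the backend yet
assert handler.blocked_read("k") == [1, 2]  # flush resolves clear + appends
```

Because the buffered operations are visible all at once at flush time, this is also the point where a runner could rewrite them (e.g. collapsing a clear followed by appends into a single set) before touching the backend.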
On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:

Hi Luke,

I guess the answer is that it depends on the state backend. If a set operation in the state backend is available that is more efficient than clear+append, then it would be beneficial to have a dedicated Fn API operation to allow for such optimization. That's something that needs to be determined with a profiler :)

But the low hanging fruit is cross-bundle caching.
Thomas

On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:

Thomas, why do you think a single round trip is needed?

clear + append can be done blindly from the SDK side, and it has total knowledge of the state at that point in time until the end of the bundle, at which point you want to wait to get the cache token back from the runner for the append call, so that for the next bundle you can reuse the state if the key wasn't processed elsewhere.

Also, all state calls are "streamed" over gRPC, so you don't need to wait for clear to complete before being able to send append.
On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Rakesh,

Glad to see you pointed this problem out!
+1 for adding this implementation. Managing state with a write-through cache is pretty important for streaming jobs!
Best, Jincheng

On Mon, Jul 29, 2019 at 8:54 PM, Thomas Weise <t...@apache.org> wrote:

FYI, a basic test appears to confirm the importance of the cross-bundle caching: I found that the throughput can be increased by playing with the bundle size in the Flink runner. The default caps at 1000 elements (or 1 second). So on a high-throughput stream the bundles would be capped by the count limit. Bumping the count limit increases the throughput by reducing the chatter over the state plane (more cache hits due to larger bundles).

The next level of investigation would involve profiling. But just by looking at metrics, the CPU utilization on the Python worker side dropped significantly while on the Flink side it remains
nearly the same. There are no metrics for state operations on either side; I think it would be very helpful to get these in place also.

Below the stateful processing code for reference.

Thomas

class StatefulFn(beam.DoFn):
    count_state_spec = userstate.CombiningValueStateSpec(
        'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
    timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

    def process(self, kv,
                count=beam.DoFn.StateParam(count_state_spec),
                timer=beam.DoFn.TimerParam(timer_spec),
                window=beam.DoFn.WindowParam):
        count.add(1)
        timer_seconds = (window.end.micros // 1000000) - 1
        timer.set(timer_seconds)

    @userstate.on_timer(timer_spec)
    def process_timer(self,
                      count=beam.DoFn.StateParam(count_state_spec),
                      window=beam.DoFn.WindowParam):
        if count.read() == 0:
            logging.warning("###timer fired with count %d, window %s" %
                            (count.read(), window))

On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:
On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:

> Thanks Robert,
>
> I stumbled on the jira that you created some time ago:
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked code where changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>
> I am willing to provide help to implement this. Let me know how I can help.

As far as I'm aware, no one is actively working on it right now. Please feel free to assign yourself the JIRA entry and I'll be happy to answer any questions you might have if (well, probably when) these pointers are insufficient.
> On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
>> This is documented at
>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>> . Note that it requires participation of both the runner and the SDK (though there are no correctness issues if one or the other side does not understand the protocol; caching just won't be used).
>>
>> I don't think it's been implemented anywhere, but it could be very beneficial for performance.
> > On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> > >
> > > I checked the Python SDK[1] and it has a similar implementation to
> > > the Java SDK.
> > >
> > > I would agree with Thomas. In the case of a high-volume event stream
> > > and a bigger cluster size, network calls can potentially become a
> > > bottleneck.
> > >
> > > @Robert
> > > I am interested in seeing the proposal. Can you provide me the link
> > > to the proposal?
> > > [1]:
> > > https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> > >
> > > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
> > > >
> > > > Thanks for the pointer. For streaming, it will be important to
> > > > support caching across bundles. It appears that even the Java SDK
> > > > doesn't support that yet?
> > > >
> > > > https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> > > >
> > > > Regarding clear/append: it would be nice if both could occur within
> > > > a single Fn API roundtrip when the state is persisted.
> > > > Thanks,
> > > > Thomas
> > > >
> > > > On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
> > > > >
> > > > > User state is built on top of read, append and clear, rather than
> > > > > a read-and-write paradigm, to allow for blind appends.
> > > > >
> > > > > The optimization you speak of can be done completely inside the
> > > > > SDK, without any additional protocol being required, as long as
> > > > > you clear the state first and then append all your new data. The
> > > > > Beam Java SDK does this for all runners when executed portably[1].
> > > > > You could port the same logic to the Beam Python SDK as well.
> > > > > 1:
> > > > > https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
> > > > >
> > > > > On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
> > > > > >
> > > > > > Python workers also have a per-bundle SDK-side cache. A protocol
> > > > > > has been proposed, but hasn't yet been implemented in any SDKs
> > > > > > or runners.
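[Editor's note: the clear-then-append trick Lukasz describes, which BagUserState implements in the Java harness, can be illustrated with a rough Python sketch. All names below are hypothetical stand-ins, not the actual Beam harness API: once the SDK has issued a clear, it knows the bag's complete contents locally, so later reads in the bundle need no network call, and the pending mutations can be flushed as a single clear-plus-append when the bundle commits.]

```python
class InMemoryBackend:
    """Dict-backed stand-in for the runner's state service (test double)."""
    def __init__(self):
        self.data = {}
    def read(self, key):
        return self.data.get(key, [])
    def append(self, key, value):
        self.data.setdefault(key, []).append(value)
    def clear(self, key):
        self.data[key] = []


class CachingBagUserState:
    """Sketch of SDK-side caching for bag user state (hypothetical API)."""

    def __init__(self, backend, key):
        self._backend = backend
        self._key = key
        self._local = []       # values appended locally, not yet committed
        self._cleared = False  # True once the local view is authoritative

    def read(self):
        if self._cleared:
            # After a clear, the cache is the complete bag: no network read.
            return list(self._local)
        # Persisted prefix still unknown: fetch it, then add local appends.
        return list(self._backend.read(self._key)) + self._local

    def append(self, value):
        self._local.append(value)  # blind append: requires no prior read

    def clear(self):
        self._cleared = True
        self._local = []

    def commit(self):
        # Flush pending mutations as one logical clear + append sequence.
        if self._cleared:
            self._backend.clear(self._key)
        for value in self._local:
            self._backend.append(self._key, value)
```

Note that no extra protocol is involved: correctness follows purely from the clear preceding the appends, which is why the Java SDK can do this for all runners.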
> > > > > > On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
> > > > > > >
> > > > > > > It's runner-dependent. Some runners (e.g. the Dataflow runner)
> > > > > > > do have such a cache, though I think it currently has a cap
> > > > > > > for large bags.
> > > > > > > Reuven
> > > > > > >
> > > > > > > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I have been using the Python SDK for the application and
> > > > > > > > also using BagState in production. I was wondering whether
> > > > > > > > the state logic has a write-through cache implemented or
> > > > > > > > not. If we are sending every read and write request through
> > > > > > > > the network, then it comes with a performance cost. We
> > > > > > > > could avoid the network call for a read operation if we had
> > > > > > > > a write-through cache.
> > > > > > > > I have superficially looked into the implementation and I
> > > > > > > > didn't see any cache implementation.
> > > > > > > > Is it possible to have this cache? Would it cause any
> > > > > > > > issue if we had the caching layer?
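[Editor's note: Rakesh's write-through idea, in miniature — every write goes to both the local cache and the backend, so a read that follows a write is served locally without a network round trip. A toy Python sketch under that assumption; the names are illustrative, not Beam code, and the staleness risk it shows is exactly why the thread converges on runner-issued cache tokens.]

```python
class WriteThroughCache:
    """Toy write-through cache: writes update the cache and the backend
    together, so a read after a write never needs the network."""

    def __init__(self, backend):
        self._backend = backend  # dict-like stand-in for the state service
        self._cache = {}

    def write(self, key, value):
        self._cache[key] = value    # keep the local copy authoritative...
        self._backend[key] = value  # ...while persisting immediately

    def read(self, key):
        if key in self._cache:
            return self._cache[key]  # cache hit: no network round trip
        value = self._backend[key]   # cache miss: one fetch, then cache it
        self._cache[key] = value
        return value
```

The caveat raised later in the thread applies here: if another worker (or a runner rollback) changes the backend behind the cache's back, `read` returns stale data, which is the problem the cache-token protocol addresses.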