The token would be needed in general to invalidate the cache when bundles
are processed by different workers.

In the case of the Flink runner we don't have a scenario of SDK worker
surviving the runner in the case of a failure, so there is no possibility
of inconsistent state as result of a checkpoint failure.

--
sent from mobile

On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels <m...@apache.org> wrote:

> Thanks for clarifying. Cache-invalidation for side inputs makes sense.
>
> In case the Runner fails to checkpoint, could it not re-attempt the
> checkpoint? At least in the case of Flink, the cache would still be
> valid until another checkpoint is attempted. For other Runners that may
> not be the case. Also, rolling back state while keeping the SDK Harness
> running requires to invalidate the cache.
>
> -Max
>
> On 13.08.19 18:09, Lukasz Cwik wrote:
> >
> >
> > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels <m...@apache.org
> > <mailto:m...@apache.org>> wrote:
> >
> >     Agree that we have to be able to flush before a checkpoint to avoid
> >     caching too many elements. Also good point about checkpoint costs
> >     increasing with flushing the cache on checkpoints. A LRU cache
> policy in
> >     the SDK seems desirable.
> >
> >     What is the role of the cache token in the design document[1]? It
> looks
> >     to me that the token is used to give the Runner control over which
> and
> >     how many elements can be cached by the SDK. Why is that necessary?
> >     Shouldn't this be up to the SDK?
> >
> >
> > We want to be able to handle the case where the SDK completes the bundle
> > successfully but the runner fails to checkpoint the information.
> > We also want the runner to be able to pass in cache tokens for things
> > like side inputs which may change over time (and the SDK would not know
> > that this happened).
> >
> >
> >     -Max
> >
> >     [1]
> >
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >
> >     Is it simply to
> >     On 12.08.19 19:55, Lukasz Cwik wrote:
> >     >
> >     >
> >     > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org
> >     <mailto:t...@apache.org>
> >     > <mailto:t...@apache.org <mailto:t...@apache.org>>> wrote:
> >     >
> >     >
> >     >     On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels
> >     <m...@apache.org <mailto:m...@apache.org>
> >     >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >     >
> >     >         Thanks for starting this discussion Rakesh. An efficient
> cache
> >     >         layer is
> >     >         one of the missing pieces for good performance in stateful
> >     >         pipelines.
> >     >         The good news are that there is a level of caching already
> >     >         present in
> >     >         Python which batches append requests until the bundle is
> >     finished.
> >     >
> >     >         Thomas, in your example indeed we would have to profile to
> see
> >     >         why CPU
> >     >         utilization is high on the Flink side but not in the
> >     Python SDK
> >     >         harness.
> >     >         For example, older versions of Flink (<=1.5) have a high
> >     cost of
> >     >         deleting existing instances of a timer when setting a
> timer.
> >     >         Nevertheless, cross-bundle caching would likely result in
> >     increased
> >     >         performance.
> >     >
> >     >
> >     >     CPU on the Flink side was unchanged, and that's important. The
> >     >     throughout improvement comes from the extended bundle caching
> >     on the
> >     >     SDK side. That's what tells me that cross-bundle caching is
> >     needed.
> >     >     Of course, it will require a good solution for the write also
> >     and I
> >     >     like your idea of using the checkpoint boundary for that,
> >     especially
> >     >     since that already aligns with the bundle boundary and is under
> >     >     runner control. Of course we also want to be careful to not
> cause
> >     >     overly bursty writes.
> >     >
> >     >     Profiling will be useful for the timer processing, that is
> also on
> >     >     my list of suspects.
> >     >
> >     >
> >     >         Luke, I think the idea to merge pending state requests
> >     could be
> >     >         complementary to caching across bundles.
> >     >
> >     >         Question: Couldn't we defer flushing back state from the
> >     SDK to the
> >     >         Runner indefinitely, provided that we add a way to flush
> the
> >     >         state in
> >     >         case of a checkpoint?
> >     >
> >     >
> >     > Flushing is needed to prevent the SDK from running out of memory.
> >     Having
> >     > a fixed budget for state inside the SDK would have flushing happen
> >     under
> >     > certain state usage scenarios.
> >     > I could also see that only flushing at checkpoint may lead to slow
> >     > checkpoint performance so we may want to flush state that hasn't
> been
> >     > used in a while as well.
> >     >
> >     >
> >     >         Another performance improvement would be caching read
> requests
> >     >         because
> >     >         these first go to the Runner regardless of already cached
> >     appends.
> >     >
> >     >         -Max
> >     >
> >     >         On 09.08.19 17:12, Lukasz Cwik wrote:
> >     >         >
> >     >         >
> >     >         > On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw
> >     >         <rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>
> >     >         > <mailto:rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >     >         >
> >     >         >     The question is whether the SDK needs to wait for the
> >     >         StateResponse to
> >     >         >     come back before declaring the bundle done. The
> proposal
> >     >         was to not
> >     >         >     send the cache token back as part of an append
> >     >         StateResponse [1], but
> >     >         >     pre-provide it as part of the bundle request.
> >     >         >
> >     >         >
> >     >         > Agree, the purpose of the I'm Blocked message is to occur
> >     >         during bundle
> >     >         > processing.
> >     >         >
> >     >         >
> >     >         >     Thinking about this some more, if we assume the state
> >     >         response was
> >     >         >     successfully applied, there's no reason for the SDK
> to
> >     >         block the
> >     >         >     bundle until it has its hands on the cache token--we
> can
> >     >         update the
> >     >         >     cache once the StateResponse comes back whether or
> >     not the
> >     >         bundle is
> >     >         >     still active. On the other hand, the runner needs a
> >     way to
> >     >         assert it
> >     >         >     has received and processed all StateRequests from
> >     the SDK
> >     >         associated
> >     >         >     with a bundle before it can declare the bundle
> complete
> >     >         (regardless of
> >     >         >     the cache tokens), so this might not be safe without
> >     some
> >     >         extra
> >     >         >     coordination (e.g. the ProcessBundleResponse
> indicating
> >     >         the number of
> >     >         >     state requests associated with a bundle).
> >     >         >
> >     >         >
> >     >         > Since the state request stream is ordered, we can add
> the id
> >     >         of the last
> >     >         > state request as part of the ProcessBundleResponse.
> >     >         >
> >     >         >
> >     >         >     [1]
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
> >     >         >
> >     >         >     On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik
> >     >         <lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>
> >     >         >     <mailto:lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>>> wrote:
> >     >         >     >
> >     >         >     > The purpose of the new state API call in BEAM-7000
> >     is to
> >     >         tell the
> >     >         >     runner that the SDK is now blocked waiting for the
> >     result of a
> >     >         >     specific state request and it should be used for
> >     fetches (not
> >     >         >     updates) and is there to allow for SDKs to
> differentiate
> >     >         readLater
> >     >         >     (I will need this data at some point in time in the
> >     >         future) from
> >     >         >     read (I need this data now). This comes up commonly
> >     where
> >     >         the user
> >     >         >     prefetches multiple state cells and then looks at
> their
> >     >         content
> >     >         >     allowing the runner to batch up those calls on its
> end.
> >     >         >     >
> >     >         >     > The way it can be used for clear+append is that the
> >     >         runner can
> >     >         >     store requests in memory up until some time/memory
> limit
> >     >         or until it
> >     >         >     gets its first "blocked" call and then issue all the
> >     >         requests together.
> >     >         >     >
> >     >         >     >
> >     >         >     > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw
> >     >         >     <rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>
> >     >         <mailto:rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >     >         >     >>
> >     >         >     >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise
> >     >         <t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>
> >     >         >     <mailto:t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>>> wrote:
> >     >         >     >> >
> >     >         >     >> > That would add a synchronization point that
> >     forces extra
> >     >         >     latency especially in streaming mode.
> >     >         >     >> >
> >     >         >     >> > Wouldn't it be possible for the runner to
> >     assign the
> >     >         token when
> >     >         >     starting the bundle and for the SDK to pass it along
> >     the state
> >     >         >     requests? That way, there would be no need to batch
> and
> >     >         wait for a
> >     >         >     flush.
> >     >         >     >>
> >     >         >     >> I think it makes sense to let the runner
> pre-assign
> >     >         these state
> >     >         >     update
> >     >         >     >> tokens rather than forcing a synchronization
> point.
> >     >         >     >>
> >     >         >     >> Here's some pointers for the Python
> implementation:
> >     >         >     >>
> >     >         >     >> Currently, when a DoFn needs UserState, a
> >     StateContext
> >     >         object is used
> >     >         >     >> that converts from a StateSpec to the actual
> value.
> >     >         When running
> >     >         >     >> portably, this is FnApiUserStateContext [1]. The
> >     state
> >     >         handles
> >     >         >     >> themselves are cached at [2] but this context only
> >     >         lives for the
> >     >         >     >> lifetime of a single bundle. Logic could be added
> >     here
> >     >         to use the
> >     >         >     >> token to share these across bundles.
> >     >         >     >>
> >     >         >     >> Each of these handles in turn invokes
> >     >         state_handler.get* methods when
> >     >         >     >> its read is called. (Here state_handler is a thin
> >     >         wrapper around the
> >     >         >     >> service itself) and constructs the appropriate
> result
> >     >         from the
> >     >         >     >> StateResponse. We would need to implement caching
> at
> >     >         this level as
> >     >         >     >> well, including the deserialization. This will
> >     probably
> >     >         require some
> >     >         >     >> restructoring of how _StateBackedIterable is
> >     >         implemented (or,
> >     >         >     >> possibly, making that class itself cache aware).
> >     >         Hopefully that's
> >     >         >     >> enough to get started.
> >     >         >     >>
> >     >         >     >> [1]
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
> >     >         >     >> [2]
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
> >     >         >     >> .
> >     >         >     >>
> >     >         >     >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik
> >     >         <lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>
> >     >         >     <mailto:lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>>> wrote:
> >     >         >     >> >>
> >     >         >     >> >> I believe the intent is to add a new state API
> >     call
> >     >         telling
> >     >         >     the runner that it is blocked waiting for a response
> >     >         (BEAM-7000).
> >     >         >     >> >>
> >     >         >     >> >> This should allow the runner to wait till it
> sees
> >     >         one of these
> >     >         >     I'm blocked requests and then merge + batch any state
> >     >         calls it may
> >     >         >     have at that point in time allowing it to convert
> >     clear +
> >     >         appends
> >     >         >     into set calls and do any other optimizations as
> >     well. By
> >     >         default,
> >     >         >     the runner would have a time and space based limit
> >     on how many
> >     >         >     outstanding state calls there are before choosing to
> >     >         resolve them.
> >     >         >     >> >>
> >     >         >     >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik
> >     >         <lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>
> >     >         >     <mailto:lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>>> wrote:
> >     >         >     >> >>>
> >     >         >     >> >>> Now I see what you mean.
> >     >         >     >> >>>
> >     >         >     >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise
> >     >         <t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>
> >     >         >     <mailto:t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>>> wrote:
> >     >         >     >> >>>>
> >     >         >     >> >>>> Hi Luke,
> >     >         >     >> >>>>
> >     >         >     >> >>>> I guess the answer is that it depends on the
> >     state
> >     >         backend.
> >     >         >     If a set operation in the state backend is available
> >     that
> >     >         is more
> >     >         >     efficient than clear+append, then it would be
> beneficial
> >     >         to have a
> >     >         >     dedicated fn api operation to allow for such
> >     optimization.
> >     >         That's
> >     >         >     something that needs to be determined with a
> profiler :)
> >     >         >     >> >>>>
> >     >         >     >> >>>> But the low hanging fruit is cross-bundle
> >     caching.
> >     >         >     >> >>>>
> >     >         >     >> >>>> Thomas
> >     >         >     >> >>>>
> >     >         >     >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik
> >     >         <lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>
> >     >         >     <mailto:lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>>> wrote:
> >     >         >     >> >>>>>
> >     >         >     >> >>>>> Thomas, why do you think a single round
> trip is
> >     >         needed?
> >     >         >     >> >>>>>
> >     >         >     >> >>>>> clear + append can be done blindly from the
> SDK
> >     >         side and it
> >     >         >     has total knowledge of the state at that point in
> time
> >     >         till the end
> >     >         >     of the bundle at which point you want to wait to get
> the
> >     >         cache token
> >     >         >     back from the runner for the append call so that for
> the
> >     >         next bundle
> >     >         >     you can reuse the state if the key wasn't processed
> >     elsewhere.
> >     >         >     >> >>>>>
> >     >         >     >> >>>>> Also, all state calls are "streamed" over
> >     gRPC so
> >     >         you don't
> >     >         >     need to wait for clear to complete before being able
> to
> >     >         send append.
> >     >         >     >> >>>>>
> >     >         >     >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng
> sun
> >     >         >     <sunjincheng...@gmail.com
> >     <mailto:sunjincheng...@gmail.com>
> >     >         <mailto:sunjincheng...@gmail.com
> >     <mailto:sunjincheng...@gmail.com>>
> >     >         <mailto:sunjincheng...@gmail.com
> >     <mailto:sunjincheng...@gmail.com>
> >     >         <mailto:sunjincheng...@gmail.com
> >     <mailto:sunjincheng...@gmail.com>>>> wrote:
> >     >         >     >> >>>>>>
> >     >         >     >> >>>>>> Hi Rakesh,
> >     >         >     >> >>>>>>
> >     >         >     >> >>>>>> Glad to see you pointer this problem out!
> >     >         >     >> >>>>>> +1 for add this implementation. Manage
> >     State by
> >     >         >     write-through-cache is pretty important for
> >     Streaming job!
> >     >         >     >> >>>>>>
> >     >         >     >> >>>>>> Best, Jincheng
> >     >         >     >> >>>>>>
> >     >         >     >> >>>>>> Thomas Weise <t...@apache.org
> >     <mailto:t...@apache.org>
> >     >         <mailto:t...@apache.org <mailto:t...@apache.org>>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>
> >     >         <mailto:t...@apache.org <mailto:t...@apache.org>>>> 于
> >     >         >     2019年7月29日周一 下午8:54写道:
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> FYI a basic test appears to confirm the
> >     >         importance of the
> >     >         >     cross-bundle caching: I found that the throughput
> can be
> >     >         increased
> >     >         >     by playing with the bundle size in the Flink runner.
> >     >         Default caps at
> >     >         >     1000 elements (or 1 second). So on a high throughput
> >     >         stream the
> >     >         >     bundles would be capped by the count limit. Bumping
> the
> >     >         count limit
> >     >         >     increases the throughput by reducing the chatter
> >     over the
> >     >         state
> >     >         >     plane (more cache hits due to larger bundle).
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> The next level of investigation would
> involve
> >     >         profiling.
> >     >         >     But just by looking at metrics, the CPU utilization
> >     on the
> >     >         Python
> >     >         >     worker side dropped significantly while on the Flink
> >     side
> >     >         it remains
> >     >         >     nearly same. There are no metrics for state
> >     operations on
> >     >         either
> >     >         >     side, I think it would be very helpful to get these
> in
> >     >         place also.
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> Below the stateful processing code for
> >     reference.
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> Thomas
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> class StatefulFn(beam.DoFn):
> >     >         >     >> >>>>>>>     count_state_spec =
> >     >         userstate.CombiningValueStateSpec(
> >     >         >     >> >>>>>>>         'count',
> >     >         >
> >      beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
> >     >         >     >> >>>>>>>     timer_spec =
> userstate.TimerSpec('timer',
> >     >         >     userstate.TimeDomain.WATERMARK)
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>>     def process(self, kv,
> >     >         >     count=beam.DoFn.StateParam(count_state_spec),
> >     >         >     timer=beam.DoFn.TimerParam(timer_spec),
> >     >         window=beam.DoFn.WindowParam):
> >     >         >     >> >>>>>>>         count.add(1)
> >     >         >     >> >>>>>>>         timer_seconds =
> (window.end.micros //
> >     >         1000000) - 1
> >     >         >     >> >>>>>>>         timer.set(timer_seconds)
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>>     @userstate.on_timer(timer_spec)
> >     >         >     >> >>>>>>>     def process_timer(self,
> >     >         >     count=beam.DoFn.StateParam(count_state_spec),
> >     >         >     window=beam.DoFn.WindowParam):
> >     >         >     >> >>>>>>>         if count.read() == 0:
> >     >         >     >> >>>>>>>             logging.warning("###timer
> fired
> >     >         with count
> >     >         >     %d, window %s" % (count.read(), window))
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>>
> >     >         >     >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert
> >     Bradshaw
> >     >         >     <rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>
> >     >         <mailto:rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >     >         >     >> >>>>>>>>
> >     >         >     >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh
> Kumar
> >     >         >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>
> >     >         <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>>> wrote:
> >     >         >     >> >>>>>>>> >
> >     >         >     >> >>>>>>>> > Thanks Robert,
> >     >         >     >> >>>>>>>> >
> >     >         >     >> >>>>>>>> >  I stumble on the jira that you have
> >     created
> >     >         some time ago
> >     >         >     >> >>>>>>>> >
> >     https://jira.apache.org/jira/browse/BEAM-5428
> >     >         >     >> >>>>>>>> >
> >     >         >     >> >>>>>>>> > You also marked code where code
> >     changes are
> >     >         required:
> >     >         >     >> >>>>>>>> >
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> >     >         >     >> >>>>>>>> >
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >     >         >     >> >>>>>>>> >
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >     >         >     >> >>>>>>>> >
> >     >         >     >> >>>>>>>> > I am willing to provide help to
> implement
> >     >         this. Let me
> >     >         >     know how I can help.
> >     >         >     >> >>>>>>>>
> >     >         >     >> >>>>>>>> As far as I'm aware, no one is actively
> >     >         working on it
> >     >         >     right now.
> >     >         >     >> >>>>>>>> Please feel free to assign yourself the
> JIRA
> >     >         entry and
> >     >         >     I'll be happy
> >     >         >     >> >>>>>>>> to answer any questions you might have if
> >     >         (well probably
> >     >         >     when) these
> >     >         >     >> >>>>>>>> pointers are insufficient.
> >     >         >     >> >>>>>>>>
> >     >         >     >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert
> >     Bradshaw
> >     >         >     <rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>
> >     >         <mailto:rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >     >         >     >> >>>>>>>> >>
> >     >         >     >> >>>>>>>> >> This is documented at
> >     >         >     >> >>>>>>>> >>
> >     >         >
> >     >
> >
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >     >         >     >> >>>>>>>> >> . Note that it requires participation
> of
> >     >         both the
> >     >         >     runner and the SDK
> >     >         >     >> >>>>>>>> >> (though there are no correctness
> >     issues if
> >     >         one or the
> >     >         >     other side does
> >     >         >     >> >>>>>>>> >> not understand the protocol, caching
> just
> >     >         won't be used).
> >     >         >     >> >>>>>>>> >>
> >     >         >     >> >>>>>>>> >> I don't think it's been implemented
> >     >         anywhere, but
> >     >         >     could be very
> >     >         >     >> >>>>>>>> >> beneficial for performance.
> >     >         >     >> >>>>>>>> >>
> >     >         >     >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM
> >     Rakesh Kumar
> >     >         >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>
> >     >         <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>>> wrote:
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> > I checked the python sdk[1] and it
> has
> >     >         similar
> >     >         >     implementation as Java SDK.
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> > I would agree with Thomas. In case
> of
> >     >         high volume
> >     >         >     event stream and bigger cluster size, network call
> can
> >     >         potentially
> >     >         >     cause a bottleneck.
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> > @Robert
> >     >         >     >> >>>>>>>> >> > I am interested to see the
> >     proposal. Can you
> >     >         >     provide me the link of the proposal?
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> > [1]:
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> >
> >     >         >     >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM
> >     Thomas Weise
> >     >         >     <t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>
> >     >         <mailto:t...@apache.org <mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>>> wrote:
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >> Thanks for the pointer. For
> streaming,
> >     >         it will be
> >     >         >     important to support caching across bundles. It
> appears
> >     >         that even
> >     >         >     the Java SDK doesn't support that yet?
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >>
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >> Regarding clear/append: It would
> >     be nice
> >     >         if both
> >     >         >     could occur within a single Fn Api roundtrip when
> >     the state is
> >     >         >     persisted.
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >> Thanks,
> >     >         >     >> >>>>>>>> >> >> Thomas
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >>
> >     >         >     >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM
> >     Lukasz Cwik
> >     >         >     <lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>
> >     >         <mailto:lc...@google.com <mailto:lc...@google.com>
> >     <mailto:lc...@google.com <mailto:lc...@google.com>>>> wrote:
> >     >         >     >> >>>>>>>> >> >>>
> >     >         >     >> >>>>>>>> >> >>> User state is built on top of
> read,
> >     >         append and
> >     >         >     clear and not off a read and write paradigm to allow
> for
> >     >         blind appends.
> >     >         >     >> >>>>>>>> >> >>>
> >     >         >     >> >>>>>>>> >> >>> The optimization you speak of can
> >     be done
> >     >         >     completely inside the SDK without any additional
> >     protocol
> >     >         being
> >     >         >     required as long as you clear the state first and
> then
> >     >         append all
> >     >         >     your new data. The Beam Java SDK does this for all
> >     runners
> >     >         when
> >     >         >     executed portably[1]. You could port the same logic
> >     to the
> >     >         Beam
> >     >         >     Python SDK as well.
> >     >         >     >> >>>>>>>> >> >>>
> >     >         >     >> >>>>>>>> >> >>> 1:
> >     >         >
> >     >
> >
> https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
> >     >         >     >> >>>>>>>> >> >>>
> >     >         >     >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM
> >     Robert
> >     >         Bradshaw
> >     >         >     <rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>
> >     >         <mailto:rober...@google.com <mailto:rober...@google.com>
> >     <mailto:rober...@google.com <mailto:rober...@google.com>>>> wrote:
> >     >         >     >> >>>>>>>> >> >>>>
> >     >         >     >> >>>>>>>> >> >>>> Python workers also have a
> >     per-bundle
> >     >         SDK-side
> >     >         >     cache. A protocol has
> >     >         >     >> >>>>>>>> >> >>>> been proposed, but hasn't yet
> been
> >     >         implemented
> >     >         >     in any SDKs or runners.
> >     >         >     >> >>>>>>>> >> >>>>
> >     >         >     >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM
> >     Reuven Lax
> >     >         >     <re...@google.com <mailto:re...@google.com>
> >     <mailto:re...@google.com <mailto:re...@google.com>>
> >     >         <mailto:re...@google.com <mailto:re...@google.com>
> >     <mailto:re...@google.com <mailto:re...@google.com>>>> wrote:
> >     >         >     >> >>>>>>>> >> >>>> >
> >     >         >     >> >>>>>>>> >> >>>> > It's runner dependent. Some
> >     runners
> >     >         (e.g. the
> >     >         >     Dataflow runner) do have such a cache, though I
> >     think it's
> >     >         currently
> >     >         >     has a cap for large bags.
> >     >         >     >> >>>>>>>> >> >>>> >
> >     >         >     >> >>>>>>>> >> >>>> > Reuven
> >     >         >     >> >>>>>>>> >> >>>> >
> >     >         >     >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM
> >     >         Rakesh Kumar
> >     >         >     <rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>
> >     >         <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>
> >     <mailto:rakeshku...@lyft.com <mailto:rakeshku...@lyft.com>>>> wrote:
> >     >         >     >> >>>>>>>> >> >>>> >>
> >     >         >     >> >>>>>>>> >> >>>> >> Hi,
> >     >         >     >> >>>>>>>> >> >>>> >>
> >     >         >     >> >>>>>>>> >> >>>> >> I have been using python sdk
> >     for the
> >     >         >     application and also using BagState in production. I
> was
> >     >         wondering
> >     >         >     whether state logic has any write-through-cache
> >     >         implemented or not.
> >     >         >     If we are sending every read and write request
> through
> >     >         network then
> >     >         >     it comes with a performance cost. We can avoid
> network
> >     >         call for a
> >     >         >     read operation if we have write-through-cache.
> >     >         >     >> >>>>>>>> >> >>>> >> I have superficially looked
> >     into the
> >     >         >     implementation and I didn't see any cache
> >     implementation.
> >     >         >     >> >>>>>>>> >> >>>> >>
> >     >         >     >> >>>>>>>> >> >>>> >> is it possible to have this
> >     cache?
> >     >         would it
> >     >         >     cause any issue if we have the caching layer?
> >     >         >     >> >>>>>>>> >> >>>> >>
> >     >         >
> >     >
> >
>

Reply via email to