Open up a PR for the proto changes and we can work through any minor
comments there.

On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:

> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
>     // A flag to indicate a cache token is valid for user state.
>     message UserState {}
>
>     // A flag to indicate a cache token is valid for a side input.
>     message SideInput {
>       // The id of a side input.
>       string side_input = 1;
>     }
>
>     // The scope of a cache token.
>     oneof type {
>       UserState user_state = 1;
>       SideInput side_input = 2;
>     }
>
>     // The cache token identifier which should be globally unique.
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
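>
> For illustration, here is a short Python sketch of how an SDK harness might
> sort the tokens it receives in a ProcessBundleRequest (assuming protos
> generated from the message above; the helper itself is hypothetical, not
> existing Beam code):
>
> def split_cache_tokens(process_bundle_request):
>     """Returns (user_state_token, {side_input_id: token}) for one bundle."""
>     user_state_token = None
>     side_input_tokens = {}
>     for cache_token in process_bundle_request.cache_tokens:
>         scope = cache_token.WhichOneof('type')
>         if scope == 'user_state':
>             user_state_token = cache_token.token
>         elif scope == 'side_input':
>             side_input_tokens[cache_token.side_input.side_input] = \
>                 cache_token.token
>     return user_state_token, side_input_tokens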
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
>
> SideInputState -> SideInput (side_input_state -> side_input)
> + more comments around the messages and the fields.
>
>
> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <m...@apache.org>
> wrote:
>
>> We would have to differentiate cache tokens for user state and side
>> inputs. How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>>     message UserState {
>>     }
>>
>>     message SideInputState {
>>       string side_input_id = 1;
>>     }
>>
>>     oneof type {
>>       UserState user_state = 1;
>>       SideInputState side_input_state = 2;
>>     }
>>
>>     bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundle's view of side inputs should never change during processing and
>> should have a point-in-time snapshot.
>>
>> I was just trying to say that deferring the cache token for side inputs
>> until side input request time simplified the runner's implementation,
>> since that is conclusively when the runner would need to take a look at
>> the side input. Putting the tokens into the ProcessBundleRequest
>> complicates that, but it makes the SDK implementation significantly
>> simpler, which is a win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instant at which side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated. In that regard
>>> per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of
>>> on
>>> > request) which would mean that the Runner may need to have a mapping
>>> > from cache token to internal version identifier for things like side
>>> > inputs which are typically broadcast. The Runner would also need to
>>> poll
>>> > to see if the side input has changed in the background to not block
>>> > processing bundles with "stale" side input data.
>>> >
>>> > Ping me once you have the Runner PR updated and I'll take a look again.
>>> >
>>> > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
>>> >
>>> >     Thank you for the summary Luke. I really appreciate the effort you
>>> put
>>> >     into this!
>>> >
>>> >      > Based upon your discussion you seem to want option #1
>>> >
>>> >     I'm actually for option #2. The option to cache/invalidate side
>>> inputs
>>> >     is important, and we should incorporate this in the design. That's
>>> why
>>> >     option #1 is not flexible enough. However, a first implementation
>>> could
>>> >     defer caching of side inputs.
>>> >
>>> >     Option #3 was my initial thinking and the first version of the PR,
>>> but I
>>> >     think we agreed that there wouldn't be much gain from keeping a
>>> cache
>>> >     token per state id.
>>> >
>>> >     Option #4 is what is specifically documented in the reference doc
>>> and
>>> >     already part of the Proto, where valid tokens are provided for
>>> each new
>>> >     bundle and also as part of the response of a get/put/clear. We
>>> mentioned
>>> >     that the reply does not have to be waited on synchronously (I even
>>> >     mentioned it), but it complicates the implementation. The idea Thomas
>>> and I
>>> >     expressed was that a response is not even necessary if we assume
>>> >     validity of the upfront provided cache tokens for the lifetime of a
>>> >     bundle and that cache tokens will be invalidated as soon as the
>>> Runner
>>> >     fails in any way. This is naturally the case for Flink because it
>>> will
>>> >     simply "forget" its current cache tokens.
>>> >
>>> >     I currently envision the following schema:
>>> >
>>> >     Runner
>>> >     ======
>>> >
>>> >     - Runner generates a globally unique cache token, one for user
>>> state and
>>> >     one for each side input
>>> >
>>> >     - The token is supplied to the SDK Harness for each bundle request
>>> >
>>> >     - For the lifetime of a Runner<=>SDK Harness connection this cache
>>> token
>>> >     will not change
>>> >     - Runner will generate a new token if the connection/key space
>>> changes
>>> >     between Runner and SDK Harness
>>> >
>>> >
>>> >     SDK
>>> >     ===
>>> >
>>> >     - For each bundle the SDK worker stores the list of valid cache
>>> tokens
>>> >     - The SDK Harness keeps a global cache across all its (local)
>>> >     workers, which is an LRU cache: state_key => (cache_token, value)
>>> >     - get: Lookup cache using the valid cache token for the state. If
>>> no
>>> >     match, then fetch from Runner and use the already available token
>>> for
>>> >     caching
>>> >     - put: Put the value in the cache with a valid cache token and add it
>>> >     to the pending writes, which will be flushed out at the latest when
>>> >     the bundle ends
>>> >     - clear: same as put but clear cache
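>>> >
>>> >     A rough Python sketch of this get/put/clear flow (purely
>>> >     illustrative, with made-up names; LRU eviction and the asynchronous
>>> >     flushing of pending writes are omitted):
>>> >
>>> >     class StateCache(object):
>>> >         """Cache shared by local workers: state_key -> (token, value)."""
>>> >
>>> >         def __init__(self):
>>> >             self._entries = {}         # state_key -> (cache_token, value)
>>> >             self._pending_writes = []  # flushed at latest on bundle end
>>> >
>>> >         def get(self, state_key, token, fetch_fn):
>>> >             entry = self._entries.get(state_key)
>>> >             if entry is not None and entry[0] == token:
>>> >                 return entry[1]
>>> >             value = fetch_fn()  # cache miss: fetch from the Runner
>>> >             self._entries[state_key] = (token, value)
>>> >             return value
>>> >
>>> >         def put(self, state_key, token, value):
>>> >             self._entries[state_key] = (token, value)
>>> >             self._pending_writes.append((state_key, value))
>>> >
>>> >         def clear(self, state_key, token):
>>> >             self.put(state_key, token, None)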
>>> >
>>> >     It does look like this is not too far off from what you were
>>> describing.
>>> >     The main difference is that we just work with a single cache
>>> token. In
>>> >     my opinion we do not need the second cache token for writes, as
>>> long as
>>> >     we ensure that we generate a new cache token if the
>>> >     bundle/checkpoint fails.
>>> >
>>> >     I have a draft PR
>>> >        for the Runner: https://github.com/apache/beam/pull/9374
>>> >        for the SDK: https://github.com/apache/beam/pull/9418
>>> >
>>> >     Note that the Runner PR needs to be updated to fully reflect the
>>> above
>>> >     scheme. The SDK implementation is WIP. I want to make sure that we
>>> >     clarify the design before this gets finalized.
>>> >
>>> >     Thanks again for all your comments. Much appreciated!
>>> >
>>> >     Cheers,
>>> >     Max
>>> >
>>> >     On 26.08.19 19:58, Lukasz Cwik wrote:
>>> >      > There were originally a couple of ideas around how caching could
>>> >     work:
>>> >      > 1) One cache token for the entire bundle that is supplied up
>>> >     front. The
>>> >      > SDK caches everything using the given token. All
>>> >     reads/clear/append for
>>> >      > all types of state happen under this token. Anytime a side input
>>> >      > changes, key processing partition range changes or a bundle
>>> fails to
>>> >      > process, the runner chooses a new cache token effectively
>>> >     invalidating
>>> >      > everything in the past.
>>> >      >
>>> >      > 2) One cache token per type of state that is supplied up front.
>>> >      > The SDK caches all requests for a given type using the given
>>> cache
>>> >      > token. The runner can selectively choose which type to keep and
>>> >     which to
>>> >      > invalidate. Bundle failure and key processing partition changes
>>> >      > invalidate all user state, side input change invalidates all
>>> side
>>> >     inputs.
>>> >      >
>>> >      > 3) One cache token per state id that is supplied up front.
>>> >      > The SDK caches all requests for the given state id using the
>>> >     given cache
>>> >      > token. The runner can selectively choose which to invalidate and
>>> >     which
>>> >      > to keep. Bundle failure and key processing partition changes
>>> >     invalidate
>>> >      > all user state, side input changes only invalidate the side
>>> input
>>> >     that
>>> >      > changed.
>>> >      >
>>> >      > 4) A cache token on each read/clear/append that is supplied on
>>> the
>>> >      > response of the call with an initial valid set that is supplied
>>> at
>>> >      > start. The runner can selectively choose which to keep on start.
>>> >     Bundle
>>> >      > failure allows runners to "roll back" to a known good state by
>>> >     selecting
>>> >      > the previous valid cache token as part of the initial set. Key
>>> >      > processing partition changes allow runners to keep cached state
>>> that
>>> >      > hasn't changed since it can be tied to a version number of the
>>> state
>>> >      > itself as part of the initial set. Side input changes only
>>> invalidate
>>> >      > the side input that changed.
>>> >      >
>>> >      > Based upon your discussion you seem to want option #1 which
>>> >     doesn't work
>>> >      > well with side inputs clearing cached state. If we want to have
>>> user
>>> >      > state survive a changing side input, we would want one of the
>>> other
>>> >      > options. I do agree that supplying the cache token upfront is
>>> >      > significantly simpler. Currently the protos are setup for #4
>>> since it
>>> >      > was the most flexible and at the time the pros outweighed the
>>> cons.
>>> >      >
>>> >      > I don't understand why you think you need to wait for a response
>>> >     for the
>>> >      > append/clear to get its cache token since the only reason you
>>> >     need the
>>> >      > cache token is that you want to use that cached data when
>>> >     processing a
>>> >      > different bundle. I was thinking that the flow on the SDK side
>>> >     would be
>>> >      > something like (assuming there is a global cache of cache token
>>> >     -> (map
>>> >      > of state key -> data))
>>> >      > 1) Create a local cache of (map of state key -> data) using the
>>> >     initial
>>> >      > set of valid cache tokens
>>> >      > 2) Make all mutations in place on local cache without waiting
>>> for
>>> >     response.
>>> >      > 3) When response comes back, update global cache with new cache
>>> >     token ->
>>> >      > (map of state key -> data)) (this is when the data becomes
>>> visible to
>>> >      > other bundles that start processing)
>>> >      > 4) Before the bundle finishes processing, wait for all
>>> >     outstanding state
>>> >      > calls to finish.
>>> >      >
>>> >      > To implement caching on the runner side, you would keep track of
>>> >     at most
>>> >      > 2 cache tokens per state key, one cache token represents the
>>> initial
>>> >      > value when the bundle started while the second represents the
>>> >     modified
>>> >      > state. If the bundle succeeds the runner passes in the set of
>>> tokens
>>> >      > which represent the new state, if the bundle fails you process
>>> >     using the
>>> >      > original ones.
>>> >      >
>>> >      > After thinking through the implementation again, we could
>>> supply two
>>> >      > cache tokens for each state id, the first being the set of
>>> initial
>>> >      > tokens if no writes happen while the second represents the token
>>> >     to use
>>> >      > if the SDK changes the state. This gives us the simplification
>>> >     where we
>>> >      > don't need to wait for the response before we update the global
>>> cache
>>> >      > making a typical blocking cache much easier to do. We also get
>>> the
>>> >      > benefit that runners can supply either the same cache token for
>>> a
>>> >     state
>>> >      > id or different ones. If the runner supplies the same one, then it's
>>> >      > telling the SDK to make modifications in place without any
>>> rollback
>>> >      > (which is good on memory since we are reducing copies of stuff)
>>> or if
>>> >      > the runner supplies two different ones, then it's telling the SDK
>>> >     to keep
>>> >      > the old data around. If we went through with this new option
>>> the SDK
>>> >      > side logic would be (assuming there is a global cache of cache
>>> >     token ->
>>> >      > (map of state key -> data)):
>>> >      >
>>> >      > 1) Create an empty local set of state ids that are dirty when
>>> >     starting a
>>> >      > new bundle (dirty set)
>>> >      >
>>> >      > For reads/gets:
>>> >      > 2A) If the request is a read (get), use dirty set to choose
>>> which
>>> >     cache
>>> >      > token to lookup and use in the global cache. If the global
>>> cache is
>>> >      > missing data issue the appropriate request providing the result.
>>> >      >
>>> >      > For writes/appends/clear:
>>> >      > 2B) if the cache tokens are different for the state id, add the
>>> >     state id
>>> >      > to the dirty set if it isn't there and perform the appropriate
>>> >      > modification to convert the old cached state data to the new
>>> >     state data
>>> >      > 3B) modify the global caches data
>>> >      > 4B) issue the request to the runner
>>> >      > 5B*) add this request to the set of requests to block on before
>>> >      > completing the bundle.
>>> >      >
>>> >      > (* Note, there was another idea to update the process bundle
>>> >     response to
>>> >      > contain the id of the last state request which would allow the
>>> >     runner to
>>> >      > know when it has seen the last state request allowing the SDK
>>> to not
>>> >      > block at all when finishing the bundle)
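>>> >      >
>>> >      > A rough Python sketch of that dirty-set bookkeeping (purely
>>> >      > illustrative, with invented names; "tokens" maps state id ->
>>> >      > (initial token, write token) and "global_cache" maps cache token ->
>>> >      > {state key: data}):
>>> >      >
>>> >      > def read(global_cache, tokens, dirty, state_id, state_key):
>>> >      >     # 2A) Dirty state ids read under the write token, clean ones
>>> >      >     # under the initial token supplied at bundle start.
>>> >      >     initial, write = tokens[state_id]
>>> >      >     token = write if state_id in dirty else initial
>>> >      >     return global_cache.get(token, {}).get(state_key)
>>> >      >
>>> >      > def mutate(global_cache, tokens, dirty, state_id, state_key, data):
>>> >      >     initial, write = tokens[state_id]
>>> >      >     if initial != write and state_id not in dirty:
>>> >      >         # 2B) First write with distinct tokens: copy the old data
>>> >      >         # under the write token so the initial data stays intact
>>> >      >         # for a potential rollback.
>>> >      >         global_cache[write] = dict(global_cache.get(initial, {}))
>>> >      >         dirty.add(state_id)
>>> >      >     # 3B) Modify the current cache data; 4B/5B) the matching
>>> >      >     # append/clear request to the runner is issued and awaited
>>> >      >     # before the bundle completes (not shown here).
>>> >      >     global_cache.setdefault(write, {})[state_key] = data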
>>> >      >
>>> >      > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels
>>> >      > <m...@apache.org> wrote:
>>> >      >
>>> >      >     Just to give a quick update here. Rakesh, Thomas, and I had
>>> a
>>> >     discussion
>>> >      >     about async writes from the Python SDK to the Runner. Robert
>>> >     was also
>>> >      >     present for some parts of the discussion.
>>> >      >
>>> >      >     We concluded that blocking writes with the need to refresh
>>> >     the cache
>>> >      >     token each time are not going to provide sufficient throughput
>>> >     or low enough latency.
>>> >      >
>>> >      >     We figured that it will be enough to use a single cache
>>> token per
>>> >      >     Runner<=>SDK Harness connection. This cache token will be
>>> >     provided by
>>> >      >     the Runner in the ProcessBundleRequest. Writes will not
>>> yield
>>> >     a new
>>> >      >     cache token. The advantage is that we can use one cache
>>> token
>>> >     for the
>>> >      >     life time of the bundle and also across bundles, unless the
>>> >     Runner
>>> >      >     switches to a new Runner<=>SDK Harness connection; then the
>>> >     Runner would
>>> >      >     have to generate a new cache token.
>>> >      >
>>> >      >     We might require additional cache tokens for the side
>>> inputs.
>>> >     For now,
>>> >      >     I'm planning to only tackle user state which seems to be the
>>> >     area where
>>> >      >     users have expressed the most need for caching.
>>> >      >
>>> >      >     -Max
>>> >      >
>>> >      >     On 21.08.19 20:05, Maximilian Michels wrote:
>>> >      >     >> There is probably a misunderstanding here: I'm suggesting
>>> >     to use
>>> >      >     a worker ID instead of cache tokens, not additionally.
>>> >      >     >
>>> >      >     > Ah! Misread that. We need a changing token to indicate
>>> that the
>>> >      >     cache is
>>> >      >     > stale, e.g. checkpoint has failed / restoring from an old
>>> >      >     checkpoint. If
>>> >      >     > the _Runner_ generates a new unique token/id for workers
>>> >     which outlast
>>> >      >     > the Runner, then this should work fine. I don't think it
>>> is
>>> >     safe
>>> >      >     for the
>>> >      >     > worker to supply the id. The Runner should be in control
>>> of
>>> >     cache
>>> >      >     tokens
>>> >      >     > to avoid invalid tokens.
>>> >      >     >
>>> >      >     >> In the PR the token is modified as part of updating the
>>> state.
>>> >      >     Doesn't the SDK need the new token to update its cache
>>> entry
>>> >     also?
>>> >      >     That's where it would help the SDK to know the new token
>>> upfront.
>>> >      >     >
>>> >      >     > If the state is updated in the Runner, a new token has to
>>> be
>>> >      >     generated.
>>> >      >     > The old one is not valid anymore. The SDK will use the
>>> updated
>>> >      >     token to
>>> >      >     > store the new value in the cache. I understand that it
>>> would be
>>> >      >     nice to
>>> >      >     > know the token upfront. That could be possible with some
>>> token
>>> >      >     > generation scheme. On the other hand, writes can be
>>> >     asynchronous and
>>> >      >     > thus not block the UDF.
>>> >      >     >
>>> >      >     >> But I believe there is no need to change the token in
>>> first
>>> >      >     place, unless bundles for the same key (ranges) can be
>>> >     processed by
>>> >      >     different workers.
>>> >      >     >
>>> >      >     > That's certainly possible, e.g. two workers A and B take
>>> turns
>>> >      >     processing
>>> >      >     > a certain key range, one bundle after another:
>>> >      >     >
>>> >      >     > You process a bundle with a token T with A, then worker B
>>> >     takes over.
>>> >      >     > Both have an entry with cache token T. So B goes on to
>>> >     modify the
>>> >      >     state
>>> >      >     > and uses the same cache token T. Then A takes over again.
>>> A
>>> >     would
>>> >      >     have a
>>> >      >     > stale cache entry but T would still be a valid cache
>>> token.
>>> >      >     >
>>> >      >     >> Indeed the fact that Dataflow can dynamically split and
>>> merge
>>> >      >     these ranges is what makes it trickier. If Flink does not
>>> >      >     repartition the ranges, then things are much easier.
>>> >      >     >
>>> >      >     > Flink does not dynamically repartition key ranges (yet).
>>> If
>>> >     it started
>>> >      >     > to support that, we would invalidate the cache tokens for
>>> >     the changed
>>> >      >     > partitions.
>>> >      >     >
>>> >      >     >
>>> >      >     > I'd suggest the following cache token generation scheme:
>>> >      >     >
>>> >      >     > One cache token per key range for user state and one cache
>>> >     token for
>>> >      >     > each side input. On writes to user state or changing side
>>> >     input, the
>>> >      >     > associated cache token will be renewed.
>>> >      >     >
>>> >      >     > On the SDK side, it should be sufficient to let the SDK
>>> >      >     re-associate all
>>> >      >     > its cached data belonging to a valid cache token with a
>>> new
>>> >     cache
>>> >      >     token
>>> >      >     > returned by a successful write. This has to happen in the
>>> >     active scope
>>> >      >     > (i.e. user state, or a particular side input).
>>> >      >     >
>>> >      >     > If the key range changes, new cache tokens have to
>>> >     be generated. This
>>> >      >     > should happen automatically because the Runner does not
>>> >     checkpoint
>>> >      >     cache
>>> >      >     > tokens and will generate new ones when it restarts from an
>>> >     earlier
>>> >      >     > checkpoint.
>>> >      >     >
>>> >      >     > The current PR needs to be changed to (1) only keep a
>>> >     single cache
>>> >      >     token
>>> >      >     > per user state and key range (2) add support for cache
>>> >     tokens for each
>>> >      >     > side input.
>>> >      >     >
>>> >      >     > Hope that makes sense.
>>> >      >     >
>>> >      >     > -Max
>>> >      >     >
>>> >      >     > On 21.08.19 17:27, Reuven Lax wrote:
>>> >      >     >>
>>> >      >     >>
>>> >      >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels
>>> >      >     >> <m...@apache.org> wrote:
>>> >      >     >>
>>> >      >     >>     Appreciate all your comments! Replying below.
>>> >      >     >>
>>> >      >     >>
>>> >      >     >>     @Luke:
>>> >      >     >>
>>> >      >     >>     > Having cache tokens per key would be very expensive
>>> >     indeed
>>> >      >     and I
>>> >      >     >>     believe we should go with a single cache token "per"
>>> >     bundle.
>>> >      >     >>
>>> >      >     >>     Thanks for your comments on the PR. I was thinking to
>>> >     propose
>>> >      >     something
>>> >      >     >>     along this lines of having cache tokens valid for a
>>> >     particular
>>> >      >     >>     checkpointing "epoch". That would require even less
>>> token
>>> >      >     renewal than
>>> >      >     >>     the per-bundle approach.
>>> >      >     >>
>>> >      >     >>
>>> >      >     >>     @Thomas, thanks for the input. Some remarks:
>>> >      >     >>
>>> >      >     >>     > Wouldn't it be simpler to have the runner just
>>> track a
>>> >      >     unique ID
>>> >      >     >>     for each worker and use that to communicate if the
>>> >     cache is
>>> >      >     valid or
>>> >      >     >>     not?
>>> >      >     >>
>>> >      >     >>     We do not need a unique id per worker. If a cache
>>> token is
>>> >      >     valid for a
>>> >      >     >>     particular worker, it is also valid for another
>>> >     worker. That
>>> >      >     is with the
>>> >      >     >>     assumption that key ranges are always disjoint
>>> between the
>>> >      >     workers.
>>> >      >     >>
>>> >      >     >>     > * When the bundle is started, the runner tells the
>>> >     worker
>>> >      >     if the
>>> >      >     >>     cache has become invalid (since it knows if another
>>> >     worker has
>>> >      >     >>     mutated state)
>>> >      >     >>
>>> >      >     >>     This is simply done by not transferring the
>>> particular
>>> >     cache
>>> >      >     token. No
>>> >      >     >>     need to declare it invalid explicitly.
>>> >      >     >>
>>> >      >     >>     > * When the worker sends mutation requests to the
>>> >     runner, it
>>> >      >     >>     includes its own ID (or the runner already has it as
>>> >     contextual
>>> >      >     >>     information). No need to wait for a response.
>>> >      >     >>
>>> >      >     >>     Mutations of cached values can be freely done as long
>>> >     as the
>>> >      >     cache token
>>> >      >     >>     associated with the state is valid for a particular
>>> >     bundle.
>>> >      >     Only the
>>> >      >     >>     first time, the Runner needs to wait on the response
>>> >     to store
>>> >      >     the cache
>>> >      >     >>     token. This can also be done asynchronously.
>>> >      >     >>
>>> >      >     >>     > * When the bundle is finished, the runner records
>>> >     the last
>>> >      >     writer
>>> >      >     >>     (only if a change occurred)
>>> >      >     >>
>>> >      >     >>     I believe this is not necessary because there will
>>> only be
>>> >      >     one writer at
>>> >      >     >>     a time for a particular bundle and key range, hence
>>> >     only one
>>> >      >     writer
>>> >      >     >>     holds a valid cache token for a particular state and
>>> >     key range.
>>> >      >     >>
>>> >      >     >>
>>> >      >     >>     @Reuven:
>>> >      >     >>
>>> >      >     >>     >  Dataflow divides the keyspace up into
>>> lexicographic
>>> >      >     ranges, and
>>> >      >     >>     creates a cache token per range.
>>> >      >     >>
>>> >      >     >>     State is always processed partitioned by the Flink
>>> workers
>>> >      >     (hash-based,
>>> >      >     >>     not lexicographical). I don't think that matters
>>> though
>>> >      >     because the key
>>> >      >     >>     ranges do not overlap between the workers. Flink does
>>> >     not support
>>> >      >     >>     dynamically repartitioning the key ranges. Even in
>>> case of
>>> >      >     fine-grained
>>> >      >     >>     recovery of workers and their key ranges, we would
>>> simply
>>> >      >     generate new
>>> >      >     >>     cache tokens for a particular worker.
>>> >      >     >>
>>> >      >     >>
>>> >      >     >> Dataflow's ranges are also hash based. When I said
>>> >     lexicographical, I
>>> >      >     >> meant lexicographical based on the hexadecimal hash
>>> value.
>>> >      >     >>
>>> >      >     >> Indeed the fact that Dataflow can dynamically split and
>>> >     merge these
>>> >      >     >> ranges is what makes it trickier. If Flink does not
>>> >     repartition the
>>> >      >     >> ranges, then things are much easier.
>>> >      >     >>
>>> >      >     >>
>>> >      >     >>
>>> >      >     >>     Thanks,
>>> >      >     >>     Max
>>> >      >     >>
>>> >      >     >>     On 21.08.19 09:33, Reuven Lax wrote:
>>> >      >     >>     > Dataflow does something like this, however since
>>> work is
>>> >      >     >>     > load balanced across workers a per-worker id
>>> doesn't
>>> >     work
>>> >      >     very well.
>>> >      >     >>     > Dataflow divides the keyspace up into lexicographic
>>> >     ranges, and
>>> >      >     >>     creates
>>> >      >     >>     > a cache token per range.
>>> >      >     >>     >
>>> >      >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise
>>> >      >     >>     > <t...@apache.org> wrote:
>>> >      >     >>     >
>>> >      >     >>     >     Commenting here vs. on the PR since related to
>>> >     the overall
>>> >      >     >>     approach.
>>> >      >     >>     >
>>> >      >     >>     >     Wouldn't it be simpler to have the runner just
>>> >     track a
>>> >      >     unique
>>> >      >     >>     ID for
>>> >      >     >>     >     each worker and use that to communicate if the
>>> >     cache is
>>> >      >     valid
>>> >      >     >>     or not?
>>> >      >     >>     >
>>> >      >     >>     >     * When the bundle is started, the runner tells
>>> the
>>> >      >     worker if the
>>> >      >     >>     >     cache has become invalid (since it knows if
>>> another
>>> >      >     worker has
>>> >      >     >>     >     mutated state)
>>> >      >     >>     >     * When the worker sends mutation requests to
>>> the
>>> >     runner, it
>>> >      >     >>     includes
>>> >      >     >>     >     its own ID (or the runner already has it as
>>> >     contextual
>>> >      >     >>     information).
>>> >      >     >>     >     No need to wait for a response.
>>> >      >     >>     >     * When the bundle is finished, the runner
>>> >     records the
>>> >      >     last writer
>>> >      >     >>     >     (only if a change occurred)
>>> >      >     >>     >
>>> >      >     >>     >     Whenever current worker ID and last writer ID
>>> >     doesn't
>>> >      >     match, cache
>>> >      >     >>     >     is invalid.
>>> >      >     >>     >
>>> >      >     >>     >     Thomas
>>> >      >     >>     >
>>> >      >     >>     >
>>> >      >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik
>>> >      >     >>     >     <lc...@google.com> wrote:
>>> >      >     >>     >
>>> >      >     >>     >         Having cache tokens per key would be very
>>> >     expensive
>>> >      >     indeed
>>> >      >     >>     and I
>>> >      >     >>     >         believe we should go with a single cache
>>> token
>>> >      >     "per" bundle.
>>> >      >     >>     >
>>> >      >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian
>>> >     Michels
>>> >      >     >>     >         <m...@apache.org> wrote:
>>> >      >     >>     >
>>> >      >     >>     >             Maybe a Beam Python expert can chime
>>> in for
>>> >      >     Rakesh's
>>> >      >     >>     question?
>>> >      >     >>     >
>>> >      >     >>     >             Luke, I was assuming cache tokens to be
>>> >     per key
>>> >      >     and state
>>> >      >     >>     >             id. During
>>> >      >     >>     >             implementing an initial support on the
>>> >     Runner
>>> >      >     side, I
>>> >      >     >>     >             realized that we
>>> >      >     >>     >             probably want cache tokens to only be
>>> >     per state
>>> >      >     id. Note
>>> >      >     >>     >             that if we had
>>> >      >     >>     >             per-key cache tokens, the number of
>>> cache
>>> >      >     tokens would
>>> >      >     >>     >             approach the
>>> >      >     >>     >             total number of keys in an application.
>>> >      >     >>     >
>>> >      >     >>     >             If anyone wants to have a look, here is
>>> >     a first
>>> >      >     version of
>>> >      >     >>     >             the Runner
>>> >      >     >>     >             side for cache tokens. Note that I only
>>> >      >     implemented cache
>>> >      >     >>     >             tokens for
>>> >      >     >>     >             BagUserState for now, but it can be
>>> easily
>>> >      >     added for side
>>> >      >     >>     >             inputs as well.
>>> >      >     >>     >
>>> >      >     >>     > https://github.com/apache/beam/pull/9374
>>> >      >     >>     >
>>> >      >     >>     >             -Max
>>> >      >     >>     >
>>> >      >     >>     >
>>> >      >     >>
>>> >      >
>>> >
>>>
>>
