Thanks for the quick response.

Just to clarify, the issue with versioning side inputs is also present when supplying cache tokens per request instead of per bundle. The SDK never knows when the Runner receives a new version of a side input. As you pointed out, the Runner needs to mark side inputs as stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be that the side input can only change after a bundle completes vs. during the bundle. Side inputs are always fuzzy in that regard because there is no precise instant at which side inputs are atomically updated, other than the assumption that they will eventually be updated. In that regard, per-bundle tokens for side inputs seem to be fine.

None of the above is an issue for user state, as its cache can remain valid for the lifetime of a Runner<=>SDK Harness connection. A simple solution would be to not cache side inputs at all, because in many cases the caching just adds overhead. However, I can also imagine cases where a side input is valid forever and caching would be very beneficial.

For the first version I want to focus on user state because that's where I see the most benefit from caching. I don't see a problem, though, with the Runner detecting a new side input version and reflecting that in the cache tokens supplied for a new bundle.
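
To illustrate, here is a rough sketch of how a Runner could refresh the side
input tokens it hands out with each bundle. The names are made up for
illustration; this is not actual Beam runner code:

import uuid

class SideInputTokens:
    """One cache token per side input, renewed whenever the Runner observes
    a new version of that side input."""

    def __init__(self, side_input_ids):
        self._versions = {sid: None for sid in side_input_ids}
        self._tokens = {sid: uuid.uuid4().bytes for sid in side_input_ids}

    def tokens_for_next_bundle(self, current_versions):
        """current_versions: side_input_id -> runner-internal version id."""
        for sid, version in current_versions.items():
            if version != self._versions[sid]:
                # The side input changed since the last bundle: hand out a
                # fresh token so the SDK treats its cached entries as stale.
                self._versions[sid] = version
                self._tokens[sid] = uuid.uuid4().bytes
        return dict(self._tokens)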

-Max

On 26.08.19 22:27, Lukasz Cwik wrote:
Your summary below makes sense to me. I can see that recovery by rolling back doesn't need to be a priority, which simplifies the solution for user state caching down to one token.

Providing cache tokens upfront does require the Runner to know what "version" of everything it may supply to the SDK upfront (instead of on request), which would mean that the Runner may need to keep a mapping from cache token to internal version identifier for things like side inputs, which are typically broadcast. The Runner would also need to poll in the background to see whether a side input has changed, so as not to block processing bundles with "stale" side input data.
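
A minimal sketch of that token-to-version bookkeeping with a background poll,
assuming a hypothetical fetch_version callable that returns the runner-internal
version of a side input (illustrative only, not actual runner code):

import threading
import time
import uuid

class PolledSideInputToken:
    def __init__(self, side_input_id, fetch_version, poll_interval_s=5.0):
        self._side_input_id = side_input_id
        self._fetch_version = fetch_version
        self._lock = threading.Lock()
        self._version = fetch_version(side_input_id)
        self._token = uuid.uuid4().bytes
        poller = threading.Thread(
            target=self._poll, args=(poll_interval_s,), daemon=True)
        poller.start()

    def _poll(self, interval):
        # Poll in the background so bundle processing never blocks on a
        # side input version check.
        while True:
            time.sleep(interval)
            version = self._fetch_version(self._side_input_id)
            with self._lock:
                if version != self._version:
                    self._version = version
                    # A new version invalidates whatever the SDK cached
                    # for this side input.
                    self._token = uuid.uuid4().bytes

    def current_token(self):
        with self._lock:
            return self._token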

Ping me once you have the Runner PR updated and I'll take a look again.

On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:

    Thank you for the summary Luke. I really appreciate the effort you put
    into this!

     > Based upon your discussion you seem to want option #1

    I'm actually for option #2. The option to cache/invalidate side inputs
    is important, and we should incorporate this in the design. That's why
    option #1 is not flexible enough. However, a first implementation could
    defer caching of side inputs.

    Option #3 was my initial thinking and the first version of the PR, but I
    think we agreed that there wouldn't be much gain from keeping a cache
    token per state id.

    Option #4 is what is specifically documented in the reference doc and
    already part of the Proto, where valid tokens are provided for each new
    bundle and also as part of the response of a get/put/clear. We mentioned
    that the reply does not have to be waited on synchronously (I even
    mentioned this), but it complicates the implementation. The idea Thomas and I
    expressed was that a response is not even necessary if we assume
    validity of the upfront provided cache tokens for the lifetime of a
    bundle and that cache tokens will be invalidated as soon as the Runner
    fails in any way. This is naturally the case for Flink because it will
    simply "forget" its current cache tokens.

    I currently envision the following scheme:

    Runner
    ======

    - Runner generates a globally unique cache token, one for user state and
    one for each side input

    - The token is supplied to the SDK Harness for each bundle request
    - For the lifetime of a Runner<=>SDK Harness connection this cache token
    will not change
    - Runner will generate a new token if the connection/key space changes
    between Runner and SDK Harness (see the sketch below)
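
    A minimal sketch of this runner-side token lifecycle (illustrative names,
    not the actual Flink runner code):

    import uuid

    class ConnectionCacheTokens:
        """One token for user state plus one per side input, valid for the
        lifetime of a Runner<=>SDK Harness connection."""

        def __init__(self, side_input_ids):
            self._side_input_ids = list(side_input_ids)
            self._renew()

        def _renew(self):
            self.user_state_token = uuid.uuid4().bytes
            self.side_input_tokens = {
                sid: uuid.uuid4().bytes for sid in self._side_input_ids}

        def on_connection_or_key_range_change(self):
            # A new connection or a changed key space invalidates everything
            # the SDK may have cached, so hand out fresh tokens.
            self._renew()

        def for_bundle_request(self):
            # The same tokens are attached to every bundle request for as
            # long as the connection stays up.
            return self.user_state_token, dict(self.side_input_tokens)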


    SDK
    ===

    - For each bundle the SDK worker stores the list of valid cache tokens
    - The SDK Harness keeps a global cache across all its (local) workers,
    which is an LRU cache: state_key => (cache_token, value)
    - get: Look up the cache using the valid cache token for the state. If
    there is no match, fetch from the Runner and cache the result under the
    already available token
    - put: Put the value in the cache with the valid cache token and add the
    value to the pending writes, which are flushed out at the latest when
    the bundle ends
    - clear: same as put, but clears the cache entry (see the sketch below)
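
    A minimal sketch of such a cache, assuming a plain OrderedDict-based LRU
    (names are illustrative, not the actual SDK harness API):

    import collections

    class StateCache:
        """state_key -> (cache_token, value), with LRU eviction."""

        def __init__(self, max_entries=10000):
            self._cache = collections.OrderedDict()
            self._max_entries = max_entries
            self._pending_writes = []  # flushed at the latest when the bundle ends

        def get(self, state_key, cache_token, fetch_from_runner):
            entry = self._cache.get(state_key)
            if entry is not None and entry[0] == cache_token:
                self._cache.move_to_end(state_key)  # LRU touch
                return entry[1]
            # Miss or stale token: fetch from the Runner and cache the result
            # under the token that was supplied for this bundle.
            value = fetch_from_runner(state_key)
            self._store(state_key, cache_token, value)
            return value

        def put(self, state_key, cache_token, value):
            self._store(state_key, cache_token, value)
            self._pending_writes.append((state_key, value))

        def clear(self, state_key, cache_token):
            self._store(state_key, cache_token, None)
            self._pending_writes.append((state_key, None))

        def flush(self, write_to_runner):
            # Called at the latest when the bundle finishes.
            for state_key, value in self._pending_writes:
                write_to_runner(state_key, value)
            self._pending_writes = []

        def _store(self, state_key, cache_token, value):
            self._cache[state_key] = (cache_token, value)
            self._cache.move_to_end(state_key)
            if len(self._cache) > self._max_entries:
                self._cache.popitem(last=False)  # evict least recently used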

    It does look like this is not too far off from what you were describing.
    The main difference is that we just work with a single cache token. In
    my opinion we do not need the second cache token for writes, as long as
    we ensure that we generate a new cache token if the
    bundle/checkpoint fails.

    I have two draft PRs:
       for the Runner: https://github.com/apache/beam/pull/9374
       for the SDK: https://github.com/apache/beam/pull/9418

    Note that the Runner PR needs to be updated to fully reflect the above
    scheme. The SDK implementation is WIP. I want to make sure that we
    clarify the design before this gets finalized.

    Thanks again for all your comments. Much appreciated!

    Cheers,
    Max

    On 26.08.19 19:58, Lukasz Cwik wrote:
     > There were originally a couple of ideas around how caching could
    work:
     > 1) One cache token for the entire bundle that is supplied up
    front. The
     > SDK caches everything using the given token. All
    reads/clear/append for
     > all types of state happen under this token. Anytime a side input
     > changes, key processing partition range changes or a bundle fails to
     > process, the runner chooses a new cache token effectively
    invalidating
     > everything in the past.
     >
     > 2) One cache token per type of state that is supplied up front.
     > The SDK caches all requests for a given type using the given cache
     > token. The runner can selectively choose which type to keep and
    which to
     > invalidate. Bundle failure and key processing partition changes
     > invalidate all user state, side input change invalidates all side
    inputs.
     >
     > 3) One cache token per state id that is supplied up front.
     > The SDK caches all requests for the given state id using the
    given cache
     > token. The runner can selectively choose which to invalidate and
    which
     > to keep. Bundle failure and key processing partition changes
    invalidate
     > all user state, side input changes only invalidate the side input
    that
     > changed.
     >
     > 4) A cache token on each read/clear/append that is supplied on the
     > response of the call with an initial valid set that is supplied at
     > start. The runner can selectively choose which to keep on start.
    Bundle
     > failure allows runners to "roll back" to a known good state by
    selecting
     > the previous valid cache token as part of the initial set. Key
     > processing partition changes allow runners to keep cached state that
     > hasn't changed since it can be tied to a version number of the state
     > itself as part of the initial set. Side input changes only invalidate
     > the side input that changed.
     >
     > Based upon your discussion you seem to want option #1 which
    doesn't work
     > well with side inputs clearing cached state. If we want to have user
     > state survive a changing side input, we would want one of the other
     > options. I do agree that supplying the cache token upfront is
     > significantly simpler. Currently the protos are setup for #4 since it
     > was the most flexible and at the time the pros outweighed the cons.
     >
     > I don't understand why you think you need to wait for a response
    for the
     > append/clear to get its cache token since the only reason you
    need the
     > cache token is that you want to use that cached data when
    processing a
     > different bundle. I was thinking that the flow on the SDK side
    would be
     > something like (assuming there is a global cache of cache token
    -> (map
     > of state key -> data))
     > 1) Create a local cache of (map of state key -> data) using the
    initial
     > set of valid cache tokens
     > 2) Make all mutations in place on local cache without waiting for
    response.
     > 3) When response comes back, update global cache with new cache
    token ->
     > (map of state key -> data)) (this is when the data becomes visible to
     > other bundles that start processing)
     > 4) Before the bundle finishes processing, wait for all
    outstanding state
     > calls to finish.
     >
     > To implement caching on the runner side, you would keep track of
    at most
     > 2 cache tokens per state key, one cache token represents the initial
     > value when the bundle started while the second represents the
    modified
     > state. If the bundle succeeds the runner passes in the set of tokens
     > which represent the new state, if the bundle fails you process
    using the
     > original ones.
     >
     > After thinking through the implementation again, we could supply two
     > cache tokens for each state id, the first being the set of initial
     > tokens if no writes happen while the second represents the token
    to use
     > if the SDK changes the state. This gives us the simplification
    where we
     > don't need to wait for the response before we update the global cache
     > making a typical blocking cache much easier to do. We also get the
     > benefit that runners can supply either the same cache token for a
    state
     id or different ones. If the runner supplies the same one then it's
     > telling the SDK to make modifications in place without any rollback
     > (which is good on memory since we are reducing copies of stuff) or if
     > the runner supplies two different ones then it's telling the SDK
    to keep
     > the old data around. If we went through with this new option the SDK
     > side logic would be (assuming there is a global cache of cache
    token ->
     > (map of state key -> data)):
     >
     > 1) Create an empty local set of state ids that are dirty when
    starting a
     > new bundle (dirty set)
     >
     > For reads/gets:
     > 2A) If the request is a read (get), use dirty set to choose which
    cache
     > token to lookup and use in the global cache. If the global cache is
     > missing data issue the appropriate request providing the result.
     >
     > For writes/appends/clear:
     > 2B) if the cache tokens are different for the state id, add the
    state id
     > to the dirty set if it isn't there and perform the appropriate
     > modification to convert the old cached state data to the new
    state data
     > 3B) modify the global caches data
     > 4B) issue the request to the runner
     > 5B*) add this request to the set of requests to block on before
     > completing the bundle.
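     >
     > A rough sketch of this flow, with purely illustrative names (this is
     > not actual SDK code):
     >
     > class TwoTokenCache:
     >     def __init__(self, global_cache, read_tokens, write_tokens):
     >         self._global = global_cache        # cache_token -> {state_key: [values]}
     >         self._read_tokens = read_tokens    # state_id -> token for unmodified state
     >         self._write_tokens = write_tokens  # state_id -> token once state is written
     >         self._dirty = set()                # 1) state ids modified in this bundle
     >         self.pending = []                  # 5B) block on these before finishing
     >
     >     def _token(self, state_id):
     >         tokens = self._write_tokens if state_id in self._dirty else self._read_tokens
     >         return tokens[state_id]
     >
     >     def get(self, state_id, state_key, fetch):
     >         # 2A) choose the token via the dirty set, fall back to a state read
     >         cached = self._global.setdefault(self._token(state_id), {})
     >         if state_key not in cached:
     >             cached[state_key] = fetch(state_key)
     >         return cached[state_key]
     >
     >     def append(self, state_id, state_key, value, send):
     >         read_tok = self._read_tokens[state_id]
     >         write_tok = self._write_tokens[state_id]
     >         if read_tok != write_tok and state_id not in self._dirty:
     >             # 2B) differing tokens: keep the old data under the read
     >             # token for rollback, copy it once under the write token.
     >             self._dirty.add(state_id)
     >             self._global[write_tok] = {
     >                 k: list(v) for k, v in self._global.get(read_tok, {}).items()}
     >         # 3B) modify the cached data
     >         self._global.setdefault(write_tok, {}).setdefault(state_key, []).append(value)
     >         # 4B/5B) issue the request and remember it to block on later
     >         self.pending.append(send(state_key, value))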
     >
     > (* Note, there was another idea to update the process bundle
    response to
     > contain the id of the last state request which would allow the
    runner to
     > know when it has seen the last state request allowing the SDK to not
     > block at all when finishing the bundle)
     >
     > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <m...@apache.org> wrote:
     >
     >     Just to give a quick update here. Rakesh, Thomas, and I had a
    discussion
     >     about async writes from the Python SDK to the Runner. Robert
    was also
     >     present for some parts of the discussion.
     >
     >     We concluded that blocking writes with the need to refresh
    the cache
     >     token each time are not going to provide enough
    throughput/latency.
     >
     >     We figured that it will be enough to use a single cache token per
     >     Runner<=>SDK Harness connection. This cache token will be
    provided by
     >     the Runner in the ProcessBundleRequest. Writes will not yield
    a new
     >     cache token. The advantage is that we can use one cache token
    for the
     >     life time of the bundle and also across bundles, unless the
    Runner
     >     switches to a new Runner<=>SDK Harness connection; then the
    Runner would
     >     have to generate a new cache token.
     >
     >     We might require additional cache tokens for the side inputs.
    For now,
     >     I'm planning to only tackle user state which seems to be the
    area where
     >     users have expressed the most need for caching.
     >
     >     -Max
     >
     >     On 21.08.19 20:05, Maximilian Michels wrote:
     >     >> There is probably a misunderstanding here: I'm suggesting
    to use
     >     a worker ID instead of cache tokens, not additionally.
     >     >
     >     > Ah! Misread that. We need a changing token to indicate that the
     >     cache is
     >     > stale, e.g. checkpoint has failed / restoring from an old
     >     checkpoint. If
     >     > the _Runner_ generates a new unique token/id for workers
    which outlast
     >     > the Runner, then this should work fine. I don't think it is
    safe
     >     for the
     >     > worker to supply the id. The Runner should be in control of
    cache
     >     tokens
     >     > to avoid invalid tokens.
     >     >
     >     >> In the PR the token is modified as part of updating the state.
     >     Doesn't the SDK need the new token to update its cache entry
    also?
     >     That's where it would help the SDK to know the new token upfront.
     >     >
     >     > If the state is updated in the Runner, a new token has to be
     >     generated.
     >     > The old one is not valid anymore. The SDK will use the updated
     >     token to
     >     > store the new value in the cache. I understand that it would be
     >     nice to
     >     > know the token upfront. That could be possible with some token
     >     > generation scheme. On the other hand, writes can be
    asynchronous and
     >     > thus not block the UDF.
     >     >
     >     >> But I believe there is no need to change the token in first
     >     place, unless bundles for the same key (ranges) can be
    processed by
     >     different workers.
     >     >
     >     > That's certainly possible, e.g. two workers A and B take turns
     >     processing
     >     > a certain key range, one bundle after another:
     >     >
     >     > You process a bundle with a token T with A, then worker B
    takes over.
     >     > Both have an entry with cache token T. So B goes on to
    modify the
     >     state
     >     > and uses the same cache token T. Then A takes over again. A
    would
     >     have a
     >     > stale cache entry but T would still be a valid cache token.
     >     >
     >     >> Indeed the fact that Dataflow can dynamically split and merge
     >     these ranges is what makes it trickier. If Flink does not
     >     repartition the ranges, then things are much easier.
     >     >
     >     > Flink does not dynamically repartition key ranges (yet). If
    it started
     >     > to support that, we would invalidate the cache tokens for
    the changed
     >     > partitions.
     >     >
     >     >
     >     > I'd suggest the following cache token generation scheme:
     >     >
     >     > One cache token per key range for user state and one cache
    token for
     >     > each side input. On writes to user state or changing side
    input, the
     >     > associated cache token will be renewed.
     >     >
     >     > On the SDK side, it should be sufficient to let the SDK
     >     re-associate all
     >     > its cached data belonging to a valid cache token with a new
    cache
     >     token
     >     > returned by a successful write. This has to happen in the
    active scope
     >     > (i.e. user state, or a particular side input).
     >     >
     >     > If the key range changes, new cache tokens have to be
    generated. This
     >     > should happen automatically because the Runner does not
    checkpoint
     >     cache
     >     > tokens and will generate new ones when it restarts from an
    earlier
     >     > checkpoint.
     >     >
     >     > The current PR needs to be changed to (1) only keep a
    single cache
     >     token
     >     > per user state and key range (2) add support for cache
    tokens for each
     >     > side input.
     >     >
     >     > Hope that makes sense.
     >     >
     >     > -Max
     >     >
     >     > On 21.08.19 17:27, Reuven Lax wrote:
     >     >>
     >     >>
     >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
     >     >>
     >     >>     Appreciate all your comments! Replying below.
     >     >>
     >     >>
     >     >>     @Luke:
     >     >>
     >     >>     > Having cache tokens per key would be very expensive
    indeed
     >     and I
     >     >>     believe we should go with a single cache token "per"
    bundle.
     >     >>
     >     >>     Thanks for your comments on the PR. I was thinking to
    propose
     >     something
     >     >>     along the lines of having cache tokens valid for a
    particular
     >     >>     checkpointing "epoch". That would require even less token
     >     renewal than
     >     >>     the per-bundle approach.
     >     >>
     >     >>
     >     >>     @Thomas, thanks for the input. Some remarks:
     >     >>
     >     >>     > Wouldn't it be simpler to have the runner just track a
     >     unique ID
     >     >>     for each worker and use that to communicate if the
    cache is
     >     valid or
     >     >>     not?
     >     >>
     >     >>     We do not need a unique id per worker. If a cache token is
     >     valid for a
     >     >>     particular worker, it is also valid for another
    worker. That
     >     is with the
     >     >>     assumption that key ranges are always disjoint between the
     >     workers.
     >     >>
     >     >>     > * When the bundle is started, the runner tells the
    worker
     >     if the
     >     >>     cache has become invalid (since it knows if another
    worker has
     >     >>     mutated state)
     >     >>
     >     >>     This is simply done by not transferring the particular
    cache
     >     token. No
     >     >>     need to declare it invalid explicitly.
     >     >>
     >     >>     > * When the worker sends mutation requests to the
    runner, it
     >     >>     includes its own ID (or the runner already has it as
    contextual
     >     >>     information). No need to wait for a response.
     >     >>
     >     >>     Mutations of cached values can be freely done as long
    as the
     >     cache token
     >     >>     associated with the state is valid for a particular
    bundle.
     >     Only the
     >     >>     first time, the Runner needs to wait on the response
    to store
     >     the cache
     >     >>     token. This can also be done asynchronously.
     >     >>
     >     >>     > * When the bundle is finished, the runner records
    the last
     >     writer
     >     >>     (only if a change occurred)
     >     >>
     >     >>     I believe this is not necessary because there will only be
     >     one writer at
     >     >>     a time for a particular bundle and key range, hence
    only one
     >     writer
     >     >>     holds a valid cache token for a particular state and
    key range.
     >     >>
     >     >>
     >     >>     @Reuven:
     >     >>
     >     >>     >  Dataflow divides the keyspace up into lexicographic
     >     ranges, and
     >     >>     creates a cache token per range.
     >     >>
     >     >>     State is always processed partitioned by the Flink workers
     >     (hash-based,
     not lexicographical). I don't think that matters though
     >     because the key
     >     >>     ranges do not overlap between the workers. Flink does
    not support
     >     >>     dynamically repartitioning the key ranges. Even in case of
     >     fine-grained
     >     >>     recovery of workers and their key ranges, we would simply
     >     generate new
     >     >>     cache tokens for a particular worker.
     >     >>
     >     >>
     >     >> Dataflow's ranges are also hash based. When I said
    lexicographical, I
     >     >> meant lexicographical based on the hexadecimal hash value.
     >     >>
     >     >> Indeed the fact that Dataflow can dynamically split and
    merge these
     >     >> ranges is what makes it trickier. If Flink does not
    repartition the
     >     >> ranges, then things are much easier.
     >     >>
     >     >>
     >     >>
     >     >>     Thanks,
     >     >>     Max
     >     >>
     >     >>     On 21.08.19 09:33, Reuven Lax wrote:
     >     >>     > Dataflow does something like this, however since work is
     >     >>     > load balanced across workers a per-worker id doesn't
    work
     >     very well.
     >     >>     > Dataflow divides the keyspace up into lexicographic
    ranges, and
     >     >>     creates
     >     >>     > a cache token per range.
     >     >>     >
     >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
     >     >>     >
     >     >>     >     Commenting here vs. on the PR since related to
    the overall
     >     >>     approach.
     >     >>     >
     >     >>     >     Wouldn't it be simpler to have the runner just
    track a
     >     unique
     >     >>     ID for
     >     >>     >     each worker and use that to communicate if the
    cache is
     >     valid
     >     >>     or not?
     >     >>     >
     >     >>     >     * When the bundle is started, the runner tells the
     >     worker if the
     >     >>     >     cache has become invalid (since it knows if another
     >     worker has
     >     >>     >     mutated state)
     >     >>     >     * When the worker sends mutation requests to the
    runner, it
     >     >>     includes
     >     >>     >     its own ID (or the runner already has it as
    contextual
     >     >>     information).
     >     >>     >     No need to wait for a response.
     >     >>     >     * When the bundle is finished, the runner
    records the
     >     last writer
     >     >>     >     (only if a change occurred)
     >     >>     >
     >     >>     >     Whenever current worker ID and last writer ID
    doesn't
     >     match, cache
     >     >>     >     is invalid.
     >     >>     >
     >     >>     >     Thomas
     >     >>     >
     >     >>     >
     >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
     >     >>     >
     >     >>     >         Having cache tokens per key would be very
    expensive
     >     indeed
     >     >>     and I
     >     >>     >         believe we should go with a single cache token
     >     "per" bundle.
     >     >>     >
     >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org> wrote:
     >     >>     >
     >     >>     >             Maybe a Beam Python expert can chime in for
     >     Rakesh's
     >     >>     question?
     >     >>     >
     >     >>     >             Luke, I was assuming cache tokens to be
    per key
     >     and state
     >     >>     >             id. During
     >     >>     >             implementing an initial support on the
    Runner
     >     side, I
     >     >>     >             realized that we
     >     >>     >             probably want cache tokens to only be
    per state
     >     id. Note
     >     >>     >             that if we had
     >     >>     >             per-key cache tokens, the number of cache
     >     tokens would
     >     >>     >             approach the
     >     >>     >             total number of keys in an application.
     >     >>     >
     >     >>     >             If anyone wants to have a look, here is
    a first
     >     version of
     >     >>     >             the Runner
     >     >>     >             side for cache tokens. Note that I only
     >     implemented cache
     >     >>     >             tokens for
     >     >>     >             BagUserState for now, but it can be easily
     >     added for side
     >     >>     >             inputs as well.
     >     >>     >
     >     >>     > https://github.com/apache/beam/pull/9374
     >     >>     >
     >     >>     >             -Max
     >     >>     >
     >     >>     >
     >     >>
     >
