The bundle's view of side inputs should never change during processing and should be a point-in-time snapshot.
I was just trying to say that deferring the cache token for side inputs until side input request time simplified the runner's implementation, since that is precisely when the runner needs to look at the side input. Putting the tokens into the ProcessBundleRequest complicates that, but it does make the SDK implementation significantly simpler, which is a win.
On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <[email protected]> wrote:
Thanks for the quick response.
Just to clarify, the issue with versioning side inputs is also present when supplying the cache tokens on a request basis instead of per bundle. The SDK never knows when the Runner receives a new version of the side input. Like you pointed out, it needs to mark side inputs as stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be that the side input can only change after a bundle completes vs. during the bundle. Side inputs are always fuzzy in that regard because there is no precise instant at which side inputs are atomically updated, other than the assumption that they eventually will be updated. In that regard, per-bundle tokens for side inputs seem to be fine.

All of the above is not an issue for user state, as its cache can remain valid for the lifetime of a Runner<=>SDK Harness connection. A simple solution would be to not cache side inputs at all, because there are many cases where the caching just adds overhead. However, I can also imagine cases where a side input is valid forever and caching would be very beneficial.

For the first version I want to focus on user state because that's where I see the most benefit for caching. I don't see a problem, though, with the Runner detecting new side input versions and reflecting that in the cache tokens supplied for a new bundle.
-Max
On 26.08.19 22:27, Lukasz Cwik wrote:
> Your summary below makes sense to me. I can see that recovery from rolling back doesn't need to be a priority, which simplifies the solution for user state caching down to one token.
>
> Providing cache tokens upfront does require the Runner to know what "version" of everything it may supply to the SDK upfront (instead of on request), which means that the Runner may need to have a mapping from cache token to internal version identifier for things like side inputs, which are typically broadcast. The Runner would also need to poll in the background to see whether the side input has changed, so as not to block processing bundles with "stale" side input data.
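>
> Purely as an illustration of that bookkeeping, a runner could keep something like the sketch below. The class name and the poll_version callback are made up for the example, not Beam or Flink APIs:
>
> import uuid
>
> class SideInputTokens:
>     """Per side input: the runner-internal version and the cache token handed to the SDK."""
>
>     def __init__(self, poll_version):
>         # poll_version: side_input_id -> current internal version (runner-specific, assumed here)
>         self._poll_version = poll_version
>         self._tokens = {}  # side_input_id -> (version, cache_token)
>
>     def token_for(self, side_input_id):
>         return self._tokens.get(side_input_id, (None, None))[1]
>
>     def refresh(self, side_input_id):
>         # Run periodically in the background so bundle processing never
>         # blocks on checking whether the side input became "stale".
>         version = self._poll_version(side_input_id)
>         old = self._tokens.get(side_input_id)
>         if old is None or old[0] != version:
>             # New side input version: rotate the cache token.
>             self._tokens[side_input_id] = (version, uuid.uuid4().bytes)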
>
> Ping me once you have the Runner PR updated and I'll take a look again.
>
> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <[email protected]> wrote:
>
> Thank you for the summary, Luke. I really appreciate the effort you put into this!
>
> > Based upon your discussion you seem to want option #1
>
> I'm actually for option #2. The option to cache/invalidate side inputs is important, and we should incorporate it in the design. That's why option #1 is not flexible enough. However, a first implementation could defer caching of side inputs.
>
> Option #3 was my initial thinking and the first version of the PR, but I think we agreed that there wouldn't be much gain from keeping a cache token per state id.
>
> Option #4 is what is specifically documented in the reference doc and already part of the proto, where valid tokens are provided for each new bundle and also as part of the response of a get/put/clear. We mentioned that the reply does not have to be waited on synchronously (I even mentioned it), but it complicates the implementation. The idea Thomas and I expressed was that a response is not even necessary if we assume validity of the upfront-provided cache tokens for the lifetime of a bundle, and that cache tokens will be invalidated as soon as the Runner fails in any way. This is naturally the case for Flink because it will simply "forget" its current cache tokens.
>
> I currently envision the following schema:
>
> Runner
> ======
>
> - The Runner generates a globally unique cache token, one for user state and one for each side input
>
> - The token is supplied to the SDK Harness with each bundle request
>
> - For the lifetime of a Runner<=>SDK Harness connection this cache token will not change
> - The Runner will generate a new token if the connection/key space changes between Runner and SDK Harness
>
>
> SDK
> ===
>
> - For each bundle the SDK worker stores the list of valid cache tokens
> - The SDK Harness keeps a global cache across all its (local) workers which is an LRU cache: state_key => (cache_token, value)
> - get: Look up the cache using the valid cache token for the state. If there is no match, fetch from the Runner and use the already available token for caching
> - put: Put the value in the cache with a valid cache token, and add the value to the pending writes which will be flushed out at the latest when the bundle ends
> - clear: same as put, but clear the cache
>
> It does look like this is not too far off from what you were describing. The main difference is that we just work with a single cache token. In my opinion we do not need the second cache token for writes, as long as we ensure that we generate a new cache token if the bundle/checkpoint fails.
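>
> To make this concrete, here is a minimal Python sketch of the SDK-side cache described above. It is purely illustrative: the state channel with get_state/append_state/clear_state methods is a placeholder for the Fn API state channel, not an actual Beam class.
>
> import collections
>
> class SingleTokenStateCache:
>     """Global LRU cache shared by all local workers: state_key -> (cache_token, value)."""
>
>     def __init__(self, state_channel, max_entries=1000):
>         self._channel = state_channel
>         self._cache = collections.OrderedDict()
>         self._max_entries = max_entries
>         self._pending_writes = []  # flushed at the latest when the bundle ends
>
>     def get(self, state_key, cache_token):
>         entry = self._cache.get(state_key)
>         if entry is not None and entry[0] == cache_token:
>             self._cache.move_to_end(state_key)  # LRU bookkeeping
>             return entry[1]
>         # Miss or stale token: fetch from the Runner and cache the value
>         # under the token supplied for this bundle.
>         value = self._channel.get_state(state_key)
>         self._store(state_key, cache_token, value)
>         return value
>
>     def put(self, state_key, cache_token, value):
>         self._store(state_key, cache_token, value)
>         self._pending_writes.append((state_key, value))
>
>     def clear(self, state_key, cache_token):
>         self.put(state_key, cache_token, None)
>
>     def flush(self):
>         # Must complete before the bundle finishes.
>         for state_key, value in self._pending_writes:
>             if value is None:
>                 self._channel.clear_state(state_key)
>             else:
>                 self._channel.append_state(state_key, value)
>         self._pending_writes = []
>
>     def _store(self, state_key, cache_token, value):
>         self._cache[state_key] = (cache_token, value)
>         self._cache.move_to_end(state_key)
>         if len(self._cache) > self._max_entries:
>             self._cache.popitem(last=False)  # evict the least recently used entry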
>
> I have two draft PRs:
> for the Runner: https://github.com/apache/beam/pull/9374
> for the SDK: https://github.com/apache/beam/pull/9418
>
> Note that the Runner PR needs to be updated to fully reflect the above scheme. The SDK implementation is WIP. I want to make sure that we clarify the design before it gets finalized.
>
> Thanks again for all your comments. Much appreciated!
>
> Cheers,
> Max
>
> On 26.08.19 19:58, Lukasz Cwik wrote:
> > There were originally a couple of ideas around how caching could work:
> >
> > 1) One cache token for the entire bundle that is supplied up front. The SDK caches everything using the given token. All reads/clears/appends for all types of state happen under this token. Any time a side input changes, the key processing partition range changes, or a bundle fails to process, the runner chooses a new cache token, effectively invalidating everything in the past.
> >
> > 2) One cache token per type of state that is supplied up front. The SDK caches all requests for a given type using the given cache token. The runner can selectively choose which type to keep and which to invalidate. Bundle failure and key processing partition changes invalidate all user state; a side input change invalidates all side inputs.
> >
> > 3) One cache token per state id that is supplied up front. The SDK caches all requests for the given state id using the given cache token. The runner can selectively choose which to invalidate and which to keep. Bundle failure and key processing partition changes invalidate all user state; side input changes only invalidate the side input that changed.
> >
> > 4) A cache token on each read/clear/append that is supplied on the response of the call, with an initial valid set that is supplied at start. The runner can selectively choose which to keep on start. Bundle failure allows runners to "roll back" to a known good state by selecting the previous valid cache token as part of the initial set. Key processing partition changes allow runners to keep cached state that hasn't changed, since it can be tied to a version number of the state itself as part of the initial set. Side input changes only invalidate the side input that changed.
> >
> > Based upon your discussion you seem to want option #1, which doesn't work well with side inputs clearing cached state. If we want user state to survive a changing side input, we would want one of the other options. I do agree that supplying the cache token upfront is significantly simpler. Currently the protos are set up for #4, since it was the most flexible and at the time the pros outweighed the cons.
> >
> > I don't understand why you think you need to wait for a response to the append/clear to get its cache token, since the only reason you need the cache token is that you want to use that cached data when processing a different bundle. I was thinking that the flow on the SDK side would be something like (assuming there is a global cache of cache token -> (map of state key -> data)):
> > 1) Create a local cache of (map of state key -> data) using the initial set of valid cache tokens
> > 2) Make all mutations in place on the local cache without waiting for a response.
> > 3) When the response comes back, update the global cache with the new cache token -> (map of state key -> data) (this is when the data becomes visible to other bundles that start processing)
> > 4) Before the bundle finishes processing, wait for all outstanding state calls to finish.
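> >
> > A rough Python sketch of those four steps. The append_async call, the cache_token field on its response, and the dofn signature are assumptions for the sketch, not the actual SDK harness API:
> >
> > import concurrent.futures
> >
> > def process_bundle(global_cache, state_channel, initial_tokens, elements, dofn):
> >     # 1) Seed a local cache (state_key -> data) from the initial set of valid cache tokens.
> >     local_cache = {}
> >     for token in initial_tokens:
> >         local_cache.update(global_cache.get(token, {}))
> >
> >     outstanding = []
> >
> >     def write(state_key, data):
> >         # 2) Mutate the local cache in place without waiting for a response.
> >         local_cache[state_key] = data
> >         future = state_channel.append_async(state_key, data)
> >
> >         def on_done(f, key=state_key, value=data):
> >             # 3) When the response comes back, update the global cache under the
> >             # new cache token; only then is the data visible to other bundles.
> >             new_token = f.result().cache_token  # field name assumed for the sketch
> >             global_cache.setdefault(new_token, {})[key] = value
> >
> >         future.add_done_callback(on_done)
> >         outstanding.append(future)
> >
> >     for element in elements:
> >         dofn(element, local_cache, write)
> >
> >     # 4) Before the bundle finishes, wait for all outstanding state calls.
> >     concurrent.futures.wait(outstanding)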
> >
> > To implement caching on the runner side, you would keep track of at most two cache tokens per state key: one cache token represents the initial value when the bundle started, while the second represents the modified state. If the bundle succeeds, the runner passes in the set of tokens which represent the new state; if the bundle fails, you process using the original ones.
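> >
> > Sketched on the runner side, that bookkeeping could look roughly like the following. Illustrative only; it assumes the runner can mint arbitrary opaque tokens and is not Flink runner code:
> >
> > import uuid
> >
> > class RunnerStateTokens:
> >     """Tracks at most two cache tokens per state key: committed and in-flight."""
> >
> >     def __init__(self):
> >         self._tokens = {}  # state_key -> [committed_token, in_flight_token]
> >
> >     def start_bundle(self, state_keys):
> >         # Hand the SDK the committed tokens; reserve a second token per key
> >         # that will represent the state as modified by this bundle.
> >         initial = {}
> >         for key in state_keys:
> >             entry = self._tokens.setdefault(key, [uuid.uuid4().bytes, None])
> >             entry[1] = uuid.uuid4().bytes
> >             initial[key] = entry[0]
> >         return initial
> >
> >     def finish_bundle(self, written_keys, success):
> >         for key in written_keys:
> >             committed, in_flight = self._tokens[key]
> >             if success:
> >                 # The modified state becomes the new committed version.
> >                 self._tokens[key] = [in_flight, None]
> >             else:
> >                 # Failure: keep processing with the original tokens.
> >                 self._tokens[key] = [committed, None]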
> >
> > After thinking through the implementation again, we could supply two cache tokens for each state id: the first being part of the initial set of tokens to use if no writes happen, while the second represents the token to use if the SDK changes the state. This gives us the simplification that we don't need to wait for the response before we update the global cache, making a typical blocking cache much easier to do. We also get the benefit that runners can supply either the same cache token for a state id or different ones. If the runner supplies the same one, then it's telling the SDK to make modifications in place without any rollback (which is good on memory since we are reducing copies of data); if the runner supplies two different ones, then it's telling the SDK to keep the old data around. If we went through with this new option, the SDK-side logic would be (assuming there is a global cache of cache token -> (map of state key -> data)):
> >
> > 1) Create an empty local set of state ids that are dirty when starting a new bundle (the dirty set)
> >
> > For reads/gets:
> > 2A) If the request is a read (get), use the dirty set to choose which cache token to look up in the global cache. If the global cache is missing the data, issue the appropriate request to provide the result.
> >
> > For writes/appends/clears:
> > 2B) If the cache tokens are different for the state id, add the state id to the dirty set if it isn't there already and perform the appropriate modification to convert the old cached state data to the new state data
> > 3B) Modify the global cache's data
> > 4B) Issue the request to the runner
> > 5B*) Add this request to the set of requests to block on before completing the bundle.
> >
> > (* Note: there was another idea to update the process bundle response to contain the id of the last state request, which would let the runner know when it has seen the last state request, allowing the SDK to not block at all when finishing the bundle.)
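> >
> > Putting the 1/2A/2B-5B steps together, a hedged Python sketch. The (clean, dirty) token pair per state id and the state channel methods are assumptions for illustration:
> >
> > class TwoTokenBundleCache:
> >     def __init__(self, global_cache, state_channel, token_pairs):
> >         self._global = global_cache   # cache_token -> {state_key: data}
> >         self._channel = state_channel
> >         self._tokens = token_pairs    # state_id -> (clean_token, dirty_token)
> >         self._dirty = set()           # 1) empty dirty set at bundle start
> >         self._outstanding = []
> >
> >     def _token(self, state_id):
> >         clean, dirty = self._tokens[state_id]
> >         return dirty if state_id in self._dirty else clean
> >
> >     def read(self, state_id, state_key):
> >         # 2A) Use the dirty set to pick the token to look up in the global cache.
> >         bucket = self._global.setdefault(self._token(state_id), {})
> >         if state_key not in bucket:
> >             bucket[state_key] = self._channel.get(state_key)
> >         return bucket[state_key]
> >
> >     def append(self, state_id, state_key, data):
> >         clean, dirty = self._tokens[state_id]
> >         if clean != dirty and state_id not in self._dirty:
> >             # 2B) First write for this state id: keep the old data under the
> >             # clean token and continue under the dirty token.
> >             self._dirty.add(state_id)
> >             self._global.setdefault(dirty, {}).update(self._global.get(clean, {}))
> >         # 3B) Modify the global cache's data.
> >         self._global.setdefault(self._token(state_id), {})[state_key] = data
> >         # 4B) Issue the request to the runner, 5B) remember it so we can block
> >         # on it before completing the bundle.
> >         self._outstanding.append(self._channel.append_async(state_key, data))
> >
> >     def finish_bundle(self):
> >         for future in self._outstanding:
> >             future.result()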
> >
> > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <[email protected]> wrote:
> >
> > Just to give a quick update here. Rakesh, Thomas, and I had a discussion about async writes from the Python SDK to the Runner. Robert was also present for some parts of the discussion.
> >
> > We concluded that blocking writes, with the need to refresh the cache token each time, are not going to provide enough throughput/latency.
> >
> > We figured that it will be enough to use a single cache token per Runner<=>SDK Harness connection. This cache token will be provided by the Runner in the ProcessBundleRequest. Writes will not yield a new cache token. The advantage is that we can use one cache token for the lifetime of the bundle and also across bundles, unless the Runner switches to a new Runner<=>SDK Harness connection; then the Runner would have to generate a new cache token.
> >
> > We might require additional cache tokens for the side inputs. For now, I'm planning to only tackle user state, which seems to be the area where users have expressed the most need for caching.
> >
> > -Max
> >
> > On 21.08.19 20:05, Maximilian Michels wrote:
> > >> There is probably a misunderstanding here: I'm suggesting to use a worker ID instead of cache tokens, not additionally.
> > >
> > > Ah! Misread that. We need a changing token to indicate that the cache is stale, e.g. a checkpoint has failed / we are restoring from an old checkpoint. If the _Runner_ generates a new unique token/id for workers which outlast the Runner, then this should work fine. I don't think it is safe for the worker to supply the id. The Runner should be in control of cache tokens to avoid invalid tokens.
> > >
> > >> In the PR the token is modified as part of updating the state. Doesn't the SDK need the new token to update its cache entry also? That's where it would help the SDK to know the new token upfront.
> > >
> > > If the state is updated in the Runner, a new token has to be generated. The old one is not valid anymore. The SDK will use the updated token to store the new value in the cache. I understand that it would be nice to know the token upfront. That could be possible with some token generation scheme. On the other hand, writes can be asynchronous and thus not block the UDF.
> > >
> > >> But I believe there is no need to change the token in the first place, unless bundles for the same key (ranges) can be processed by different workers.
> > >
> > > That's certainly possible, e.g. two workers A and B take turns processing a certain key range, one bundle after another:
> > >
> > > You process a bundle with token T on worker A, then worker B takes over. Both have an entry with cache token T. So B goes on to modify the state and uses the same cache token T. Then A takes over again. A would have a stale cache entry, but T would still be a valid cache token.
> > >
> > >> Indeed the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.
> > >
> > > Flink does not dynamically repartition key ranges (yet). If it started to support that, we would invalidate the cache tokens for the changed partitions.
> > >
> > >
> > > I'd suggest the following cache token generation scheme:
> > >
> > > One cache token per key range for user state and one cache token for each side input. On writes to user state or changes to a side input, the associated cache token will be renewed.
> > >
> > > On the SDK side, it should be sufficient to let the SDK re-associate all its cached data belonging to a valid cache token with the new cache token returned by a successful write. This has to happen in the active scope (i.e. user state, or a particular side input).
> > >
> > > If the key range changes, new cache tokens have to be generated. This should happen automatically because the Runner does not checkpoint cache tokens and will generate new ones when it restarts from an earlier checkpoint.
> > >
> > > The current PR needs to be changed to (1) only keep a single cache token per user state and key range, and (2) add support for cache tokens for each side input.
> > >
> > > Hope that makes sense.
> > >
> > > -Max
> > >
> > > On 21.08.19 17:27, Reuven Lax wrote:
> > >>
> > >>
> > >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <[email protected]> wrote:
> > >>
> > >> Appreciate all your comments! Replying below.
> > >>
> > >>
> > >> @Luke:
> > >>
> > >> > Having cache tokens per key would be very expensive indeed and I believe we should go with a single cache token "per" bundle.
> > >>
> > >> Thanks for your comments on the PR. I was thinking of proposing something along the lines of having cache tokens valid for a particular checkpointing "epoch". That would require even less token renewal than the per-bundle approach.
> > >>
> > >>
> > >> @Thomas, thanks for the input. Some remarks:
> > >>
> > >> > Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?
> > >>
> > >> We do not need a unique id per worker. If a cache token is valid for a particular worker, it is also valid for another worker. That is with the assumption that key ranges are always disjoint between the workers.
> > >>
> > >> > * When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state)
> > >>
> > >> This is simply done by not transferring the particular cache token. No need to declare it invalid explicitly.
> > >>
> > >> > * When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
> > >>
> > >> Mutations of cached values can be freely done as long as the cache token associated with the state is valid for a particular bundle. Only the first time does the Runner need to wait on the response to store the cache token. This can also be done asynchronously.
> > >>
> > >> > * When the bundle is finished, the runner records the last writer (only if a change occurred)
> > >>
> > >> I believe this is not necessary because there will only be one writer at a time for a particular bundle and key range; hence only one writer holds a valid cache token for a particular state and key range.
> > >>
> > >>
> > >> @Reuven:
> > >>
> > >> > Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.
> > >>
> > >> State is always processed partitioned by the Flink workers (hash-based, not lexicographical). I don't think that matters, though, because the key ranges do not overlap between the workers. Flink does not support dynamically repartitioning the key ranges. Even in the case of fine-grained recovery of workers and their key ranges, we would simply generate new cache tokens for a particular worker.
> > >>
> > >>
> > >> Dataflow's ranges are also hash-based. When I said lexicographical, I meant lexicographical based on the hexadecimal hash value.
> > >>
> > >> Indeed, the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.
> > >>
> > >>
> > >>
> > >> Thanks,
> > >> Max
> > >>
> > >> On 21.08.19 09:33, Reuven Lax wrote:
> > >> > Dataflow does something like this; however, since work is load-balanced across workers, a per-worker id doesn't work very well. Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.
> > >> >
> > >> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <[email protected]> wrote:
> > >> >
> > >> > Commenting here vs. on the PR since it's related to the overall approach.
> > >> >
> > >> > Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?
> > >> >
> > >> > * When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state)
> > >> > * When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
> > >> > * When the bundle is finished, the runner records the last writer (only if a change occurred)
> > >> >
> > >> > Whenever the current worker ID and the last writer ID don't match, the cache is invalid.
> > >> >
> > >> > Thomas
> > >> >
> > >> >
> > >> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <[email protected]> wrote:
> > >> >
> > >> > Having cache tokens per key would be very expensive indeed, and I believe we should go with a single cache token "per" bundle.
> > >> >
> > >> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <[email protected]> wrote:
> > >> >
> > >> > Maybe a Beam Python expert can chime in on Rakesh's question?
> > >> >
> > >> > Luke, I was assuming cache tokens to be per key and state id. While implementing initial support on the Runner side, I realized that we probably want cache tokens to only be per state id. Note that if we had per-key cache tokens, the number of cache tokens would approach the total number of keys in an application.
> > >> >
> > >> > If anyone wants to have a look, here is a first version of the Runner side for cache tokens. Note that I only implemented cache tokens for BagUserState for now, but they can easily be added for side inputs as well.
> > >> >
> > >> > https://github.com/apache/beam/pull/9374
> > >> >
> > >> > -Max
> > >> >
> > >> >
> > >>
> >
>