Your summary below makes sense to me. I can see that recovery from
rolling back doesn't need to be a priority and simplifies the solution
for user state caching down to one token.

Providing cache tokens upfront does require the Runner to know what
"version" of everything it may supply to the SDK upfront (instead of on
request), which would mean that the Runner may need to have a mapping
from cache token to internal version identifier for things like side
inputs, which are typically broadcast. The Runner would also need to
poll in the background to see whether the side input has changed, so as
not to block processing bundles with "stale" side input data.
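For illustration, that runner-side bookkeeping for side inputs could be
as small as the sketch below (purely illustrative; the class and method
names are made up and this is not actual Beam code):

import uuid

# Illustrative sketch only; not the actual Runner implementation.
class SideInputCacheTokens:
    """Tracks which internal side input version a cache token refers to."""

    def __init__(self):
        self.token_to_version = {}   # cache_token -> side input version id
        self.current_token = None

    def refresh(self, latest_version):
        """Called from a background poll; issues a new token when the side
        input version changed so that bundles never see stale data."""
        if (self.current_token is None
                or self.token_to_version[self.current_token] != latest_version):
            self.current_token = uuid.uuid4().bytes
            self.token_to_version[self.current_token] = latest_version
        return self.current_token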
Ping me once you have the Runner PR updated and I'll take a look
again.
On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
Thank you for the summary Luke. I really appreciate the effort you put
into this!
> Based upon your discussion you seem to want option #1
I'm actually for option #2. The option to cache/invalidate side inputs
is important, and we should incorporate this in the design. That's why
option #1 is not flexible enough. However, a first implementation could
defer caching of side inputs.

Option #3 was my initial thinking and the first version of the PR, but
I think we agreed that there wouldn't be much gain from keeping a cache
token per state id.

Option #4 is what is specifically documented in the reference doc and
already part of the Proto, where valid tokens are provided for each new
bundle and also as part of the response of a get/put/clear. We
mentioned that the reply does not have to be waited on synchronously (I
even mentioned it), but it complicates the implementation. The idea
Thomas and I expressed was that a response is not even necessary if we
assume validity of the upfront-provided cache tokens for the lifetime
of a bundle and that cache tokens will be invalidated as soon as the
Runner fails in any way. This is naturally the case for Flink because
it will simply "forget" its current cache tokens.
I currently envision the following scheme:
Runner
======
- Runner generates a globally unique cache token, one for user state
  and one for each side input
- The token is supplied to the SDK Harness for each bundle request
- For the lifetime of a Runner<=>SDK Harness connection this cache
  token will not change
- Runner will generate a new token if the connection/key space changes
  between Runner and SDK Harness
SDK
===
- For each bundle the SDK worker stores the list of valid cache tokens
- The SDK Harness keeps a global cache across all its (local) workers,
  which is an LRU cache: state_key => (cache_token, value)
- get: Look up the cache using the valid cache token for the state. If
  there is no match, fetch from the Runner and cache the result under
  the already available token
- put: Put the value into the cache under a valid cache token and add
  the value to the pending writes, which will be flushed out at the
  latest when the bundle ends
- clear: same as put, but clear the cache
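To make the SDK part concrete, here is a rough sketch of how such a
cache could look (illustrative only; class and method names are
invented, and eviction and pending-write flushing are simplified):

import collections

# Sketch only: a global LRU cache keyed by (cache_token, state_key).
# Names are illustrative, not the actual SDK Harness code.
class StateCache:
    def __init__(self, max_entries=10000):
        self._cache = collections.OrderedDict()  # (token, state_key) -> value
        self._max_entries = max_entries

    def get(self, token, state_key, fetch_from_runner):
        key = (token, state_key)
        if key in self._cache:
            self._cache.move_to_end(key)          # LRU bookkeeping
            return self._cache[key]
        value = fetch_from_runner(state_key)      # miss: ask the Runner
        self.put(token, state_key, value)
        return value

    def put(self, token, state_key, value):
        self._cache[(token, state_key)] = value
        self._cache.move_to_end((token, state_key))
        if len(self._cache) > self._max_entries:
            self._cache.popitem(last=False)       # evict least recently used

    def clear(self, token, state_key):
        self.put(token, state_key, [])            # cleared bag state

Pending writes to the Runner would be collected separately and flushed
at the latest when the bundle finishes, as described above.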
It does look like this is not too far off from what you were
describing. The main difference is that we just work with a single
cache token. In my opinion we do not need the second cache token for
writes, as long as we ensure that we generate a new cache token if the
bundle/checkpoint fails.
I have draft PRs,
for the Runner: https://github.com/apache/beam/pull/9374
for the SDK: https://github.com/apache/beam/pull/9418

Note that the Runner PR needs to be updated to fully reflect the above
scheme. The SDK implementation is WIP. I want to make sure that we
clarify the design before this gets finalized.
Thanks again for all your comments. Much appreciated!
Cheers,
Max
On 26.08.19 19:58, Lukasz Cwik wrote:
> There were originally a couple of ideas around how caching could work:
>
> 1) One cache token for the entire bundle that is supplied up front.
> The SDK caches everything using the given token. All
> reads/clear/append for all types of state happen under this token.
> Anytime a side input changes, the key processing partition range
> changes, or a bundle fails to process, the runner chooses a new cache
> token, effectively invalidating everything in the past.
>
> 2) One cache token per type of state that is supplied up front. The
> SDK caches all requests for a given type using the given cache token.
> The runner can selectively choose which type to keep and which to
> invalidate. Bundle failure and key processing partition changes
> invalidate all user state; a side input change invalidates all side
> inputs.
>
> 3) One cache token per state id that is supplied up front. The SDK
> caches all requests for the given state id using the given cache
> token. The runner can selectively choose which to invalidate and
> which to keep. Bundle failure and key processing partition changes
> invalidate all user state; side input changes only invalidate the
> side input that changed.
>
> 4) A cache token on each read/clear/append that is supplied on the
> response of the call, with an initial valid set that is supplied at
> start. The runner can selectively choose which to keep on start.
> Bundle failure allows runners to "roll back" to a known good state by
> selecting the previous valid cache token as part of the initial set.
> Key processing partition changes allow runners to keep cached state
> that hasn't changed, since it can be tied to a version number of the
> state itself as part of the initial set. Side input changes only
> invalidate the side input that changed.
>
> Based upon your discussion you seem to want option #1, which doesn't
> work well with side inputs clearing cached state. If we want to have
> user state survive a changing side input, we would want one of the
> other options. I do agree that supplying the cache token upfront is
> significantly simpler. Currently the protos are set up for #4 since
> it was the most flexible and at the time the pros outweighed the cons.
>
> I don't understand why you think you need to wait for a response for
> the append/clear to get its cache token, since the only reason you
> need the cache token is that you want to use that cached data when
> processing a different bundle. I was thinking that the flow on the
> SDK side would be something like this (assuming there is a global
> cache of cache token -> (map of state key -> data)):
>
> 1) Create a local cache of (map of state key -> data) using the
> initial set of valid cache tokens
> 2) Make all mutations in place on the local cache without waiting for
> a response.
> 3) When the response comes back, update the global cache with the new
> cache token -> (map of state key -> data) (this is when the data
> becomes visible to other bundles that start processing)
> 4) Before the bundle finishes processing, wait for all outstanding
> state calls to finish.
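For illustration, that flow might translate into something like the
following sketch (not the actual SDK code; the global cache, the async
state client, and all names here are assumptions):

# Sketch of the option #4 flow, assuming a global cache of
# cache_token -> {state_key: data} and an async state client.
def process_bundle(initial_tokens, global_cache, state_client, elements, dofn):
    # 1) Seed a local cache from the globally cached data for the
    #    initial set of valid cache tokens.
    local_cache = {}
    for token in initial_tokens:
        local_cache.update(global_cache.get(token, {}))

    pending = []  # outstanding append/clear futures

    def write(state_key, data):
        # 2) Mutate the local cache in place, no waiting.
        local_cache[state_key] = data
        future = state_client.append(state_key, data)  # async call
        # 3) When the response (with its new cache token) arrives,
        #    publish the data to the global cache for other bundles.
        future.add_done_callback(
            lambda f: global_cache.setdefault(f.result().cache_token, {})
                                  .update({state_key: data}))
        pending.append(future)

    for element in elements:
        dofn.process(element, state_write=write)

    # 4) Block on all outstanding state calls before finishing the bundle.
    for future in pending:
        future.result()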
>
> To implement caching on the runner side, you would keep track of at
> most 2 cache tokens per state key; one cache token represents the
> initial value when the bundle started, while the second represents
> the modified state. If the bundle succeeds, the runner passes in the
> set of tokens which represent the new state; if the bundle fails, you
> process using the original ones.
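A rough sketch of that runner-side bookkeeping, just to illustrate the
idea (names and structure are made up, not tied to any real runner):

# Sketch: at most two tokens per state key on the runner side.
class RunnerStateTokens:
    def __init__(self):
        self.committed = {}  # state_key -> token of the last known-good value
        self.modified = {}   # state_key -> token of the value written this bundle

    def on_write(self, state_key, new_token):
        self.modified[state_key] = new_token

    def on_bundle_finished(self, succeeded):
        if succeeded:
            # The modified tokens become the new known-good state.
            self.committed.update(self.modified)
        # On failure, drop the modified tokens and keep processing with
        # the original (committed) ones.
        self.modified.clear()

    def initial_tokens(self):
        # Tokens handed to the SDK at bundle start.
        return set(self.committed.values())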
>
> After thinking through the implementation again, we could supply two
> cache tokens for each state id, the first being the set of initial
> tokens if no writes happen, while the second represents the token to
> use if the SDK changes the state. This gives us the simplification
> where we don't need to wait for the response before we update the
> global cache, making a typical blocking cache much easier to do. We
> also get the benefit that runners can supply either the same cache
> token for a state id or different ones. If the runner supplies the
> same one, then it's telling the SDK to make modifications in place
> without any rollback (which is good on memory since we are reducing
> copies of stuff); if the runner supplies two different ones, then
> it's telling the SDK to keep the old data around. If we went through
> with this new option, the SDK side logic would be (assuming there is
> a global cache of cache token -> (map of state key -> data)):
>
> 1) Create an empty local set of state ids that are dirty when
> starting a new bundle (dirty set)
>
> For reads/gets:
> 2A) If the request is a read (get), use the dirty set to choose which
> cache token to look up and use in the global cache. If the global
> cache is missing the data, issue the appropriate request to provide
> the result.
>
> For writes/appends/clears:
> 2B) If the cache tokens are different for the state id, add the state
> id to the dirty set if it isn't there and perform the appropriate
> modification to convert the old cached state data to the new state
> data
> 3B) Modify the global cache's data
> 4B) Issue the request to the runner
> 5B*) Add this request to the set of requests to block on before
> completing the bundle.
>
> (* Note, there was another idea to update the process bundle response
> to contain the id of the last state request, which would allow the
> runner to know when it has seen the last state request, allowing the
> SDK to not block at all when finishing the bundle)
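Putting 2A-5B together, a possible sketch (purely illustrative; the
read/write token pair, the dict-based global cache, and the state
client are assumptions, not the real API):

# Sketch of the dirty-set variant: two tokens per state id, supplied up front.
def lookup(state_id, key, tokens, dirty, global_cache, state_client):
    # 2A) Dirty state ids read under their write token, clean ones under
    #     the read token.
    token = tokens[state_id].write_token if state_id in dirty \
        else tokens[state_id].read_token
    cached = global_cache.get((token, state_id, key))
    if cached is None:
        cached = state_client.get(state_id, key)     # fetch from the runner
        global_cache[(token, state_id, key)] = cached
    return cached

def append(state_id, key, data, tokens, dirty, global_cache, state_client, pending):
    read_token = tokens[state_id].read_token
    write_token = tokens[state_id].write_token
    if read_token != write_token and state_id not in dirty:
        # 2B) First write with distinct tokens: keep the old data around
        #     by copying it under the write token before modifying it.
        dirty.add(state_id)
        old = global_cache.get((read_token, state_id, key), [])
        global_cache[(write_token, state_id, key)] = list(old)
    # 3B) Modify the globally cached data.
    global_cache.setdefault((write_token, state_id, key), []).append(data)
    # 4B/5B) Issue the request to the runner and remember it so the
    #        bundle can block on it before completing.
    pending.append(state_client.append(state_id, key, data))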
>
> On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <m...@apache.org> wrote:
>
> Just to give a quick update here. Rakesh, Thomas, and I had a
> discussion about async writes from the Python SDK to the Runner.
> Robert was also present for some parts of the discussion.
>
> We concluded that blocking writes with the need to refresh the cache
> token each time are not going to provide enough throughput/latency.
>
> We figured that it will be enough to use a single cache token per
> Runner<=>SDK Harness connection. This cache token will be provided by
> the Runner in the ProcessBundleRequest. Writes will not yield a new
> cache token. The advantage is that we can use one cache token for the
> lifetime of the bundle and also across bundles, unless the Runner
> switches to a new Runner<=>SDK Harness connection; then the Runner
> would have to generate a new cache token.
>
> We might require additional cache tokens for the side inputs. For
> now, I'm planning to only tackle user state, which seems to be the
> area where users have expressed the most need for caching.
>
> -Max
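As a rough sketch of the single-token-per-connection idea described
above (illustrative only; the class, method, and field names are made
up and do not reflect the actual Fn API protos or Flink runner code):

import uuid

# Sketch: one cache token per Runner<=>SDK Harness connection, reused
# for every bundle on that connection and regenerated when the
# connection (or key space) changes.
class ConnectionCacheToken:
    def __init__(self):
        self.token = uuid.uuid4().bytes

    def on_new_connection(self):
        # A new connection (or a restore from checkpoint) gets a fresh
        # token, implicitly invalidating everything cached before.
        self.token = uuid.uuid4().bytes

    def stamp(self, process_bundle_request):
        # Attach the token to the outgoing bundle request; the field
        # name here is made up, not the real proto field.
        process_bundle_request.cache_token = self.token
        return process_bundle_request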
>
> On 21.08.19 20:05, Maximilian Michels wrote:
> >> There is probably a misunderstanding here: I'm suggesting to use a
> >> worker ID instead of cache tokens, not additionally.
> >
> > Ah! Misread that. We need a changing token to indicate that the
> > cache is stale, e.g. checkpoint has failed / restoring from an old
> > checkpoint. If the _Runner_ generates a new unique token/id for
> > workers which outlast the Runner, then this should work fine. I
> > don't think it is safe for the worker to supply the id. The Runner
> > should be in control of cache tokens to avoid invalid tokens.
> >
> >> In the PR the token is modified as part of updating the state.
> >> Doesn't the SDK need the new token to update its cache entry also?
> >> That's where it would help the SDK to know the new token upfront.
> >
> > If the state is updated in the Runner, a new token has to be
> > generated. The old one is not valid anymore. The SDK will use the
> > updated token to store the new value in the cache. I understand
> > that it would be nice to know the token upfront. That could be
> > possible with some token generation scheme. On the other hand,
> > writes can be asynchronous and thus not block the UDF.
> >
> >> But I believe there is no need to change the token in the first
> >> place, unless bundles for the same key (ranges) can be processed
> >> by different workers.
> >
> > That's certainly possible, e.g. two workers A and B take turns
> > processing a certain key range, one bundle after another:
> >
> > You process a bundle with a token T with A, then worker B takes
> > over. Both have an entry with cache token T. So B goes on to modify
> > the state and uses the same cache token T. Then A takes over again.
> > A would have a stale cache entry but T would still be a valid cache
> > token.
> >
> >> Indeed the fact that Dataflow can dynamically split and merge
> >> these ranges is what makes it trickier. If Flink does not
> >> repartition the ranges, then things are much easier.
> >
> > Flink does not dynamically repartition key ranges (yet). If it
> > started to support that, we would invalidate the cache tokens for
> > the changed partitions.
> >
> >
> > I'd suggest the following cache token generation scheme:
> >
> > One cache token per key range for user state and one cache token
> > for each side input. On writes to user state or a changing side
> > input, the associated cache token will be renewed.
> >
> > On the SDK side, it should be sufficient to let the SDK
> > re-associate all its cached data belonging to a valid cache token
> > with a new cache token returned by a successful write. This has to
> > happen in the active scope (i.e. user state, or a particular side
> > input).
> >
> > If the key range changes, new cache tokens have to be generated.
> > This should happen automatically because the Runner does not
> > checkpoint cache tokens and will generate new ones when it restarts
> > from an earlier checkpoint.
> >
> > The current PR needs to be changed to (1) only keep a single cache
> > token per user state and key range and (2) add support for cache
> > tokens for each side input.
> >
> > Hope that makes sense.
> >
> > -Max
> >
> > On 21.08.19 17:27, Reuven Lax wrote:
> >>
> >>
> >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
> >>
> >> Appreciate all your comments! Replying below.
> >>
> >>
> >> @Luke:
> >>
> >> > Having cache tokens per key would be very expensive indeed and I
> >> > believe we should go with a single cache token "per" bundle.
> >>
> >> Thanks for your comments on the PR. I was thinking to propose
> >> something along these lines: having cache tokens valid for a
> >> particular checkpointing "epoch". That would require even less
> >> token renewal than the per-bundle approach.
> >>
> >>
> >> @Thomas, thanks for the input. Some remarks:
> >>
> >> > Wouldn't it be simpler to have the runner just track a unique ID
> >> > for each worker and use that to communicate if the cache is
> >> > valid or not?
> >>
> >> We do not need a unique id per worker. If a cache token is valid
> >> for a particular worker, it is also valid for another worker. That
> >> is with the assumption that key ranges are always disjoint between
> >> the workers.
> >>
> >> > * When the bundle is started, the runner tells the worker if the
> >> > cache has become invalid (since it knows if another worker has
> >> > mutated state)
> >>
> >> This is simply done by not transferring the particular cache
> >> token. No need to declare it invalid explicitly.
> >>
> >> > * When the worker sends mutation requests to the runner, it
> >> > includes its own ID (or the runner already has it as contextual
> >> > information). No need to wait for a response.
> >>
> >> Mutations of cached values can be freely done as long as the cache
> >> token associated with the state is valid for a particular bundle.
> >> Only the first time, the Runner needs to wait on the response to
> >> store the cache token. This can also be done asynchronously.
> >>
> >> > * When the bundle is finished, the runner records the last
> >> > writer (only if a change occurred)
> >>
> >> I believe this is not necessary because there will only be one
> >> writer at a time for a particular bundle and key range, hence only
> >> one writer holds a valid cache token for a particular state and
> >> key range.
> >>
> >>
> >> @Reuven:
> >>
> >> > Dataflow divides the keyspace up into lexicographic ranges, and
> >> > creates a cache token per range.
> >>
> >> State is always processed partitioned by the Flink workers
> >> (hash-based, not lexicographical). I don't think that matters
> >> though, because the key ranges do not overlap between the workers.
> >> Flink does not support dynamically repartitioning the key ranges.
> >> Even in case of fine-grained recovery of workers and their key
> >> ranges, we would simply generate new cache tokens for a particular
> >> worker.
> >>
> >>
> >> Dataflow's ranges are also hash based. When I said
> >> lexicographical, I meant lexicographical based on the hexadecimal
> >> hash value.
> >>
> >> Indeed the fact that Dataflow can dynamically split and merge
> >> these ranges is what makes it trickier. If Flink does not
> >> repartition the ranges, then things are much easier.
> >>
> >>
> >>
> >> Thanks,
> >> Max
> >>
> >> On 21.08.19 09:33, Reuven Lax wrote:
> >> > Dataflow does something like this, however since work is load
> >> > balanced across workers a per-worker id doesn't work very well.
> >> > Dataflow divides the keyspace up into lexicographic ranges, and
> >> > creates a cache token per range.
> >> >
> >> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
> >> >
> >> > Commenting here vs. on the PR since it is related to the overall
> >> > approach.
> >> >
> >> > Wouldn't it be simpler to have the runner just track a unique ID
> >> > for each worker and use that to communicate if the cache is
> >> > valid or not?
> >> >
> >> > * When the bundle is started, the runner tells the worker if the
> >> > cache has become invalid (since it knows if another worker has
> >> > mutated state)
> >> > * When the worker sends mutation requests to the runner, it
> >> > includes its own ID (or the runner already has it as contextual
> >> > information). No need to wait for a response.
> >> > * When the bundle is finished, the runner records the last
> >> > writer (only if a change occurred)
> >> >
> >> > Whenever the current worker ID and the last writer ID don't
> >> > match, the cache is invalid.
> >> >
> >> > Thomas
> >> >
> >> >
> >> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >
> >> > Having cache tokens per key would be very expensive indeed and I
> >> > believe we should go with a single cache token "per" bundle.
> >> >
> >> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org> wrote:
> >> >
> >> > Maybe a Beam Python expert can chime in for Rakesh's question?
> >> >
> >> > Luke, I was assuming cache tokens to be per key and state id.
> >> > While implementing initial support on the Runner side, I
> >> > realized that we probably want cache tokens to only be per state
> >> > id. Note that if we had per-key cache tokens, the number of
> >> > cache tokens would approach the total number of keys in an
> >> > application.
> >> >
> >> > If anyone wants to have a look, here is a first version of the
> >> > Runner side for cache tokens. Note that I only implemented cache
> >> > tokens for BagUserState for now, but it can be easily added for
> >> > side inputs as well.
> >> >
> >> > https://github.com/apache/beam/pull/9374
> >> >
> >> > -Max
> >> >
> >> >
> >>
>