> There is probably a misunderstanding here: I'm suggesting to use a
worker ID instead of cache tokens, not additionally.

Ah! Misread that. We need a changing token to indicate that the cache is
stale, e.g. when a checkpoint has failed or we are restoring from an old
checkpoint. If the _Runner_ generates a new unique token/id for workers
that outlast the Runner, then this should work fine. I don't think it is
safe for the worker to supply the id. The Runner should be in control of
the cache tokens to avoid invalid tokens.
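
For illustration, Runner-side token generation could be as simple as the
following sketch (the class and method names are made up, not from the
PR; a token is just opaque random bytes):

import java.nio.ByteBuffer;
import java.util.UUID;

final class CacheTokenGenerator {

  // The Runner is the only party that mints tokens; here a token is a
  // random UUID encoded as 16 bytes.
  static byte[] newToken() {
    UUID uuid = UUID.randomUUID();
    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.putLong(uuid.getMostSignificantBits());
    buf.putLong(uuid.getLeastSignificantBits());
    return buf.array();
  }
}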

> In the PR the token is modified as part of updating the state. Doesn't
the SDK need the new token to update its cache entry also? That's where
it would help the SDK to know the new token upfront.

If the state is updated in the Runner, a new token has to be generated.
The old one is not valid anymore. The SDK will use the updated token to
store the new value in the cache. I understand that it would be nice to
know the token upfront. That could be possible with some token
generation scheme. On the other hand, writes can be asynchronous and
thus need not block the UDF.
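
As a rough sketch of that write path (the StateClient interface and the
other names below are hypothetical, not the actual Fn API classes): the
SDK updates its local value immediately, sends the write to the Runner,
and picks up the renewed token whenever the response arrives, so the UDF
never blocks on the acknowledgement.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class CachingStateWriter {

  // Hypothetical client: the write response carries the renewed cache token.
  interface StateClient {
    CompletableFuture<byte[]> write(String stateId, byte[] value);
  }

  private final StateClient client;
  private final Map<String, byte[]> cachedValues = new ConcurrentHashMap<>();
  private final Map<String, byte[]> cacheTokens = new ConcurrentHashMap<>();

  CachingStateWriter(StateClient client) {
    this.client = client;
  }

  // The UDF continues immediately; the new token is stored once the
  // Runner replies.
  void writeAsync(String stateId, byte[] value) {
    cachedValues.put(stateId, value);
    client.write(stateId, value)
        .thenAccept(newToken -> cacheTokens.put(stateId, newToken));
  }
}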

> But I believe there is no need to change the token in the first place,
unless bundles for the same key (ranges) can be processed by different
workers.

That's certainly possible, e.g. two workers A and B take turns
processing a certain key range, one bundle after another:

Worker A processes a bundle with cache token T, then worker B takes
over. Both have a cache entry under token T. B goes on to modify the
state and keeps using the same token T. Then A takes over again. A now
has a stale cache entry, but T would still be a valid cache token.

> Indeed the fact that Dataflow can dynamically split and merge these
ranges is what makes it trickier. If Flink does not repartition the
ranges, then things are much easier.

Flink does not dynamically repartition key ranges (yet). If it started
to support that, we would invalidate the cache tokens for the changed
partitions.


I'd suggest the following cache token generation scheme:

One cache token per key range for user state and one cache token for
each side input. On a write to user state or a change to a side input,
the associated cache token is renewed.
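
A rough sketch of the Runner-side bookkeeping this implies (names are
illustrative only): one token per (key range, user state) scope and one
per side input, each replaced whenever the scope is written to.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class RunnerCacheTokens {

  // Scope = (key range, user state id) or a side input id.
  private final Map<String, String> tokenByScope = new ConcurrentHashMap<>();

  // Token handed to the SDK when a bundle starts; created lazily.
  String currentToken(String scope) {
    return tokenByScope.computeIfAbsent(scope, s -> UUID.randomUUID().toString());
  }

  // Called on every write to user state or change of a side input: the
  // old token becomes invalid, a fresh one is returned to the SDK.
  String renewToken(String scope) {
    String fresh = UUID.randomUUID().toString();
    tokenByScope.put(scope, fresh);
    return fresh;
  }

  static String userStateScope(String keyRange, String stateId) {
    return "user-state/" + keyRange + "/" + stateId;
  }

  static String sideInputScope(String sideInputId) {
    return "side-input/" + sideInputId;
  }
}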

On the SDK side, it should be sufficient to let the SDK re-associate all
of its cached data belonging to a valid cache token with the new cache
token returned by a successful write. This has to happen within the
active scope (i.e. a user state or a particular side input).
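
What "re-associate" could look like on the SDK side, again only as an
illustrative sketch (the cache type below is made up): cached entries
are keyed by token, and a successful write simply moves them from the
old token to the new one.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ScopedStateCache {

  // Cached values for one scope, keyed first by cache token.
  private final Map<String, Map<String, byte[]>> entriesByToken =
      new ConcurrentHashMap<>();

  byte[] get(String token, String key) {
    Map<String, byte[]> entries = entriesByToken.get(token);
    return entries == null ? null : entries.get(key);
  }

  void put(String token, String key, byte[] value) {
    entriesByToken.computeIfAbsent(token, t -> new ConcurrentHashMap<>()).put(key, value);
  }

  // After a successful write returns a new token, keep the data but move
  // it under the new token; entries under the old token are no longer
  // reachable and the old token is no longer valid.
  void reassociate(String oldToken, String newToken) {
    Map<String, byte[]> entries = entriesByToken.remove(oldToken);
    if (entries != null) {
      entriesByToken.put(newToken, entries);
    }
  }
}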

If the key range changes, new cache tokens have to be generated. This
should happen automatically because the Runner does not checkpoint cache
tokens and will generate new ones when it restarts from an earlier
checkpoint.

The current PR needs to be changed to (1) only keep a single cache token
per user state and key range and (2) add support for cache tokens for
each side input.

Hope that makes sense.

-Max

On 21.08.19 17:27, Reuven Lax wrote:
> 
> 
> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
> 
>     Appreciate all your comments! Replying below.
> 
> 
>     @Luke:
> 
>     > Having cache tokens per key would be very expensive indeed and I
>     believe we should go with a single cache token "per" bundle.
> 
>     Thanks for your comments on the PR. I was thinking to propose something
>     along the lines of having cache tokens valid for a particular
>     checkpointing "epoch". That would require even less token renewal than
>     the per-bundle approach.
> 
> 
>     @Thomas, thanks for the input. Some remarks:
> 
>     > Wouldn't it be simpler to have the runner just track a unique ID
>     for each worker and use that to communicate if the cache is valid or
>     not?
> 
>     We do not need a unique id per worker. If a cache token is valid for a
>     particular worker, it is also valid for another worker. That is with the
>     assumption that key ranges are always disjoint between the workers.
> 
>     > * When the bundle is started, the runner tells the worker if the
>     cache has become invalid (since it knows if another worker has
>     mutated state)
> 
>     This is simply done by not transferring the particular cache token. No
>     need to declare it invalid explicitly.
> 
>     > * When the worker sends mutation requests to the runner, it
>     includes its own ID (or the runner already has it as contextual
>     information). No need to wait for a response.
> 
>     Mutations of cached values can be freely done as long as the cache token
>     associated with the state is valid for a particular bundle. Only the
>     first time, the Runner needs to wait on the response to store the cache
>     token. This can also be done asynchronously.
> 
>     > * When the bundle is finished, the runner records the last writer
>     (only if a change occurred)
> 
>     I believe this is not necessary because there will only be one writer at
>     a time for a particular bundle and key range, hence only one writer
>     holds a valid cache token for a particular state and key range.
> 
> 
>     @Reuven:
> 
>     >  Dataflow divides the keyspace up into lexicographic ranges, and
>     creates a cache token per range.
> 
>     State is always processed partitioned by the Flink workers (hash-based,
>     not lexicographical). I don't think that matters though because the key
>     ranges do not overlap between the workers. Flink does not support
>     dynamically repartitioning the key ranges. Even in case of fine-grained
>     recovery of workers and their key ranges, we would simply generate new
>     cache tokens for a particular worker.
> 
> 
> Dataflow's ranges are also hash based. When I said lexicographical, I
> meant lexicographical based on the hexadecimal hash value.
> 
> Indeed the fact that Dataflow can dynamically split and merge these
> ranges is what makes it trickier. If Flink does not repartition the
> ranges, then things are much easier.
> 
> 
> 
>     Thanks,
>     Max
> 
>     On 21.08.19 09:33, Reuven Lax wrote:
>     > Dataflow does something like this, however since work is
>     > load balanced across workers a per-worker id doesn't work very well.
>     > Dataflow divides the keyspace up into lexicographic ranges, and
>     creates
>     > a cache token per range. 
>     >
>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
>     >
>     >     Commenting here vs. on the PR since related to the overall
>     approach.
>     >
>     >     Wouldn't it be simpler to have the runner just track a unique
>     ID for
>     >     each worker and use that to communicate if the cache is valid
>     or not?
>     >
>     >     * When the bundle is started, the runner tells the worker if the
>     >     cache has become invalid (since it knows if another worker has
>     >     mutated state)
>     >     * When the worker sends mutation requests to the runner, it
>     includes
>     >     its own ID (or the runner already has it as contextual
>     information).
>     >     No need to wait for a response.
>     >     * When the bundle is finished, the runner records the last writer
>     >     (only if a change occurred)
>     >
>     >     Whenever the current worker ID and last writer ID don't match, the cache
>     >     is invalid.
>     >
>     >     Thomas
>     >
>     >
>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
>     >
>     >         Having cache tokens per key would be very expensive indeed
>     and I
>     >         believe we should go with a single cache token "per" bundle.
>     >
>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels
>     >         <m...@apache.org> wrote:
>     >
>     >             Maybe a Beam Python expert can chime in for Rakesh's
>     question?
>     >
>     >             Luke, I was assuming cache tokens to be per key and state
>     >             id. During
>     >             implementing an initial support on the Runner side, I
>     >             realized that we
>     >             probably want cache tokens to only be per state id. Note
>     >             that if we had
>     >             per-key cache tokens, the number of cache tokens would
>     >             approach the
>     >             total number of keys in an application.
>     >
>     >             If anyone wants to have a look, here is a first version of
>     >             the Runner
>     >             side for cache tokens. Note that I only implemented cache
>     >             tokens for
>     >             BagUserState for now, but it can be easily added for side
>     >             inputs as well.
>     >
>     >             https://github.com/apache/beam/pull/9374
>     >
>     >             -Max
>     >
>     >
> 
