On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:

> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I was thinking of proposing something
> along these lines: having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
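
To make the "epoch" idea a bit more concrete, here is a minimal
runner-side sketch (all names, e.g. EpochScopedTokens, are hypothetical
and not taken from the PR):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: one cache token per state id, renewed only when a new
// checkpoint epoch starts rather than on every bundle.
class EpochScopedTokens {
  private final Map<String, String> tokenByStateId = new ConcurrentHashMap<>();

  // Called once per completed checkpoint; previously handed-out tokens
  // become invalid simply by never being sent to a worker again.
  void onCheckpointComplete() {
    tokenByStateId.replaceAll((stateId, oldToken) -> UUID.randomUUID().toString());
  }

  // Called when a bundle is started; the returned token is attached to
  // the bundle request so the SDK worker can keep reusing its cache.
  String tokenFor(String stateId) {
    return tokenByStateId.computeIfAbsent(stateId, id -> UUID.randomUUID().toString());
  }
}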
>
>
> @Thomas, thanks for the input. Some remarks:
>
> > Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
>
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for any other worker, under the
> assumption that key ranges are always disjoint between the workers.
>
> > * When the bundle is started, the runner tells the worker if the cache
> has become invalid (since it knows if another worker has mutated state)
>
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
>
> > * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
>
> Cached values can be mutated freely as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time does the Runner need to wait on the response to store the
> cache token, and even that can be done asynchronously.
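
As a sketch of how a worker-side cache could use such a token (again
with hypothetical names, not the actual SDK harness code):

import java.util.HashMap;
import java.util.Map;

// Sketch: cached state values stay usable as long as the runner keeps
// sending the same cache token; a new or missing token drops the cache.
class TokenScopedStateCache {
  private String currentToken;
  private final Map<String, byte[]> valuesByStateKey = new HashMap<>();

  void startBundle(String tokenFromRunner) {
    if (tokenFromRunner == null || !tokenFromRunner.equals(currentToken)) {
      valuesByStateKey.clear(); // token changed or absent -> cache invalid
      currentToken = tokenFromRunner;
    }
  }

  byte[] read(String stateKey) {
    return valuesByStateKey.get(stateKey);
  }

  // Mutations update the cache immediately; the corresponding write to
  // the runner can be sent asynchronously while the token stays valid.
  void write(String stateKey, byte[] value) {
    valuesByStateKey.put(stateKey, value);
  }
}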
>
> > * When the bundle is finished, the runner records the last writer (only
> if a change occurred)
>
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.
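
For reference, the worker-id scheme Thomas describes could be sketched
roughly like this on the runner side (hypothetical names, just to
contrast the two approaches):

import java.util.HashMap;
import java.util.Map;

// Sketch: the runner remembers which worker last mutated each key
// range; a worker's cache is only valid if that worker was also the
// last writer for the range.
class LastWriterTracker {
  private final Map<String, String> lastWriterByKeyRange = new HashMap<>();

  boolean isCacheValid(String keyRange, String workerId) {
    return workerId.equals(lastWriterByKeyRange.get(keyRange));
  }

  // Called when a bundle finishes, but only if state was mutated.
  void recordWriter(String keyRange, String workerId) {
    lastWriterByKeyRange.put(keyRange, workerId);
  }
}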
>
>
> @Reuven:
>
> >  Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.
>
> State is always partitioned across the Flink workers (hash-based, not
> lexicographic). I don't think that matters though, because the key
> ranges do not overlap between the workers. Flink does not support
> dynamically repartitioning the key ranges. Even in the case of
> fine-grained recovery of workers and their key ranges, we would simply
> generate new cache tokens for the particular worker.
>

Dataflow's ranges are also hash-based. When I said lexicographical, I
meant lexicographic ordering of the hexadecimal hash values.

Indeed the fact that Dataflow can dynamically split and merge these ranges
is what makes it trickier. If Flink does not repartition the ranges, then
things are much easier.
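
For comparison, a per-range token scheme along the lines described above
could look roughly like this (only a sketch with hypothetical names;
Dataflow's actual range and token handling is certainly more involved):

import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

// Sketch: one token per lexicographic range of hashed keys, where a
// range is identified by its inclusive start.
class RangeScopedTokens {
  private final NavigableMap<String, String> tokenByRangeStart = new TreeMap<>();

  RangeScopedTokens() {
    tokenByRangeStart.put("", UUID.randomUUID().toString()); // single initial range
  }

  String tokenForHashedKey(String hexHash) {
    return tokenByRangeStart.floorEntry(hexHash).getValue();
  }

  // Splitting a range installs a fresh token for the new upper
  // sub-range; caches built for keys in that sub-range no longer match
  // and are invalidated, while the lower sub-range keeps its token.
  void splitRange(String newBoundary) {
    tokenByRangeStart.put(newBoundary, UUID.randomUUID().toString());
  }
}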


>
> Thanks,
> Max
>
> On 21.08.19 09:33, Reuven Lax wrote:
> > Dataflow does something like this, however since work is
> > load balanced across workers a per-worker id doesn't work very well.
> > Dataflow divides the keyspace up into lexicographic ranges, and creates
> > a cache token per range.
> >
> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org
> > <mailto:t...@apache.org>> wrote:
> >
> >     Commenting here vs. on the PR since related to the overall approach.
> >
> >     Wouldn't it be simpler to have the runner just track a unique ID for
> >     each worker and use that to communicate if the cache is valid or not?
> >
> >     * When the bundle is started, the runner tells the worker if the
> >     cache has become invalid (since it knows if another worker has
> >     mutated state)
> >     * When the worker sends mutation requests to the runner, it includes
> >     its own ID (or the runner already has it as contextual information).
> >     No need to wait for a response.
> >     * When the bundle is finished, the runner records the last writer
> >     (only if a change occurred)
> >
> >     Whenever current worker ID and last writer ID doesn't match, cache
> >     is invalid.
> >
> >     Thomas
> >
> >
> >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com
> >     <mailto:lc...@google.com>> wrote:
> >
> >         Having cache tokens per key would be very expensive indeed and I
> >         believe we should go with a single cache token "per" bundle.
> >
> >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels
> >         <m...@apache.org <mailto:m...@apache.org>> wrote:
> >
> >             Maybe a Beam Python expert can chime in for Rakesh's
> question?
> >
> >             Luke, I was assuming cache tokens to be per key and
> >             state id. While implementing initial support on the
> >             Runner side, I realized that we probably want cache
> >             tokens to be per state id only. Note that if we had
> >             per-key cache tokens, the number of cache tokens would
> >             approach the total number of keys in an application.
> >
> >             If anyone wants to have a look, here is a first version
> >             of the Runner side for cache tokens. Note that I only
> >             implemented cache tokens for BagUserState for now, but it
> >             can easily be added for side inputs as well.
> >
> >             https://github.com/apache/beam/pull/9374
> >
> >             -Max
> >
> >
>
