Commenting here rather than on the PR, since this relates to the overall approach. Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate whether the cache is valid?

* When the bundle is started, the runner tells the worker whether the cache has become invalid (since it knows whether another worker has mutated state).
* When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
* When the bundle is finished, the runner records the last writer (only if a change occurred).

Whenever the current worker ID and the last writer ID don't match, the cache is invalid.

Thomas

On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:

> Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org>
> wrote:
>
>> Maybe a Beam Python expert can chime in for Rakesh's question?
>>
>> Luke, I was assuming cache tokens to be per key and state id. During
>> implementing an initial support on the Runner side, I realized that we
>> probably want cache tokens to only be per state id. Note that if we had
>> per-key cache tokens, the number of cache tokens would approach the
>> total number of keys in an application.
>>
>> If anyone wants to have a look, here is a first version of the Runner
>> side for cache tokens. Note that I only implemented cache tokens for
>> BagUserState for now, but it can be easily added for side inputs as well.
>>
>> https://github.com/apache/beam/pull/9374
>>
>> -Max
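
For illustration, a minimal Python sketch of the runner-side bookkeeping in the last-writer scheme proposed above. The class and method names (RunnerCacheCoordinator, start_bundle, record_mutation, finish_bundle) are hypothetical and not part of Beam's API. It assumes a single last writer for all state (not one per state id) and that bundle calls are handled one at a time, with no concurrency control.

```python
from typing import Optional, Set


class RunnerCacheCoordinator:
    """Hypothetical runner-side tracker for the last-writer cache scheme.

    Assumptions (not from Beam): one last writer for all state, and calls
    are processed sequentially by a single thread.
    """

    def __init__(self) -> None:
        # ID of the worker whose bundle last mutated state; None = no writes yet.
        self.last_writer: Optional[str] = None
        # Workers that sent at least one mutation during their current bundle.
        self._dirty: Set[str] = set()

    def start_bundle(self, worker_id: str) -> bool:
        """Tell the worker whether its cache is still valid.

        The cache is valid if nobody has written yet or if this worker
        was the last writer; otherwise another worker changed state.
        """
        return self.last_writer is None or self.last_writer == worker_id

    def record_mutation(self, worker_id: str) -> None:
        # Fire-and-forget from the worker's point of view: no reply is needed,
        # the runner only notes that this worker changed state.
        self._dirty.add(worker_id)

    def finish_bundle(self, worker_id: str) -> None:
        # Only record a new last writer if this bundle actually changed state.
        if worker_id in self._dirty:
            self._dirty.discard(worker_id)
            self.last_writer = worker_id
```

With the rule exactly as stated, a read-only worker keeps seeing an invalid cache for as long as some other worker remains the last writer; remembering which write each worker last observed would be one way to refine that.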