lukecwik commented on code in PR #23046:
URL: https://github.com/apache/beam/pull/23046#discussion_r966408485
##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -1175,29 +1162,21 @@ def extend(
# type: (...) -> _Future
cache_token = self._get_cache_token(state_key)
if cache_token:
- # Update the cache
+ # Update the cache if the value is already present and
+ # can be updated.
cache_key = self._convert_to_cache_key(state_key)
- cached_value = self._state_cache.get(cache_key, cache_token)
- # Keep in mind that the state for this key can be evicted
- # while executing this function. Either read or write to the cache
- # but never do both here!
- if cached_value is None:
- # We have never cached this key before, first retrieve state
- cached_value = self.blocking_get(state_key, coder)
- # Just extend the already cached value
+ cached_value = self._state_cache.peek((cache_key, cache_token))
Review Comment:
Yes, `peek` doesn't modify the cache: it returns the cached value, or None if
the key is not present. `get` returns the value, loading it if necessary.
We peek because we want to support blind writes, which let us append to the
bag without first loading it from the runner. As an optimization, we also
append to the cached in-memory copy if it is fully loaded, since we know the
result will then be consistent with what the runner would have provided to us.
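
To make the `peek` vs. `get` distinction concrete, here is a minimal sketch of the blind-write pattern. It is not the code from `sdk_worker.py`: `TinyCache`, `blind_extend`, and `runner_appends` are hypothetical names used only for illustration, and "fully loaded" is modeled as the cached value being a plain `list`, which is an assumption.

```python
from typing import Any, Callable, Dict, Hashable, List, Optional


class TinyCache:
    """Hypothetical stand-in for a state cache; not Beam's StateCache."""

    def __init__(self) -> None:
        self._entries: Dict[Hashable, Any] = {}

    def peek(self, key: Hashable) -> Optional[Any]:
        # Read-only lookup: never loads and never inserts an entry.
        return self._entries.get(key)

    def get(self, key: Hashable, loader: Callable[[Hashable], Any]) -> Any:
        # Load-through lookup: on a miss, call the loader and store the result.
        if key not in self._entries:
            self._entries[key] = loader(key)
        return self._entries[key]


def blind_extend(
    cache: TinyCache,
    key: Hashable,
    new_elements: List[Any],
    runner_appends: List[Any],
) -> None:
    # The append always goes to the runner (modeled here as a list of
    # pending appends); no read of the existing bag is required.
    runner_appends.extend(new_elements)

    # Only touch the cache if a fully loaded value is already there.
    # Assumption: "fully loaded" means the cached value is a plain list.
    cached = cache.peek(key)
    if isinstance(cached, list):
        cached.extend(new_elements)


cache = TinyCache()
appends: List[int] = []

# Nothing cached yet: the write is blind and the cache stays empty.
blind_extend(cache, "bag", [1, 2], appends)

# Something later loads the bag through `get`, which populates the cache.
cache.get("bag", lambda _key: [1, 2])

# Now the value is cached, so the append also updates the in-memory copy.
blind_extend(cache, "bag", [3], appends)
```

Using `get` in `blind_extend` instead of `peek` would force a load from the runner on every cache miss, which is exactly the round trip a blind write is meant to avoid.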