[ https://issues.apache.org/jira/browse/BEAM-8298?focusedWorklogId=378512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378512 ]
ASF GitHub Bot logged work on BEAM-8298:
----------------------------------------

Author: ASF GitHub Bot
Created on: 28/Jan/20 22:08
Start Date: 28/Jan/20 22:08
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10705: [BEAM-8298] Implement side input caching.
URL: https://github.com/apache/beam/pull/10705#discussion_r372080828

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -825,43 +833,92 @@ def extend(self,
   def clear(self, state_key, is_cached=False):
     # type: (beam_fn_api_pb2.StateKey, bool) -> _Future
-    if self._should_be_cached(is_cached):
+    cache_token = self._get_cache_token(state_key, is_cached)
+    if cache_token:
       cache_key = self._convert_to_cache_key(state_key)
-      self._state_cache.clear(cache_key, self._context.cache_token)
+      self._state_cache.clear(cache_key, cache_token)
     return self._underlying.clear(state_key)
   def done(self):
     # type: () -> None
     self._underlying.done()
-  def _materialize_iter(self,
-                        state_key, # type: beam_fn_api_pb2.StateKey
-                        coder # type: coder_impl.CoderImpl
-                       ):
+  def _lazy_iterator(
+      self,
+      state_key, # type: beam_fn_api_pb2.StateKey
+      coder, # type: coder_impl.CoderImpl
+      continuation_token=None # type: Optional[bytes]
+    ):
     # type: (...) -> Iterator[Any]
     """Materializes the state lazily, one element at a time.
     :return A generator which returns the next element if advanced.
     """
-    continuation_token = None
     while True:
-      data, continuation_token = \
-          self._underlying.get_raw(state_key, continuation_token)
+      data, continuation_token = (
+          self._underlying.get_raw(state_key, continuation_token))
       input_stream = coder_impl.create_InputStream(data)
       while input_stream.size() > 0:
         yield coder.decode_from_stream(input_stream, True)
       if not continuation_token:
         break
-  def _should_be_cached(self, request_is_cached):
-    return (self._state_cache.is_cache_enabled() and
-            request_is_cached and
-            self._context.cache_token)
+  def _get_cache_token(self, state_key, request_is_cached):
+    if not self._state_cache.is_cache_enabled():
+      return None
+    elif state_key.HasField('bag_user_state'):
+      if request_is_cached and self._context.user_state_cache_token:
+        return self._context.user_state_cache_token
+      else:
+        return self._context.bundle_cache_token
+    elif state_key.WhichOneof('type').endswith('_side_input'):
+      side_input = getattr(state_key, state_key.WhichOneof('type'))
+      return self._context.side_input_cache_tokens.get(
+          (side_input.transform_id, side_input.side_input_id),
+          self._context.bundle_cache_token)
+
+  def _partially_cached_iterable(
+      self,
+      state_key, # type: beam_fn_api_pb2.StateKey
+      coder # type: coder_impl.CoderImpl
+    ):
+    # type: (...) -> Iterable[Any]
+    """Materialized the first page of data, concatinated with a lazy iterable
+    of the rest, if any.
+    """
+    data, continuation_token = (
+        self._underlying.get_raw(state_key, None))
+    head = []
+    input_stream = coder_impl.create_InputStream(data)
+    while input_stream.size() > 0:
+      head.append(coder.decode_from_stream(input_stream, True))
+
+    if continuation_token is None:
+      return head
+    else:
+      def iter_func():
+        for item in head:
+          yield item
+        for item in self._lazy_iterator(state_key, coder, continuation_token):
+          yield item
+      return _IterableFromIterator(iter_func)
   @staticmethod
   def _convert_to_cache_key(state_key):
     return state_key.SerializeToString()
+class _IterableFromIterator(object):
+  """Wraps an iterator as an iterable."""
+  def __init__(self, iter_func):
+    self._iter_func = iter_func
+  def __iter__(self):
+    return iter_func()

Review comment:
```suggestion
    return iter_func
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 378512)
Time Spent: 1h 50m (was: 1h 40m)

> Implement state caching for side inputs
> ---------------------------------------
>
> Key: BEAM-8298
> URL: https://issues.apache.org/jira/browse/BEAM-8298
> Project: Beam
> Issue Type: Improvement
> Components: runner-core, sdk-py-harness
> Reporter: Maximilian Michels
> Assignee: Jing Chen
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Caching is currently only implemented for user state.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)