[
https://issues.apache.org/jira/browse/BEAM-10260?focusedWorklogId=446344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-446344
]
ASF GitHub Bot logged work on BEAM-10260:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Jun/20 07:33
Start Date: 16/Jun/20 07:33
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440643909
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
is_cached=False
):
# type: (...) -> _Future
+ # Make sure the input is a list of elements
+ elements = list(elements)
cache_token = self._get_cache_token(state_key, is_cached)
if cache_token:
# Update the cache
cache_key = self._convert_to_cache_key(state_key)
- if self._state_cache.get(cache_key, cache_token) is None:
- # We have never cached this key before, first initialize cache
- self.blocking_get(state_key, coder, is_cached=True)
- # Now update the values in the cache
- self._state_cache.extend(cache_key, cache_token, elements)
+ cached_value = self._state_cache.get(cache_key, cache_token)
+ # Keep in mind that the state for this key can be evicted
+ # while executing this function. Either read or write to the cache
+ # but never do both here!
+ if cached_value is None:
+ # We have never cached this key before, first retrieve state
+ cached_value = self.blocking_get(state_key, coder)
+ # Just extend the already cached value
+ if isinstance(cached_value, list):
+ # The state is fully cached and can be extended
+ cached_value.extend(elements)
+ elif isinstance(cached_value, itertools.chain):
+ # The state is too large to be fully cached (continuation token used),
Review comment:
Extend is called when new state gets appended. In line 963 we know that
the cache already contains the head of the state + an iterator which retrieves
the rest from the runner due to the use of continuation tokens for large state.
We don't want to cache further values, that's why we skip updating the cache
and just send the new values to the Runner (below). When we retrieve this state
from the cache, it will have the head cached and retrieve the rest from the
Runner, including the appended values.
Should it be an error? No, because we still want to be able to append to
large state.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 446344)
Time Spent: 40m (was: 0.5h)
> Nested iterable in state cache exceeds Python recursion limit
> -------------------------------------------------------------
>
> Key: BEAM-10260
> URL: https://issues.apache.org/jira/browse/BEAM-10260
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> With state caching enabled, the changes in BEAM-8298 lead to an unbounded
> nesting of state iterables when appending to a bag state, quickly resulting
> in an exception:
> {noformat}
> events = list(event_bag.read())
> File
> "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 488, in __iter__ for elem in self.first:
> File
> "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py",
> line 186, in __iter__ for item in value:
> File
> "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py",
> line 186, in __iter__ for item in value:
> File
> "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py",
> line 186, in __iter__ for item in value: [Previous line repeated 977
> more times]
> RecursionError: maximum recursion depth exceeded while calling a Python
> object
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)