[ https://issues.apache.org/jira/browse/BEAM-10260?focusedWorklogId=446272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-446272 ]
ASF GitHub Bot logged work on BEAM-10260:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Jun/20 04:06
Start Date: 16/Jun/20 04:06
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440568137
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
Review comment:
Isn't `cached_value.extend(elements)` the only place where this matters?
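
For context on the `list(elements)` line, here is a minimal sketch of what happens when a one-shot iterator is consumed more than once. It assumes that `elements` may arrive as a generator or iterator and that the function reads it a second time after the cache update; neither is visible in the hunk above. The function names are hypothetical and are not part of the Beam code.

```python
def consume_twice(elements):
    """Hypothetical helper: reads `elements` once for a cache update and
    once more for a second step (here, a stand-in for encoding)."""
    cached = []
    cached.extend(elements)  # first pass exhausts a one-shot iterator
    encoded = [repr(e) for e in elements]  # second pass may see nothing
    return cached, encoded


def consume_twice_materialized(elements):
    """Same as above, but materializes the input first, as the diff does."""
    elements = list(elements)
    return consume_twice(elements)


if __name__ == "__main__":
    print(consume_twice(iter([1, 2])))
    print(consume_twice_materialized(iter([1, 2])))
```

If the input is only ever consumed by `cached_value.extend(elements)`, the up-front `list(...)` copy would be unnecessary, which appears to be the point of the question above.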
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),
Review comment:
Why would `extend` be called in this case? Should this be an error?
Issue Time Tracking
-------------------
Worklog Id: (was: 446272)
Time Spent: 0.5h (was: 20m)
> Nested iterable in state cache exceeds Python recursion limit
> -------------------------------------------------------------
>
> Key: BEAM-10260
> URL: https://issues.apache.org/jira/browse/BEAM-10260
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> With state caching enabled, the changes in BEAM-8298 lead to an unbounded
> nesting of state iterables when appending to a bag state, quickly resulting
> in an exception:
> {noformat}
>     events = list(event_bag.read())
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 488, in __iter__
>     for elem in self.first:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   [Previous line repeated 977 more times]
> RecursionError: maximum recursion depth exceeded while calling a Python object
> {noformat}
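
The failure mode described above can be reproduced outside Beam with a small sketch. It assumes that each append wraps the previous state in another iterable object whose `__iter__` re-yields the inner one, which is what the repeated `__iter__` frames in the traceback suggest. `ConcatIterable`, `append_by_nesting`, `append_by_flattening` and `demo` are hypothetical names for illustration, not the Beam implementation.

```python
import sys


class ConcatIterable:
    """Hypothetical wrapper that lazily concatenates two iterables."""

    def __init__(self, first, second):
        self.first = first
        self.second = second

    def __iter__(self):
        # Iterating `first` resumes a nested generator for every level
        # of wrapping, so the call depth grows with the number of appends.
        for elem in self.first:
            yield elem
        for elem in self.second:
            yield elem


def append_by_nesting(state, elements):
    """Problematic pattern: every append adds one more wrapper level."""
    return ConcatIterable(state, elements)


def append_by_flattening(state, elements):
    """Alternative: keep the state as a flat list and extend it in place."""
    state.extend(elements)
    return state


def demo(depth):
    """Append `depth` times both ways; report whether the nested read works."""
    nested, flat = [], []
    for i in range(depth):
        nested = append_by_nesting(nested, [i])
        flat = append_by_flattening(flat, [i])
    try:
        list(nested)
        nested_readable = True
    except RecursionError:
        nested_readable = False
    return nested_readable, len(flat)


if __name__ == "__main__":
    print(demo(10))
    print(demo(sys.getrecursionlimit() + 100))
```

With a small number of appends both variants read back fine. Once the number of appends exceeds the interpreter's recursion limit, reading the nested variant raises `RecursionError`, while the flat list is unaffected.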