[ 
https://issues.apache.org/jira/browse/BEAM-10260?focusedWorklogId=446627&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-446627
 ]

ASF GitHub Bot logged work on BEAM-10260:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Jun/20 16:25
            Start Date: 16/Jun/20 16:25
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #12012:
URL: https://github.com/apache/beam/pull/12012#discussion_r440981766



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -941,15 +942,29 @@ def extend(self,
              is_cached=False
             ):
     # type: (...) -> _Future
+    # Make sure the input is a list of elements
+    elements = list(elements)
     cache_token = self._get_cache_token(state_key, is_cached)
     if cache_token:
       # Update the cache
       cache_key = self._convert_to_cache_key(state_key)
-      if self._state_cache.get(cache_key, cache_token) is None:
-        # We have never cached this key before, first initialize cache
-        self.blocking_get(state_key, coder, is_cached=True)
-      # Now update the values in the cache
-      self._state_cache.extend(cache_key, cache_token, elements)
+      cached_value = self._state_cache.get(cache_key, cache_token)
+      # Keep in mind that the state for this key can be evicted
+      # while executing this function. Either read or write to the cache
+      # but never do both here!
+      if cached_value is None:
+        # We have never cached this key before, first retrieve state
+        cached_value = self.blocking_get(state_key, coder)
+      # Just extend the already cached value
+      if isinstance(cached_value, list):
+        # The state is fully cached and can be extended
+        cached_value.extend(elements)
+      elif isinstance(cached_value, itertools.chain):
+        # The state is too large to be fully cached (continuation token used),

Review comment:
       Makes sense, I missed that the new elements are still written to the state handler.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 446627)
    Time Spent: 1.5h  (was: 1h 20m)

> Nested iterable in state cache exceeds Python recursion limit
> -------------------------------------------------------------
>
>                 Key: BEAM-10260
>                 URL: https://issues.apache.org/jira/browse/BEAM-10260
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: P2
>             Fix For: 2.23.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> With state caching enabled, the changes in BEAM-8298 lead to an unbounded 
> nesting of state iterables when appending to a bag state, quickly resulting 
> in an exception:
> {noformat}
>     events = list(event_bag.read())
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 488, in __iter__
>     for elem in self.first:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   File "/code/venvs/venv/lib/python3.6/site-packages/apache_beam/runners/worker/statecache.py", line 186, in __iter__
>     for item in value:
>   [Previous line repeated 977 more times]
> RecursionError: maximum recursion depth exceeded while calling a Python object
> {noformat}
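
The failure mode in the issue description can be reproduced outside Beam with a minimal sketch. Everything below is an assumption made for illustration: `Wrapped`, `append_nested`, `append_flat`, and `overflows` are hypothetical names, not Beam's statecache classes or the code in the pull request. The sketch only mimics an iterable that wraps the previous iterable on every append, so the iteration depth grows by one level per append, and compares it with extending a plain list in place.

```python
import sys


class Wrapped(object):
    """Hypothetical stand-in for a nested state iterable (not Beam's class)."""

    def __init__(self, inner, extra):
        self.inner = inner
        self.extra = extra

    def __iter__(self):
        # Iterating one level resumes the generator of the level below it,
        # so the call depth equals the number of wrappers.
        for item in self.inner:
            yield item
        for item in self.extra:
            yield item


def append_nested(n):
    """Append n elements by wrapping the previous iterable each time."""
    state = []
    for i in range(n):
        state = Wrapped(state, [i])
    return state


def append_flat(n):
    """Append n elements by extending one list in place."""
    state = []
    for i in range(n):
        state.extend([i])
    return state


def overflows(state):
    """Return True if fully iterating `state` raises RecursionError."""
    try:
        list(state)
    except RecursionError:
        return True
    return False


# Assumed depth: slightly more appends than the interpreter's recursion limit.
DEPTH = sys.getrecursionlimit() + 100
```

Under these assumptions, `overflows(append_nested(DEPTH))` is true while `overflows(append_flat(DEPTH))` is false, which is consistent with the repeated `__iter__` frames in the traceback above.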



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
