claudevdm commented on issue #25374:
URL: https://github.com/apache/beam/issues/25374#issuecomment-2368352128

   I looked into this further and it can be reproduced when 
`with_exception_handling(use_subprocess=True)` is used along with iterable 
side inputs.
   
   The problem comes from the use of thread-local variables in the state API 
[1] and [2]. 
   
   - ParDo#with_exception_handling(use_subprocess=True) creates a subprocess 
using a ProcessPoolExecutor [3].
   - The thread local variable `_context` is set on the MainThread in the case 
of DirectRunner
   - When the arguments are sent to the subprocess, the ProcessPoolExecutor 
pickles them on a QueueFeederThread. Pickling the iterable side input reads 
`_context`, but its attributes were set only on the MainThread, so the 
QueueFeederThread cannot see them and the exception occurs.
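
The mechanism in the list above can be reproduced without Beam. The following is a minimal sketch, not Beam's code: `LazySideInput`, `side_input_values`, and `pickle_on_new_thread` are hypothetical names, and a plain `threading.Thread` stands in for the QueueFeederThread.

```python
import pickle
import threading

# Stand-in for the per-thread `_context` used by the state handler.
_context = threading.local()


class LazySideInput:
    """Hypothetical lazy iterable that reads thread-local state when iterated."""

    def __iter__(self):
        # Raises AttributeError on any thread that never set the attribute.
        return iter(_context.side_input_values)

    def __reduce__(self):
        # Pickling materializes the iterable, like the __reduce__ in the traceback.
        return list, (list(self),)


def pickle_on_new_thread(obj):
    """Pickle obj on a fresh thread; return the pickled bytes or the exception."""
    outcome = {}

    def target():
        try:
            outcome["value"] = pickle.dumps(obj)
        except AttributeError as exc:
            outcome["error"] = exc

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return outcome


# The attribute is set only on the current (main) thread.
_context.side_input_values = [1, 2, 3]

# Pickling on the thread that set the attribute works.
same_thread = pickle.loads(pickle.dumps(LazySideInput()))

# Pickling on another thread fails, because that thread's view of
# `_context` has no `side_input_values` attribute.
other_thread = pickle_on_new_thread(LazySideInput())
```

Here `same_thread` round-trips to a plain list, while `other_thread` holds an `AttributeError` of the same kind as the one in the stack trace.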
   
   To work around this, join the necessary side inputs into the PCollection 
consumed by the unsafe DoFn in a step before 
ParDo#with_exception_handling(use_subprocess=True). That way the subprocess 
step should not need to use the state API.
   
   [1] 
https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1037
   [2] 
https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1169
   [3] 
https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/transforms/core.py#L2512
   
   On DirectRunner the stack trace is slightly different:
   `Traceback (most recent call last):
     File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/transforms/core.py", line 2525, in _call_remote
       return self._pool.submit(method, *args, **kwargs).result(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 456, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/usr/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
       obj = _ForkingPickler.dumps(obj)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
     File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 373, in __reduce__
       return list, (list(self), )
                     ^^^^^^^^^^
     File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 370, in __iter__
       self._state_handler.blocking_get(self._state_key, self._coder_impl))
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1213, in blocking_get
       cache_token = self._get_cache_token(state_key)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1340, in _get_cache_token
       return self._context.side_input_cache_tokens.get(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: '_thread._local' object has no attribute 'side_input_cache_tokens'`
   
   

