claudevdm commented on issue #25374: URL: https://github.com/apache/beam/issues/25374#issuecomment-2368352128
I looked into this further. It can be reproduced by using `with_exception_handling(use_subprocess=True)` together with iterable side inputs. The problem comes from the use of thread-local variables in the state API [1] [2]:

- `ParDo#with_exception_handling(use_subprocess=True)` creates a subprocess using a `ProcessPoolExecutor` [3].
- The thread-local variable `_context` is set on the MainThread in the case of DirectRunner.
- When the subprocess tries to access `_context`, the `ProcessPoolExecutor` tries to pipe it via a QueueFeederThread, which does not have access to the thread-local variable since it was instantiated on the MainThread, and the exception occurs.

To work around this, try joining the necessary side inputs into the PCollection used by the unsafe DoFn in a step before `ParDo#with_exception_handling(use_subprocess=True)`. That way the subprocess should not need to use the state API.

[1] https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1037
[2] https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/runners/worker/sdk_worker.py#L1169
[3] https://github.com/apache/beam/blob/7c382df7d186d154f54d364ca3d4974d0c036fdd/sdks/python/apache_beam/transforms/core.py#L2512

On DirectRunner the stack trace is slightly different:

    Traceback (most recent call last):
      File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/transforms/core.py", line 2525, in _call_remote
        return self._pool.submit(method, *args, **kwargs).result(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/lib/python3.11/concurrent/futures/_base.py", line 456, in result
        return self.__get_result()
               ^^^^^^^^^^^^^^^^^^^
      File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
        raise self._exception
      File "/usr/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
        obj = _ForkingPickler.dumps(obj)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
      File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 373, in __reduce__
        return list, (list(self), )
                      ^^^^^^^^^^
      File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 370, in __iter__
        self._state_handler.blocking_get(self._state_key, self._coder_impl))
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1213, in blocking_get
        cache_token = self._get_cache_token(state_key)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/google/home/cvandermerwe/github/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 1340, in _get_cache_token
        return self._context.side_input_cache_tokens.get(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    AttributeError: '_thread._local' object has no attribute 'side_input_cache_tokens'
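
The thread-local behaviour behind this `AttributeError` can be reproduced in isolation with the standard library alone. The following is a minimal sketch, not Beam code: the module-level `_context` and the attribute `side_input_cache_tokens` only borrow their names from the comment, while `read_tokens`, `read_in_other_thread`, and the thread name `FeederLikeThread` are hypothetical helpers standing in for the state handler and the QueueFeederThread.

```python
import threading

# Stand-in for the state handler's thread-local `_context`.
# The name is borrowed from the comment; this is not Beam's implementation.
_context = threading.local()


def read_tokens():
    """Read an attribute that is only set on the thread that assigned it."""
    return _context.side_input_cache_tokens


def read_in_other_thread():
    """Call read_tokens() on a fresh thread and record the outcome."""
    outcome = {}

    def target():
        try:
            outcome["value"] = read_tokens()
        except AttributeError as exc:
            outcome["error"] = exc

    # Hypothetical stand-in for multiprocessing's QueueFeederThread.
    worker = threading.Thread(target=target, name="FeederLikeThread")
    worker.start()
    worker.join()
    return outcome


# Set the attribute on the current thread only, as the comment describes
# for the MainThread under DirectRunner.
_context.side_input_cache_tokens = {"side": b"token"}

main_result = read_tokens()            # same thread that set it: succeeds
other_result = read_in_other_thread()  # different thread: AttributeError
```

Here `main_result` holds the dictionary, while `other_result` holds an `AttributeError`, because attributes on a `threading.local` object are stored per thread. This is consistent with the suggested workaround: if the side input values are already part of the elements, nothing has to read the thread-local state from another thread.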
