cozos opened a new issue, #25374:
URL: https://github.com/apache/beam/issues/25374
### What happened?
Using `ParDo#with_exception_handling(use_subprocess=True)` seems to fail
because it relies on `_context.process_instruction_id`, which doesn't exist in
the process.
```
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
  File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
    continuation_token=continuation_token)))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
    req_future = self._request(request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
    request.instruction_id = self._context.process_instruction_id
AttributeError: '_thread._local' object has no attribute 'process_instruction_id'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1895, in process
    result = self._fn.process(*args, **kwargs)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1933, in process
    return self._call_remote(self._remote_process, *args, **kwargs)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1948, in _call_remote
    return self._pool.submit(method, *args, **kwargs).result()
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
  File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
    continuation_token=continuation_token)))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
    req_future = self._request(request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
    request.instruction_id = self._context.process_instruction_id
AttributeError: '_thread._local' object has no attribute 'process_instruction_id'
```
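For illustration, here is a minimal standard-library sketch of the failure mode the traceback appears to show. The `_feed` frame suggests the element is pickled on the multiprocessing queue's feeder thread, and an attribute set on a `threading.local` object in one thread is not visible from another. This is not Beam's code: `_context` here is a hypothetical stand-in for the worker's thread-local context, and `read_instruction_id` and `run_in_new_thread` are names made up for this example.

```python
import threading

# Hypothetical stand-in for the SDK worker's thread-local context.
_context = threading.local()


def read_instruction_id():
    # Mirrors the attribute access that fails in the traceback.
    return _context.process_instruction_id


def run_in_new_thread(fn):
    """Run fn in a fresh thread and capture its result or AttributeError."""
    outcome = {}

    def target():
        try:
            outcome["result"] = fn()
        except AttributeError as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return outcome


# The attribute is set only for the current (main) thread.
_context.process_instruction_id = "instruction-1"

main_result = read_instruction_id()             # works in the setting thread
other = run_in_new_thread(read_instruction_id)  # fails in any other thread

print(main_result)
print(type(other.get("error")).__name__)
```

If this reading is right, any lazily evaluated input that touches the thread-local context while being pickled would hit the same error, regardless of what the wrapped `DoFn` does.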
I will come up with a reproducible example.
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner