cozos opened a new issue, #25374:
URL: https://github.com/apache/beam/issues/25374

   ### What happened?
   
   Using `ParDo#with_exception_handling(use_subprocess=True)` seems to fail because it relies on `_context.process_instruction_id`, which doesn't exist in the process.
   
   ```
   Traceback (most recent call last):
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
       obj = _ForkingPickler.dumps(obj)
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
     File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
     File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
       self._underlying.get_raw(state_key, continuation_token))
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
       continuation_token=continuation_token)))
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
       req_future = self._request(request)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
       request.instruction_id = self._context.process_instruction_id
   AttributeError: '_thread._local' object has no attribute 'process_instruction_id'
   """

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1895, in process
       result = self._fn.process(*args, **kwargs)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1933, in process
       return self._call_remote(self._remote_process, *args, **kwargs)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1948, in _call_remote
       return self._pool.submit(method, *args, **kwargs).result()
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 432, in result
       return self.__get_result()
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
       raise self._exception
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
       obj = _ForkingPickler.dumps(obj)
     File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
     File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
     File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
       self._underlying.get_raw(state_key, continuation_token))
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
       continuation_token=continuation_token)))
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
       req_future = self._request(request)
     File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
       request.instruction_id = self._context.process_instruction_id
   AttributeError: '_thread._local' object has no attribute 'process_instruction_id'
   ```
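
One possible reading of the traceback, offered as an assumption and not a confirmed diagnosis: the failing `_ForkingPickler.dumps` call sits in `multiprocessing/queues.py` `_feed`, which CPython runs on a queue feeder thread. If the lazy sequence is only iterated at pickling time, it would read the thread-local `_context` on a thread where `process_instruction_id` was never set. The sketch below reproduces that general failure mode with the standard library only; `LazyValues` and `pickle_in_other_thread` are hypothetical stand-ins, not Beam code.

```python
import pickle
import threading

# Stand-in for a per-thread worker context; this is not Beam's actual object.
_context = threading.local()


class LazyValues:
    """Hypothetical lazy sequence that reads thread-local state when pickled."""

    def __iter__(self):
        # Raises AttributeError on any thread that never set the attribute.
        yield _context.process_instruction_id

    def __reduce__(self):
        # Pickling materializes the sequence, so __iter__ runs on whichever
        # thread performs the pickling.
        return list, (list(self),)


def pickle_in_other_thread(obj):
    """Pickle obj on a fresh thread and return the AttributeError raised, if any."""
    errors = []

    def target():
        try:
            pickle.dumps(obj)
        except AttributeError as exc:
            errors.append(exc)

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return errors[0] if errors else None


# The attribute is set on the main thread only.
_context.process_instruction_id = "instruction-1"
values = LazyValues()

# Pickling on the thread that set the attribute succeeds.
same_thread = pickle.loads(pickle.dumps(values))

# Pickling the same object on another thread fails with AttributeError.
other_thread_error = pickle_in_other_thread(values)
print(same_thread, repr(other_thread_error))
```

If this reading is right, materializing the lazy values on the original thread before handing them to the pool would avoid the cross-thread lookup, though that has not been verified against Beam itself.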
   
   I will come up with a reproducible example.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner

