[
https://issues.apache.org/jira/browse/BEAM-3956?focusedWorklogId=87405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87405
]
ASF GitHub Bot logged work on BEAM-3956:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Apr/18 02:20
Start Date: 04/Apr/18 02:20
Worklog Time Spent: 10m
Work Description: shoyer commented on issue #4959: [BEAM-3956] Preserve
stacktraces for Python exceptions
URL: https://github.com/apache/beam/pull/4959#issuecomment-378453120
> Thank you for this change! Do you have an example exception message with
> these changes that includes the stack trace?

Sure, let me give a full example. Suppose we run the following code:
```
import apache_beam as beam

def f(x):
    return g(x)
def g(x):
    return h(x)
def h(x):
    raise ValueError('internal failure!')

[1, 2, 3] | beam.Map(f)
```
With current Beam, I get a long traceback, but it only references Beam
internals, not my code (none of my calls to `f()`, `g()` or `h()` appear):
<details>
```python-traceback
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-1-2377312c7b45> in <module>()
8 raise ValueError('internal failure!')
9
---> 10 [1, 2, 3] | beam.Map(f)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/ptransform.pyc
in __ror__(self, left, label)
491 _allocate_materialized_pipeline(p)
492 materialized_result =
_AddMaterializationTransforms().visit(result)
--> 493 p.run().wait_until_finish()
494 _release_materialized_pipeline(p)
495 return _FinalizeMaterialization().visit(materialized_result)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
387 if test_runner_api and self._verify_runner_api_compatible():
388 return Pipeline.from_runner_api(
--> 389 self.to_runner_api(), self.runner,
self._options).run(False)
390
391 if self._options.view_as(TypeOptions).runtime_type_check:
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
400 finally:
401 shutil.rmtree(tmpdir)
--> 402 return self.runner.run_pipeline(self)
403
404 def __enter__(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/direct/direct_runner.pyc
in run_pipeline(self, pipeline)
133 runner = BundleBasedDirectRunner()
134
--> 135 return runner.run_pipeline(pipeline)
136
137
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_pipeline(self, pipeline)
213 from apache_beam.runners.dataflow.dataflow_runner import
DataflowRunner
214 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
--> 215 return self.run_via_runner_api(pipeline.to_runner_api())
216
217 def run_via_runner_api(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_via_runner_api(self, pipeline_proto)
216
217 def run_via_runner_api(self, pipeline_proto):
--> 218 return self.run_stages(*self.create_stages(pipeline_proto))
219
220 def create_stages(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_stages(self, pipeline_components, stages, safe_coders)
835 metrics_by_stage[stage.name] = self.run_stage(
836 controller, pipeline_components, stage,
--> 837 pcoll_buffers, safe_coders).process_bundle.metrics
838 finally:
839 controller.close()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_stage(self, controller, pipeline_components, stage, pcoll_buffers,
safe_coders)
936 return BundleManager(
937 controller, get_buffer, process_bundle_descriptor,
--> 938 self._progress_frequency).process_bundle(data_input,
data_output)
939
940 # These classes are used to interact with the worker.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in process_bundle(self, inputs, expected_outputs)
1108 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
1109
process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1110 result_future =
self._controller.control_handler.push(process_bundle)
1111
1112 with ProgressRequester(
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in push(self, request)
1001 request.instruction_id = 'control_%s' % self._uid_counter
1002 logging.debug('CONTROL REQUEST %s', request)
-> 1003 response = self.worker.do_instruction(request)
1004 logging.debug('CONTROL RESPONSE %s', response)
1005 return ControlFuture(request.instruction_id, response)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc
in do_instruction(self, request)
199 # E.g. if register is set, this will call
self.register(request.register))
200 return getattr(self, request_type)(getattr(request,
request_type),
--> 201 request.instruction_id)
202 else:
203 raise NotImplementedError
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc
in process_bundle(self, request, instruction_id)
216 self.state_handler, self.data_channel_factory)
217 try:
--> 218 processor.process_bundle(instruction_id)
219 finally:
220 del self.bundle_processors[instruction_id]
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/bundle_processor.pyc
in process_bundle(self, instruction_id)
284 for op in reversed(self.ops.values()):
285 logging.info('start %s', op)
--> 286 op.start()
287
288 # Inject inputs from data plane.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in start(self)
236 else:
237 windowed_value =
_globally_windowed_value.with_value(value)
--> 238 self.output(windowed_value)
239
240
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in output(self, windowed_value, output_index)
157
158 def output(self, windowed_value, output_index=0):
--> 159 cython.cast(Receiver,
self.receivers[output_index]).receive(windowed_value)
160
161 def add_receiver(self, operation, output_index=0):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in receive(self, windowed_value)
83 self.update_counters_start(windowed_value)
84 for consumer in self.consumers:
---> 85 cython.cast(Operation, consumer).process(windowed_value)
86 self.update_counters_finish()
87
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in process(self, o)
390 def process(self, o):
391 with self.scoped_process_state:
--> 392 self.dofn_receiver.receive(o)
393
394 def progress_metrics(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in receive(self, windowed_value)
486
487 def receive(self, windowed_value):
--> 488 self.process(windowed_value)
489
490 def process(self, windowed_value):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in process(self, windowed_value)
494 self.do_fn_invoker.invoke_process(windowed_value)
495 except BaseException as exn:
--> 496 self._reraise_augmented(exn)
497 finally:
498 self.scoped_metrics_container.exit()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in _reraise_augmented(self, exn)
535 + step_annotation)
536 new_exn._tagged_with_step = True
--> 537 six.raise_from(new_exn, original_traceback)
538
539
/usr/local/google/home/shoyer/miniconda3/envs/beam-dev/lib/python2.7/site-packages/six.pyc
in raise_from(value, from_value)
735 else:
736 def raise_from(value, from_value):
--> 737 raise value
738
739
ValueError: internal failure! [while running 'Map(f)']
```
</details>
With this change, the original traceback is preserved at the end:
<details>
```python-traceback
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-1-2377312c7b45> in <module>()
8 raise ValueError('internal failure!')
9
---> 10 [1, 2, 3] | beam.Map(f)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/ptransform.pyc
in __ror__(self, left, label)
491 _allocate_materialized_pipeline(p)
492 materialized_result =
_AddMaterializationTransforms().visit(result)
--> 493 p.run().wait_until_finish()
494 _release_materialized_pipeline(p)
495 return _FinalizeMaterialization().visit(materialized_result)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
387 if test_runner_api and self._verify_runner_api_compatible():
388 return Pipeline.from_runner_api(
--> 389 self.to_runner_api(), self.runner,
self._options).run(False)
390
391 if self._options.view_as(TypeOptions).runtime_type_check:
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
400 finally:
401 shutil.rmtree(tmpdir)
--> 402 return self.runner.run_pipeline(self)
403
404 def __enter__(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/direct/direct_runner.pyc
in run_pipeline(self, pipeline)
133 runner = BundleBasedDirectRunner()
134
--> 135 return runner.run_pipeline(pipeline)
136
137
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in run_pipeline(self, pipeline)
213 from apache_beam.runners.dataflow.dataflow_runner import
DataflowRunner
214 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
--> 215 return self.run_via_runner_api(pipeline.to_runner_api())
216
217 def run_via_runner_api(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in run_via_runner_api(self, pipeline_proto)
216
217 def run_via_runner_api(self, pipeline_proto):
--> 218 return self.run_stages(*self.create_stages(pipeline_proto))
219
220 def create_stages(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in run_stages(self, pipeline_components, stages, safe_coders)
835 metrics_by_stage[stage.name] = self.run_stage(
836 controller, pipeline_components, stage,
--> 837 pcoll_buffers, safe_coders).process_bundle.metrics
838 finally:
839 controller.close()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in run_stage(self, controller, pipeline_components, stage, pcoll_buffers,
safe_coders)
936 return BundleManager(
937 controller, get_buffer, process_bundle_descriptor,
--> 938 self._progress_frequency).process_bundle(data_input,
data_output)
939
940 # These classes are used to interact with the worker.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in process_bundle(self, inputs, expected_outputs)
1108 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
1109
process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1110 result_future =
self._controller.control_handler.push(process_bundle)
1111
1112 with ProgressRequester(
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py
in push(self, request)
1001 request.instruction_id = 'control_%s' % self._uid_counter
1002 logging.debug('CONTROL REQUEST %s', request)
-> 1003 response = self.worker.do_instruction(request)
1004 logging.debug('CONTROL RESPONSE %s', response)
1005 return ControlFuture(request.instruction_id, response)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py
in do_instruction(self, request)
199 # E.g. if register is set, this will call
self.register(request.register))
200 return getattr(self, request_type)(getattr(request,
request_type),
--> 201 request.instruction_id)
202 else:
203 raise NotImplementedError
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py
in process_bundle(self, request, instruction_id)
216 self.state_handler, self.data_channel_factory)
217 try:
--> 218 processor.process_bundle(instruction_id)
219 finally:
220 del self.bundle_processors[instruction_id]
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py
in process_bundle(self, instruction_id)
284 for op in reversed(self.ops.values()):
285 logging.info('start %s', op)
--> 286 op.start()
287
288 # Inject inputs from data plane.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in start(self)
236 else:
237 windowed_value =
_globally_windowed_value.with_value(value)
--> 238 self.output(windowed_value)
239
240
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in output(self, windowed_value, output_index)
157
158 def output(self, windowed_value, output_index=0):
--> 159 cython.cast(Receiver,
self.receivers[output_index]).receive(windowed_value)
160
161 def add_receiver(self, operation, output_index=0):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in receive(self, windowed_value)
83 self.update_counters_start(windowed_value)
84 for consumer in self.consumers:
---> 85 cython.cast(Operation, consumer).process(windowed_value)
86 self.update_counters_finish()
87
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in process(self, o)
390 def process(self, o):
391 with self.scoped_process_state:
--> 392 self.dofn_receiver.receive(o)
393
394 def progress_metrics(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in receive(self, windowed_value)
486
487 def receive(self, windowed_value):
--> 488 self.process(windowed_value)
489
490 def process(self, windowed_value):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in process(self, windowed_value)
494 self.do_fn_invoker.invoke_process(windowed_value)
495 except BaseException as exn:
--> 496 self._reraise_augmented(exn)
497 finally:
498 self.scoped_metrics_container.exit()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in _reraise_augmented(self, exn)
538 + step_annotation + stacktrace_text)
539 new_exn._tagged_with_step = True
--> 540 six.raise_from(new_exn, original_traceback)
541
542
/usr/local/google/home/shoyer/miniconda3/envs/beam-dev/lib/python2.7/site-packages/six.pyc
in raise_from(value, from_value)
735 else:
736 def raise_from(value, from_value):
--> 737 raise value
738
739
ValueError: internal failure! [while running 'Map(f)']
Traceback (most recent call last):
  File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py", line 494, in process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py", line 284, in invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/core.py", line 972, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "<ipython-input-1-2377312c7b45>", line 4, in f
    return g(x)
  File "<ipython-input-1-2377312c7b45>", line 6, in g
    return h(x)
  File "<ipython-input-1-2377312c7b45>", line 8, in h
    raise ValueError('internal failure!')
ValueError: internal failure!
```
</details>
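For readers who want the gist without reading the diff, here is a minimal sketch of the approach the output above suggests. It is not the actual code in `common.py`: the helper name `reraise_augmented`, its `step_name` parameter, and the `run_step` driver are hypothetical. It assumes the exception type accepts a single message argument.

```python
import traceback


def reraise_augmented(exn, step_name):
    """Re-raise `exn` with the step name and original traceback in its message.

    Hypothetical helper. Must be called from inside an `except` block so
    that traceback.format_exc() can see the exception being handled.
    """
    step_annotation = " [while running '%s']" % step_name
    # Text of the traceback for the exception currently being handled.
    stacktrace_text = '\n' + traceback.format_exc()
    # Assumes type(exn) can be built from a single message string.
    new_exn = type(exn)(str(exn) + step_annotation + stacktrace_text)
    raise new_exn


def h(x):
    raise ValueError('internal failure!')


def run_step(x):
    # Stand-in for the runner invoking user code for one element.
    try:
        return h(x)
    except Exception as exn:
        reraise_augmented(exn, 'Map(f)')
```

Calling `run_step(1)` raises a `ValueError` whose message ends with the formatted traceback through `h`, so the user frames survive even if a caller discards the traceback object.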
@robertwb I will try rewriting this to use `six.reraise()` instead. That
might be a cleaner way to do this.
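A rough sketch of what that alternative could look like, under stated assumptions: it uses the Python 3 spelling `with_traceback()`, which is what `six.reraise(tp, value, tb)` does on Python 3, and the names `reraise_with_traceback` and `run_step` are hypothetical rather than taken from the PR.

```python
import sys


def reraise_with_traceback(exn, step_name):
    """Raise an annotated copy of `exn` that keeps the original traceback.

    Hypothetical helper; call it from inside an `except` block.
    """
    original_traceback = sys.exc_info()[2]
    # Assumes type(exn) can be built from a single message string.
    new_exn = type(exn)("%s [while running '%s']" % (exn, step_name))
    # six.reraise(type(new_exn), new_exn, original_traceback) would be the
    # Python 2/3-compatible equivalent of the next line.
    raise new_exn.with_traceback(original_traceback)


def h(x):
    raise ValueError('internal failure!')


def run_step(x):
    # Stand-in for the runner invoking user code for one element.
    try:
        return h(x)
    except Exception as exn:
        reraise_with_traceback(exn, 'Map(f)')
```

With this variant the user frames stay in the real traceback object instead of being pasted into the message, so debuggers and IPython's traceback formatter can show them directly.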
Issue Time Tracking
-------------------
Worklog Id: (was: 87405)
Time Spent: 3h 10m (was: 3h)
> Stacktraces from exceptions in user code should be preserved in the Python SDK
> ------------------------------------------------------------------------------
>
> Key: BEAM-3956
> URL: https://issues.apache.org/jira/browse/BEAM-3956
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Stephan Hoyer
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Currently, Beam's Python SDK loses stacktraces for exceptions. It does
> helpfully add a tag like "[while running StageA]" to exception error
> messages, but that doesn't include the stacktrace of Python functions being
> called.
> Including the full stacktraces would make a big difference for the ease of
> debugging Beam pipelines when things go wrong.