[
https://issues.apache.org/jira/browse/BEAM-3956?focusedWorklogId=87402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87402
]
ASF GitHub Bot logged work on BEAM-3956:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Apr/18 01:50
Start Date: 04/Apr/18 01:50
Worklog Time Spent: 10m
Work Description: shoyer commented on issue #4959: [BEAM-3956] Preserve
stacktraces for Python exceptions
URL: https://github.com/apache/beam/pull/4959#issuecomment-378453995
@robertwb Indeed, `six.reraise()` is a definite improvement! It results in
the original stacktrace being directly included as part of the result
stacktrace:
<details>
```python-stacktrace
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-1-2377312c7b45> in <module>()
8 raise ValueError('internal failure!')
9
---> 10 [1, 2, 3] | beam.Map(f)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/ptransform.pyc
in __ror__(self, left, label)
491 _allocate_materialized_pipeline(p)
492 materialized_result =
_AddMaterializationTransforms().visit(result)
--> 493 p.run().wait_until_finish()
494 _release_materialized_pipeline(p)
495 return _FinalizeMaterialization().visit(materialized_result)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
387 if test_runner_api and self._verify_runner_api_compatible():
388 return Pipeline.from_runner_api(
--> 389 self.to_runner_api(), self.runner,
self._options).run(False)
390
391 if self._options.view_as(TypeOptions).runtime_type_check:
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc
in run(self, test_runner_api)
400 finally:
401 shutil.rmtree(tmpdir)
--> 402 return self.runner.run_pipeline(self)
403
404 def __enter__(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/direct/direct_runner.pyc
in run_pipeline(self, pipeline)
133 runner = BundleBasedDirectRunner()
134
--> 135 return runner.run_pipeline(pipeline)
136
137
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_pipeline(self, pipeline)
213 from apache_beam.runners.dataflow.dataflow_runner import
DataflowRunner
214 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
--> 215 return self.run_via_runner_api(pipeline.to_runner_api())
216
217 def run_via_runner_api(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_via_runner_api(self, pipeline_proto)
216
217 def run_via_runner_api(self, pipeline_proto):
--> 218 return self.run_stages(*self.create_stages(pipeline_proto))
219
220 def create_stages(self, pipeline_proto):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_stages(self, pipeline_components, stages, safe_coders)
835 metrics_by_stage[stage.name] = self.run_stage(
836 controller, pipeline_components, stage,
--> 837 pcoll_buffers, safe_coders).process_bundle.metrics
838 finally:
839 controller.close()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in run_stage(self, controller, pipeline_components, stage, pcoll_buffers,
safe_coders)
936 return BundleManager(
937 controller, get_buffer, process_bundle_descriptor,
--> 938 self._progress_frequency).process_bundle(data_input,
data_output)
939
940 # These classes are used to interact with the worker.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in process_bundle(self, inputs, expected_outputs)
1108 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
1109
process_bundle_descriptor_reference=self._bundle_descriptor.id))
-> 1110 result_future =
self._controller.control_handler.push(process_bundle)
1111
1112 with ProgressRequester(
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc
in push(self, request)
1001 request.instruction_id = 'control_%s' % self._uid_counter
1002 logging.debug('CONTROL REQUEST %s', request)
-> 1003 response = self.worker.do_instruction(request)
1004 logging.debug('CONTROL RESPONSE %s', response)
1005 return ControlFuture(request.instruction_id, response)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc
in do_instruction(self, request)
199 # E.g. if register is set, this will call
self.register(request.register))
200 return getattr(self, request_type)(getattr(request,
request_type),
--> 201 request.instruction_id)
202 else:
203 raise NotImplementedError
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc
in process_bundle(self, request, instruction_id)
216 self.state_handler, self.data_channel_factory)
217 try:
--> 218 processor.process_bundle(instruction_id)
219 finally:
220 del self.bundle_processors[instruction_id]
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/bundle_processor.pyc
in process_bundle(self, instruction_id)
284 for op in reversed(self.ops.values()):
285 logging.info('start %s', op)
--> 286 op.start()
287
288 # Inject inputs from data plane.
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in start(self)
236 else:
237 windowed_value =
_globally_windowed_value.with_value(value)
--> 238 self.output(windowed_value)
239
240
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in output(self, windowed_value, output_index)
157
158 def output(self, windowed_value, output_index=0):
--> 159 cython.cast(Receiver,
self.receivers[output_index]).receive(windowed_value)
160
161 def add_receiver(self, operation, output_index=0):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in receive(self, windowed_value)
83 self.update_counters_start(windowed_value)
84 for consumer in self.consumers:
---> 85 cython.cast(Operation, consumer).process(windowed_value)
86 self.update_counters_finish()
87
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc
in process(self, o)
390 def process(self, o):
391 with self.scoped_process_state:
--> 392 self.dofn_receiver.receive(o)
393
394 def progress_metrics(self):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in receive(self, windowed_value)
486
487 def receive(self, windowed_value):
--> 488 self.process(windowed_value)
489
490 def process(self, windowed_value):
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in process(self, windowed_value)
494 self.do_fn_invoker.invoke_process(windowed_value)
495 except BaseException as exn:
--> 496 self._reraise_augmented(exn)
497 finally:
498 self.scoped_metrics_container.exit()
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in _reraise_augmented(self, exn)
537 + step_annotation)
538 new_exn._tagged_with_step = True
--> 539 six.reraise(type(new_exn), new_exn, original_traceback)
540
541
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in process(self, windowed_value)
492 self.logging_context.enter()
493 self.scoped_metrics_container.enter()
--> 494 self.do_fn_invoker.invoke_process(windowed_value)
495 except BaseException as exn:
496 self._reraise_augmented(exn)
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py
in invoke_process(self, windowed_value, restriction_tracker, output_processor,
additional_args, additional_kwargs)
282 output_processor = self.output_processor
283 output_processor.process_outputs(
--> 284 windowed_value, self.process_method(windowed_value.value))
285
286
/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/core.pyc
in <lambda>(x)
970 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
971 else:
--> 972 wrapper = lambda x: [fn(x)]
973
974 label = 'Map(%s)' % ptransform.label_from_callable(fn)
<ipython-input-1-2377312c7b45> in f(x)
2
3 def f(x):
----> 4 return g(x)
5 def g(x):
6 return h(x)
<ipython-input-1-2377312c7b45> in g(x)
4 return g(x)
5 def g(x):
----> 6 return h(x)
7 def h(x):
8 raise ValueError('internal failure!')
<ipython-input-1-2377312c7b45> in h(x)
6 return h(x)
7 def h(x):
----> 8 raise ValueError('internal failure!')
9
10 [1, 2, 3] | beam.Map(f)
ValueError: internal failure! [while running 'Map(f)']
```
</details>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 87402)
Time Spent: 2h 50m (was: 2h 40m)
> Stacktraces from exceptions in user code should be preserved in the Python SDK
> ------------------------------------------------------------------------------
>
> Key: BEAM-3956
> URL: https://issues.apache.org/jira/browse/BEAM-3956
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Stephan Hoyer
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Currently, Beam's Python SDK loses stacktraces for exceptions. It does
> helpfully add a tag like "[while running StageA]" to exception error
> messages, but that doesn't include the stacktrace of Python functions being
> called.
> Including the full stacktraces would make a big difference for the ease of
> debugging Beam pipelines when things go wrong.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)