[ https://issues.apache.org/jira/browse/BEAM-3956?focusedWorklogId=87398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87398 ]

ASF GitHub Bot logged work on BEAM-3956:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Apr/18 01:44
            Start Date: 04/Apr/18 01:44
    Worklog Time Spent: 10m 
      Work Description: shoyer commented on issue #4959: [BEAM-3956] Preserve stacktraces for Python exceptions
URL: https://github.com/apache/beam/pull/4959#issuecomment-378453120
 
 
   > Thank you for this change! Do you have an example exception message with these changes that includes the stack trace?
   
   Sure, let me give a full example. Suppose we run the following code:
   ```python
   import apache_beam as beam
   
   def f(x):
     return g(x)
   def g(x):
     return h(x)
   def h(x):
     raise ValueError('internal failure!')
     
   [1, 2, 3] | beam.Map(f)
   ```
   
   With current Beam, I get a long traceback, but it only references Beam's internals, not anything about my code (it includes none of the calls to my functions `f()`, `g()` or `h()`):
   <details>
   
   ```python-traceback
   ---------------------------------------------------------------------------
   ValueError                                Traceback (most recent call last)
   <ipython-input-1-2377312c7b45> in <module>()
         8   raise ValueError('internal failure!')
         9 
   ---> 10 [1, 2, 3] | beam.Map(f)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label)
       491     _allocate_materialized_pipeline(p)
       492     materialized_result = _AddMaterializationTransforms().visit(result)
   --> 493     p.run().wait_until_finish()
       494     _release_materialized_pipeline(p)
       495     return _FinalizeMaterialization().visit(materialized_result)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc in run(self, test_runner_api)
       387     if test_runner_api and self._verify_runner_api_compatible():
       388       return Pipeline.from_runner_api(
   --> 389           self.to_runner_api(), self.runner, self._options).run(False)
       390 
       391     if self._options.view_as(TypeOptions).runtime_type_check:
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc in run(self, test_runner_api)
       400       finally:
       401         shutil.rmtree(tmpdir)
   --> 402     return self.runner.run_pipeline(self)
       403 
       404   def __enter__(self):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline)
       133       runner = BundleBasedDirectRunner()
       134 
   --> 135     return runner.run_pipeline(pipeline)
       136 
       137 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline)
       213     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
       214     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
   --> 215     return self.run_via_runner_api(pipeline.to_runner_api())
       216 
       217   def run_via_runner_api(self, pipeline_proto):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto)
       216 
       217   def run_via_runner_api(self, pipeline_proto):
   --> 218     return self.run_stages(*self.create_stages(pipeline_proto))
       219 
       220   def create_stages(self, pipeline_proto):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders)
       835         metrics_by_stage[stage.name] = self.run_stage(
       836             controller, pipeline_components, stage,
   --> 837             pcoll_buffers, safe_coders).process_bundle.metrics
       838     finally:
       839       controller.close()
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders)
       936     return BundleManager(
       937         controller, get_buffer, process_bundle_descriptor,
   --> 938         self._progress_frequency).process_bundle(data_input, data_output)
       939 
       940   # These classes are used to interact with the worker.
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs)
      1108         process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
      1109             process_bundle_descriptor_reference=self._bundle_descriptor.id))
   -> 1110     result_future = self._controller.control_handler.push(process_bundle)
      1111 
      1112     with ProgressRequester(
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request)
      1001         request.instruction_id = 'control_%s' % self._uid_counter
      1002       logging.debug('CONTROL REQUEST %s', request)
   -> 1003       response = self.worker.do_instruction(request)
      1004       logging.debug('CONTROL RESPONSE %s', response)
      1005       return ControlFuture(request.instruction_id, response)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request)
       199       # E.g. if register is set, this will call self.register(request.register))
       200       return getattr(self, request_type)(getattr(request, request_type),
   --> 201                                          request.instruction_id)
       202     else:
       203       raise NotImplementedError
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id)
       216             self.state_handler, self.data_channel_factory)
       217     try:
   --> 218       processor.process_bundle(instruction_id)
       219     finally:
       220       del self.bundle_processors[instruction_id]
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id)
       284       for op in reversed(self.ops.values()):
       285         logging.info('start %s', op)
   --> 286         op.start()
       287 
       288       # Inject inputs from data plane.
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in start(self)
       236           else:
       237             windowed_value = _globally_windowed_value.with_value(value)
   --> 238           self.output(windowed_value)
       239 
       240 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in output(self, windowed_value, output_index)
       157
       158   def output(self, windowed_value, output_index=0):
   --> 159     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
       160 
       161   def add_receiver(self, operation, output_index=0):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in receive(self, windowed_value)
        83     self.update_counters_start(windowed_value)
        84     for consumer in self.consumers:
   ---> 85       cython.cast(Operation, consumer).process(windowed_value)
        86     self.update_counters_finish()
        87 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in process(self, o)
       390   def process(self, o):
       391     with self.scoped_process_state:
   --> 392       self.dofn_receiver.receive(o)
       393 
       394   def progress_metrics(self):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in receive(self, windowed_value)
       486 
       487   def receive(self, windowed_value):
   --> 488     self.process(windowed_value)
       489 
       490   def process(self, windowed_value):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in process(self, windowed_value)
       494       self.do_fn_invoker.invoke_process(windowed_value)
       495     except BaseException as exn:
   --> 496       self._reraise_augmented(exn)
       497     finally:
       498       self.scoped_metrics_container.exit()
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in _reraise_augmented(self, exn)
       535           + step_annotation)
       536       new_exn._tagged_with_step = True
   --> 537     six.raise_from(new_exn, original_traceback)
       538 
       539 
   
   
   /usr/local/google/home/shoyer/miniconda3/envs/beam-dev/lib/python2.7/site-packages/six.pyc in raise_from(value, from_value)
       735 else:
       736     def raise_from(value, from_value):
   --> 737         raise value
       738 
       739 
   
   ValueError: internal failure! [while running 'Map(f)']
   ```
   
   </details>
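The missing frames can be reproduced without Beam. Below is a minimal Python 3 approximation that uses a hypothetical `run_step` helper in place of Beam's `_reraise_augmented`. When a new exception object is raised inside the `except` block, its own `__traceback__` starts at the re-raise site, so `f`, `g` and `h` do not appear in it. The sketch uses `raise ... from None` only to mimic Python 2, which had no exception chaining that could print the original error alongside the new one.

```python
import traceback


def h(x):
    raise ValueError('internal failure!')


def g(x):
    return h(x)


def f(x):
    return g(x)


def run_step(fn, value, step_name):
    """Hypothetical stand-in for a runner that tags errors with a step name."""
    try:
        return fn(value)
    except ValueError as exn:
        new_exn = ValueError('%s [while running %r]' % (exn, step_name))
        # `from None` hides the original exception when the traceback is
        # printed, approximating Python 2, which had no exception chaining.
        raise new_exn from None


def frame_names(exn):
    """Function names recorded in an exception's own __traceback__."""
    return [frame.name for frame in traceback.extract_tb(exn.__traceback__)]


def demo():
    try:
        run_step(f, 1, 'Map(f)')
    except ValueError as caught:
        return caught


caught = demo()
print(caught)               # internal failure! [while running 'Map(f)']
print(frame_names(caught))  # ['demo', 'run_step']
```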
   
   With this change, the original traceback, including the frames for `f()`, `g()` and `h()`, is preserved at the end of the exception message:
   
   <details>
   
   ```python-traceback
   ---------------------------------------------------------------------------
   ValueError                                Traceback (most recent call last)
   <ipython-input-1-2377312c7b45> in <module>()
         8   raise ValueError('internal failure!')
         9 
   ---> 10 [1, 2, 3] | beam.Map(f)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label)
       491     _allocate_materialized_pipeline(p)
       492     materialized_result = _AddMaterializationTransforms().visit(result)
   --> 493     p.run().wait_until_finish()
       494     _release_materialized_pipeline(p)
       495     return _FinalizeMaterialization().visit(materialized_result)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc in run(self, test_runner_api)
       387     if test_runner_api and self._verify_runner_api_compatible():
       388       return Pipeline.from_runner_api(
   --> 389           self.to_runner_api(), self.runner, self._options).run(False)
       390 
       391     if self._options.view_as(TypeOptions).runtime_type_check:
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/pipeline.pyc in run(self, test_runner_api)
       400       finally:
       401         shutil.rmtree(tmpdir)
   --> 402     return self.runner.run_pipeline(self)
       403 
       404   def __enter__(self):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline)
       133       runner = BundleBasedDirectRunner()
       134 
   --> 135     return runner.run_pipeline(pipeline)
       136 
       137 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in run_pipeline(self, pipeline)
       213     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
       214     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
   --> 215     return self.run_via_runner_api(pipeline.to_runner_api())
       216 
       217   def run_via_runner_api(self, pipeline_proto):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in run_via_runner_api(self, pipeline_proto)
       216 
       217   def run_via_runner_api(self, pipeline_proto):
   --> 218     return self.run_stages(*self.create_stages(pipeline_proto))
       219 
       220   def create_stages(self, pipeline_proto):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in run_stages(self, pipeline_components, stages, safe_coders)
       835         metrics_by_stage[stage.name] = self.run_stage(
       836             controller, pipeline_components, stage,
   --> 837             pcoll_buffers, safe_coders).process_bundle.metrics
       838     finally:
       839       controller.close()
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders)
       936     return BundleManager(
       937         controller, get_buffer, process_bundle_descriptor,
   --> 938         self._progress_frequency).process_bundle(data_input, data_output)
       939 
       940   # These classes are used to interact with the worker.
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in process_bundle(self, inputs, expected_outputs)
      1108         process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
      1109             process_bundle_descriptor_reference=self._bundle_descriptor.id))
   -> 1110     result_future = self._controller.control_handler.push(process_bundle)
      1111 
      1112     with ProgressRequester(
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py in push(self, request)
      1001         request.instruction_id = 'control_%s' % self._uid_counter
      1002       logging.debug('CONTROL REQUEST %s', request)
   -> 1003       response = self.worker.do_instruction(request)
      1004       logging.debug('CONTROL RESPONSE %s', response)
      1005       return ControlFuture(request.instruction_id, response)
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
       199       # E.g. if register is set, this will call self.register(request.register))
       200       return getattr(self, request_type)(getattr(request, request_type),
   --> 201                                          request.instruction_id)
       202     else:
       203       raise NotImplementedError
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
       216             self.state_handler, self.data_channel_factory)
       217     try:
   --> 218       processor.process_bundle(instruction_id)
       219     finally:
       220       del self.bundle_processors[instruction_id]
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
       284       for op in reversed(self.ops.values()):
       285         logging.info('start %s', op)
   --> 286         op.start()
       287 
       288       # Inject inputs from data plane.
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in start(self)
       236           else:
       237             windowed_value = _globally_windowed_value.with_value(value)
   --> 238           self.output(windowed_value)
       239 
       240 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in output(self, windowed_value, output_index)
       157
       158   def output(self, windowed_value, output_index=0):
   --> 159     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
       160 
       161   def add_receiver(self, operation, output_index=0):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in receive(self, windowed_value)
        83     self.update_counters_start(windowed_value)
        84     for consumer in self.consumers:
   ---> 85       cython.cast(Operation, consumer).process(windowed_value)
        86     self.update_counters_finish()
        87 
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/worker/operations.pyc in process(self, o)
       390   def process(self, o):
       391     with self.scoped_process_state:
   --> 392       self.dofn_receiver.receive(o)
       393 
       394   def progress_metrics(self):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in receive(self, windowed_value)
       486 
       487   def receive(self, windowed_value):
   --> 488     self.process(windowed_value)
       489 
       490   def process(self, windowed_value):
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in process(self, windowed_value)
       494       self.do_fn_invoker.invoke_process(windowed_value)
       495     except BaseException as exn:
   --> 496       self._reraise_augmented(exn)
       497     finally:
       498       self.scoped_metrics_container.exit()
   
   
   /usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py in _reraise_augmented(self, exn)
       538           + step_annotation + stacktrace_text)
       539       new_exn._tagged_with_step = True
   --> 540     six.raise_from(new_exn, original_traceback)
       541 
       542 
   
   
   /usr/local/google/home/shoyer/miniconda3/envs/beam-dev/lib/python2.7/site-packages/six.pyc in raise_from(value, from_value)
       735 else:
       736     def raise_from(value, from_value):
   --> 737         raise value
       738 
       739 
   
   ValueError: internal failure! [while running 'Map(f)']
   Traceback (most recent call last):
   
     File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py", line 494, in process
       self.do_fn_invoker.invoke_process(windowed_value)
   
     File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/runners/common.py", line 284, in invoke_process
       windowed_value, self.process_method(windowed_value.value))
   
     File "/usr/local/google/home/shoyer/open-source/beam/sdks/python/apache_beam/transforms/core.py", line 972, in <lambda>
       wrapper = lambda x: [fn(x)]
   
     File "<ipython-input-1-2377312c7b45>", line 4, in f
       return g(x)
   
     File "<ipython-input-1-2377312c7b45>", line 6, in g
       return h(x)
   
     File "<ipython-input-1-2377312c7b45>", line 8, in h
       raise ValueError('internal failure!')
   
   ValueError: internal failure!
   ```
   
   </details>
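The second traceback points to the idea behind the change: format the original traceback while it is still being handled, then append it to the new message after the step annotation (the `step_annotation + stacktrace_text` expression visible in `_reraise_augmented`). Here is a minimal Python 3 sketch of that idea. `reraise_augmented` and `run_step` are hypothetical names, and Beam's real code may differ in details, for example in how it treats exception types whose constructors do not take a single message.

```python
import traceback


def h(x):
    raise ValueError('internal failure!')


def g(x):
    return h(x)


def f(x):
    return g(x)


def reraise_augmented(exn, step_name):
    """Hypothetical helper; must be called from inside an `except` block.

    Assumes the exception type accepts a single message argument.
    """
    step_annotation = " [while running '%s']" % step_name
    # format_exc() renders the exception currently being handled, including
    # the user frames (f, g, h) that the re-raised object would not carry.
    stacktrace_text = '\n' + traceback.format_exc()
    new_exn = type(exn)(str(exn) + step_annotation + stacktrace_text)
    raise new_exn from None


def run_step(fn, value, step_name):
    """Hypothetical runner step that tags any error with `step_name`."""
    try:
        return fn(value)
    except Exception as exn:
        reraise_augmented(exn, step_name)


try:
    run_step(f, 1, 'Map(f)')
except ValueError as caught:
    message = str(caught)

# First line: internal failure! [while running 'Map(f)']
# followed by a "Traceback (most recent call last):" block that lists f, g, h.
print(message)
```

One trade-off of this design is that the user frames survive only as plain text. They still appear when the exception is logged as a string, but a post-mortem debugger cannot navigate them.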
   
   @robertwb I will try rewriting this to use `six.reraise()` instead. That might be a cleaner way to do this.
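For comparison, the `six.reraise()` route keeps the user frames in the traceback object itself rather than in the message. `six.reraise(exc_type, exc_value, exc_traceback)` re-raises a value with a given traceback on both Python 2 and 3. The minimal sketch below uses the native Python 3 spelling, `raise new_exn.with_traceback(tb)`, so it runs without `six`. `run_step` is a hypothetical stand-in, not Beam's actual runner code.

```python
import sys
import traceback


def h(x):
    raise ValueError('internal failure!')


def g(x):
    return h(x)


def f(x):
    return g(x)


def run_step(fn, value, step_name):
    """Hypothetical runner step that tags errors but keeps their frames."""
    try:
        return fn(value)
    except Exception as exn:
        original_tb = sys.exc_info()[2]
        new_exn = type(exn)("%s [while running '%s']" % (exn, step_name))
        # Roughly what six.reraise(type(new_exn), new_exn, original_tb) does on
        # Python 3: the new exception continues the original traceback, so the
        # frames for f, g and h stay attached to it. `from None` is an extra
        # here, to avoid printing the original error a second time.
        raise new_exn.with_traceback(original_tb) from None


def frame_names(exn):
    """Function names recorded in an exception's own __traceback__."""
    return [frame.name for frame in traceback.extract_tb(exn.__traceback__)]


def demo():
    try:
        run_step(f, 1, 'Map(f)')
    except ValueError as caught:
        return caught


caught = demo()
print(caught)                    # internal failure! [while running 'Map(f)']
print(frame_names(caught)[-3:])  # ['f', 'g', 'h']
```

Compared with appending text, this approach lets debuggers and `traceback.print_exc()` walk the real frames. The trade-off is that the frames are lost if only `str(exception)` is logged.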

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 87398)
    Time Spent: 2h 20m  (was: 2h 10m)

> Stacktraces from exceptions in user code should be preserved in the Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-3956
>                 URL: https://issues.apache.org/jira/browse/BEAM-3956
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Stephan Hoyer
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Currently, Beam's Python SDK loses stacktraces for exceptions. It does 
> helpfully add a tag like "[while running StageA]" to exception error 
> messages, but that doesn't include the stacktrace of Python functions being 
> called.
> Including the full stacktraces would make a big difference for the ease of 
> debugging Beam pipelines when things go wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
