jd185367 opened a new issue, #31193:
URL: https://github.com/apache/beam/issues/31193

   ### What would you like to happen?
   
   Add a way in the Python SDK to handle uncaught runtime exceptions thrown anywhere within a transform, e.g. a `with_exception_handling()` method like this:
   
   ```python
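   import logging
   
   import apache_beam as beam
   
   def log_errors(error_item: tuple):
       item, error_info = error_item
       logging.error(error_info)
       logging.error(f"Failed to save item: {item}")
   
   # Proposed API: with_exception_handling() on an arbitrary (composite) PTransform.
   _, errors = items | MyTransform().with_exception_handling()
   errors | beam.Map(log_errors)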
   ```
   
   This already exists for individual DoFns via [`ParDo.with_exception_handling`](https://github.com/apache/beam/blob/c531f898d6b3454761f78a2420df5424178fb002/sdks/python/apache_beam/transforms/core.py#L1493) (also usable on `beam.Map` and `beam.FlatMap`), and the Java SDK appears to offer something similar for PTransforms via [`WithFailures`](https://beam.apache.org/releases/javadoc/2.15.0/index.html?org/apache/beam/sdk/transforms/WithFailures.html).
   
   ## Motivation
   
   Google Cloud Dataflow automatically retries failed messages in streaming jobs. However, when a message causes a runtime error (e.g. because of bad data), it is retried indefinitely, which can *block other messages from being processed*. The only fix we've found is to drain and restart the pipeline to flush out the bad messages, which is manual and risks losing data. There's no way to set a maximum number of retries per message. While we try to parse and validate messages up front as much as possible, bugs have slipped through to production and caused runtime errors (and, obviously, we can't prevent 100% of bugs).
   
   Being able to add a top-level error handler to the pipeline (or to a root transform) would solve this: in the worst case, we could catch any failed messages, log them, and keep them from blocking the rest of the pipeline.
   
   Right now, though, adding a top-level exception handler isn't possible. For instance, **this example will not catch the raised error** in Apache Beam 2.56.0, which is very unintuitive:
   ```python
   import logging
   import apache_beam as beam
   
   class BuggedTransform(beam.PTransform):
       def expand(self, messages: beam.PCollection) -> beam.PCollection:
           return messages | "Call function w/ bug" >> beam.Map(self.raise_error)
       
       def raise_error(self, m: str):
           if m == "bar":
               raise ValueError("This should be caught")
           print(m)
           return m
   
   class MyTransform(beam.PTransform):
       def expand(self, messages: beam.PCollection) -> beam.PCollection:
           try:
               return messages | "Run transform w/ bug" >> BuggedTransform()
           except Exception as e:      # This should catch the error, but doesn't!
               logging.error(f"Error happened {e}")
               return messages
   
   with beam.Pipeline() as pipeline:
       result = (
           pipeline
           | "Create example data" >> beam.Create(["foo", "bar", "baz"])
           | "Apply my transform" >> MyTransform()
       )
   ```
   Output:
   ```
   foo
   ERROR:apache_beam.runners.common:This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']
   Traceback (most recent call last):
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
       wrapper = lambda x: [fn(x)]
     File ".\beam_exception_example.py", line 12, in raise_error
       raise ValueError("This should be caught")
   ValueError: This should be caught
   Traceback (most recent call last):
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
       wrapper = lambda x: [fn(x)]
     File ".\beam_exception_example.py", line 12, in raise_error
       raise ValueError("This should be caught")
   ValueError: This should be caught
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File ".\beam_exception_example.py", line 27, in <module>
       result = (
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 613, in __exit__
       self.result = self.run()
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 587, in run
       return self.runner.run_pipeline(self, self._options)
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 128, in run_pipeline
       return runner.run_pipeline(pipeline, options)
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 204, in run_pipeline
       self._latest_run_result = self.run_via_runner_api(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 228, in run_via_runner_api
       return self.run_stages(stage_context, stages)
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 483, in run_stages
       bundle_results = self._execute_bundle(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 811, in _execute_bundle
       self._run_bundle(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1048, in _run_bundle
       result, splits = bundle_manager.process_bundle(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1384, in process_bundle
       result_future = self._worker_handler.control_conn.push(process_bundle_req)
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 384, in push
       response = self.worker.do_instruction(request)
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 656, in do_instruction
       return getattr(self, request_type)(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 694, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1113, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 237, in process_encoded
       self.output(decoded_value)
     File "apache_beam\runners\worker\operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam\runners\worker\operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
     File "apache_beam\runners\worker\operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
       wrapper = lambda x: [fn(x)]
     File ".\beam_exception_example.py", line 12, in raise_error
       raise ValueError("This should be caught")
   ValueError: This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']
   ```
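
The `except` in `MyTransform.expand()` never fires because `expand()` only *constructs* the pipeline graph; `raise_error` is not called until the runner executes the pipeline, which here happens when the `with beam.Pipeline()` block exits (the `__exit__` and `run` frames in the traceback above). The toy model below reproduces that construction-versus-execution split in plain Python. `ToyPipeline`, `build`, and their methods are hypothetical and are not how Beam is implemented.

```python
from typing import Callable, List


class ToyPipeline:
    """Hypothetical toy model of deferred execution (not Beam's implementation)."""

    def __init__(self, elements: List[str]) -> None:
        self.elements = elements
        self.steps: List[Callable[[str], str]] = []

    def apply(self, fn: Callable[[str], str]) -> "ToyPipeline":
        # Analogous to PTransform.expand(): only records the step.
        self.steps.append(fn)
        return self

    def run(self) -> List[str]:
        # Analogous to the runner: user functions are called only here.
        results = []
        for element in self.elements:
            for fn in self.steps:
                element = fn(element)
            results.append(element)
        return results


def raise_error(m: str) -> str:
    if m == "bar":
        raise ValueError("This should be caught")
    return m


def build(pipeline: ToyPipeline) -> ToyPipeline:
    try:
        # Succeeds: raise_error is stored, not called.
        return pipeline.apply(raise_error)
    except ValueError:
        # Never reached, mirroring the except in MyTransform.expand().
        return pipeline


pipeline = build(ToyPipeline(["foo", "bar", "baz"]))
try:
    pipeline.run()  # raise_error("bar") raises here, outside build()'s try
except ValueError as e:
    print(f"Raised at run time: {e}")  # prints "Raised at run time: This should be caught"
```

Any handler that wants to see these errors therefore has to be wired into the execution path (per element), not wrapped around graph construction.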
   
   The only solution we've found is to add this sort of error handling separately to every pipeline step, which isn't maintainable: if we have hundreds of DoFns, adding try/except blocks to each of them individually is labor-intensive.
   
   ## Related Issues
   
   - https://github.com/apache/beam/issues/18952 seems to address a similar (but more specific) problem
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.