This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f947803d5c8 Add exception chaining (#37423)
f947803d5c8 is described below

commit f947803d5c826daaadc8f31fd335df858338850c
Author: Shaheer Amjad <[email protected]>
AuthorDate: Thu Jan 29 21:17:36 2026 +0500

    Add exception chaining (#37423)
    
    * Add exception chaining to preserve error context
    
    - Add 'from e' to exception re-raises in CloudSQLEnrichmentHandler
    - Add exception chaining in processes.py for OSError and CalledProcessError
    - Improve logging in core.py to preserve traceback context
    
    This improves debuggability by preserving the full exception chain,
    following Python PEP 3134 best practices.
    
    Fixes #37422
    
    * Fix yapf formatting for logging.warning statement
    
    * Fix yapf formatting: put logging arguments on single line
---
 CHANGES.md                                         |  1 +
 sdks/python/apache_beam/transforms/core.py         |  3 ++-
 .../transforms/enrichment_handlers/cloudsql.py     |  4 ++--
 sdks/python/apache_beam/utils/processes.py         | 24 +++++++++++-----------
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ff931802add..e2dcf6e0f2c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 
 ## New Features / Improvements
 
+* (Python) Added exception chaining to preserve error context in 
CloudSQLEnrichmentHandler, processes utilities, and core transforms 
([#37422](https://github.com/apache/beam/issues/37422)).
 * X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 
 ## Breaking Changes
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index ea11bca9474..128a070e2ac 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2444,7 +2444,8 @@ class _ExceptionHandlingWrapperDoFn(DoFn):
         try:
           self._on_failure_callback(exn, args[0])
         except Exception as e:
-          logging.warning('on_failure_callback failed with error: %s', e)
+          logging.warning(
+              'on_failure_callback failed with error: %s', e, exc_info=True)
       yield pvalue.TaggedOutput(
           self._dead_letter_tag,
           (
diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py 
b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
index 3fe3a62f954..ba0b8617f67 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
@@ -395,11 +395,11 @@ class 
CloudSQLEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
         return data
       except Exception as e:
         transaction.rollback()
-        raise RuntimeError(f"Database operation failed: {e}")
+        raise RuntimeError(f"Database operation failed: {e}") from e
     except Exception as e:
       raise Exception(
           f'Could not execute the query. Please check if the query is properly 
'
-          f'formatted and the table exists. {e}')
+          f'formatted and the table exists. {e}') from e
     finally:
       if connection:
         connection.close()
diff --git a/sdks/python/apache_beam/utils/processes.py 
b/sdks/python/apache_beam/utils/processes.py
index c7b9e240d96..f6daecea212 100644
--- a/sdks/python/apache_beam/utils/processes.py
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -49,18 +49,18 @@ else:
       kwargs['shell'] = True
     try:
       out = subprocess.call(*args, **kwargs)
-    except OSError:
-      raise RuntimeError("Executable {} not found".format(args[0]))
+    except OSError as e:
+      raise RuntimeError("Executable {} not found".format(args[0])) from e
     except subprocess.CalledProcessError as error:
       if isinstance(args, tuple) and (args[0][2] == "pip"):
         raise RuntimeError( \
           "Full traceback: {}\n Pip install failed for package: {} \
           \n Output from execution of subprocess: {}" \
-          .format(traceback.format_exc(), args[0][6], error. output))
+          .format(traceback.format_exc(), args[0][6], error. output)) from 
error
       else:
         raise RuntimeError("Full trace: {}\
            \n Output of the failed child process: {} " \
-          .format(traceback.format_exc(), error.output))
+          .format(traceback.format_exc(), error.output)) from error
     return out
 
   def check_call(*args, **kwargs):
@@ -68,18 +68,18 @@ else:
       kwargs['shell'] = True
     try:
       out = subprocess.check_call(*args, **kwargs)
-    except OSError:
-      raise RuntimeError("Executable {} not found".format(args[0]))
+    except OSError as e:
+      raise RuntimeError("Executable {} not found".format(args[0])) from e
     except subprocess.CalledProcessError as error:
       if isinstance(args, tuple) and (args[0][2] == "pip"):
         raise RuntimeError( \
           "Full traceback: {} \n Pip install failed for package: {} \
           \n Output from execution of subprocess: {}" \
-          .format(traceback.format_exc(), args[0][6], error.output))
+          .format(traceback.format_exc(), args[0][6], error.output)) from error
       else:
         raise RuntimeError("Full trace: {} \
           \n Output of the failed child process: {}" \
-          .format(traceback.format_exc(), error.output))
+          .format(traceback.format_exc(), error.output)) from error
     return out
 
   def check_output(*args, **kwargs):
@@ -87,18 +87,18 @@ else:
       kwargs['shell'] = True
     try:
       out = subprocess.check_output(*args, **kwargs)
-    except OSError:
-      raise RuntimeError("Executable {} not found".format(args[0]))
+    except OSError as e:
+      raise RuntimeError("Executable {} not found".format(args[0])) from e
     except subprocess.CalledProcessError as error:
       if isinstance(args, tuple) and (args[0][2] == "pip"):
         raise RuntimeError( \
           "Full traceback: {} \n Pip install failed for package: {} \
           \n Output from execution of subprocess: {}" \
-          .format(traceback.format_exc(), args[0][6], error.output))
+          .format(traceback.format_exc(), args[0][6], error.output)) from error
       else:
         raise RuntimeError("Full trace: {}, \
            output of the failed child process {} "\
-          .format(traceback.format_exc(), error.output))
+          .format(traceback.format_exc(), error.output)) from error
     return out
 
   def Popen(*args, **kwargs):

Reply via email to