This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f947803d5c8 Add exception chaining (#37423)
f947803d5c8 is described below
commit f947803d5c826daaadc8f31fd335df858338850c
Author: Shaheer Amjad <[email protected]>
AuthorDate: Thu Jan 29 21:17:36 2026 +0500
Add exception chaining (#37423)
* Add exception chaining to preserve error context
- Add 'from e' to exception re-raises in CloudSQLEnrichmentHandler
- Add exception chaining in processes.py for OSError and CalledProcessError
- Log the on_failure_callback error in core.py with exc_info=True so its traceback is preserved
This improves debuggability by preserving the full exception chain,
following Python PEP 3134 best practices.
Fixes #37422
* Fix yapf formatting for logging.warning statement
* Fix yapf formatting: put logging arguments on single line
---
CHANGES.md | 1 +
sdks/python/apache_beam/transforms/core.py | 3 ++-
.../transforms/enrichment_handlers/cloudsql.py | 4 ++--
sdks/python/apache_beam/utils/processes.py | 24 +++++++++++-----------
4 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ff931802add..e2dcf6e0f2c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
## New Features / Improvements
+* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ea11bca9474..128a070e2ac 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2444,7 +2444,8 @@ class _ExceptionHandlingWrapperDoFn(DoFn):
try:
self._on_failure_callback(exn, args[0])
except Exception as e:
- logging.warning('on_failure_callback failed with error: %s', e)
+ logging.warning(
+ 'on_failure_callback failed with error: %s', e, exc_info=True)
yield pvalue.TaggedOutput(
self._dead_letter_tag,
(
diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
index 3fe3a62f954..ba0b8617f67 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py
@@ -395,11 +395,11 @@ class CloudSQLEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
return data
except Exception as e:
transaction.rollback()
- raise RuntimeError(f"Database operation failed: {e}")
+ raise RuntimeError(f"Database operation failed: {e}") from e
except Exception as e:
raise Exception(
f'Could not execute the query. Please check if the query is properly
'
- f'formatted and the table exists. {e}')
+ f'formatted and the table exists. {e}') from e
finally:
if connection:
connection.close()
diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py
index c7b9e240d96..f6daecea212 100644
--- a/sdks/python/apache_beam/utils/processes.py
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -49,18 +49,18 @@ else:
kwargs['shell'] = True
try:
out = subprocess.call(*args, **kwargs)
- except OSError:
- raise RuntimeError("Executable {} not found".format(args[0]))
+ except OSError as e:
+ raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {}\n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
- .format(traceback.format_exc(), args[0][6], error. output))
+ .format(traceback.format_exc(), args[0][6], error. output)) from
error
else:
raise RuntimeError("Full trace: {}\
\n Output of the failed child process: {} " \
- .format(traceback.format_exc(), error.output))
+ .format(traceback.format_exc(), error.output)) from error
return out
def check_call(*args, **kwargs):
@@ -68,18 +68,18 @@ else:
kwargs['shell'] = True
try:
out = subprocess.check_call(*args, **kwargs)
- except OSError:
- raise RuntimeError("Executable {} not found".format(args[0]))
+ except OSError as e:
+ raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {} \n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
- .format(traceback.format_exc(), args[0][6], error.output))
+ .format(traceback.format_exc(), args[0][6], error.output)) from error
else:
raise RuntimeError("Full trace: {} \
\n Output of the failed child process: {}" \
- .format(traceback.format_exc(), error.output))
+ .format(traceback.format_exc(), error.output)) from error
return out
def check_output(*args, **kwargs):
@@ -87,18 +87,18 @@ else:
kwargs['shell'] = True
try:
out = subprocess.check_output(*args, **kwargs)
- except OSError:
- raise RuntimeError("Executable {} not found".format(args[0]))
+ except OSError as e:
+ raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {} \n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
- .format(traceback.format_exc(), args[0][6], error.output))
+ .format(traceback.format_exc(), args[0][6], error.output)) from error
else:
raise RuntimeError("Full trace: {}, \
output of the failed child process {} "\
- .format(traceback.format_exc(), error.output))
+ .format(traceback.format_exc(), error.output)) from error
return out
def Popen(*args, **kwargs):