This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d0ced937ac0 Factor out a shared base for the database error handlers (#68512)
d0ced937ac0 is described below
commit d0ced937ac0a1de71fea11071f12965bfe3ed8d2
Author: Stefan Wang <[email protected]>
AuthorDate: Mon Jun 15 08:50:55 2026 -0700
Factor out a shared base for the database error handlers (#68512)
_UniqueConstraintErrorHandler and DataErrorHandler carried near-identical
bodies — lookup id, statement log, [api] expose_stacktrace gating, and the
HTTPException detail shape. Move the shared body into a _DatabaseErrorHandler
base; each handler now declares only its status_code, reason, and an optional
_should_handle guard. No behavior change.
---
.../src/airflow/api_fastapi/common/exceptions.py | 134 ++++++++++-----------
1 file changed, 62 insertions(+), 72 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/exceptions.py
b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
index efe775e544e..86af9e062cd 100644
--- a/airflow-core/src/airflow/api_fastapi/common/exceptions.py
+++ b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
@@ -24,13 +24,14 @@ from enum import Enum
from typing import Generic, TypeVar
from fastapi import HTTPException, Request, status
-from sqlalchemy.exc import DataError, IntegrityError
+from sqlalchemy.exc import DatabaseError, DataError, IntegrityError
from airflow.configuration import conf
from airflow.exceptions import DeserializationError
from airflow.utils.strings import get_random_string
T = TypeVar("T", bound=Exception)
+DBError = TypeVar("DBError", bound=DatabaseError)
log = logging.getLogger(__name__)
@@ -53,8 +54,57 @@ class _DatabaseDialect(Enum):
POSTGRES = "postgres"
-class _UniqueConstraintErrorHandler(BaseErrorHandler[IntegrityError]):
- """Exception raised when trying to insert a duplicate value in a unique
column."""
+class _DatabaseErrorHandler(BaseErrorHandler[DBError]):
+ """
+ Base for handlers that turn a SQLAlchemy error into an actionable HTTP
response.
+
+ The failing statement is logged under a random lookup id and echoed back
to the
+ caller only when ``[api] expose_stacktrace`` is set; otherwise the
response just
+ points at that id in the api server logs. Subclasses set ``status_code``
and
+ ``reason`` and may override ``_should_handle`` to skip exceptions they do
not own.
+ """
+
+ status_code: int
+ reason: str
+
+ def _should_handle(self, exc: DBError) -> bool:
+ return True
+
+ def exception_handler(self, request: Request, exc: DBError):
+ if not self._should_handle(exc):
+ return
+ exception_id = get_random_string()
+ stacktrace = "".join(traceback.format_tb(exc.__traceback__))
+ log_message = f"Error with id {exception_id}, statement:
{exc.statement}\n{stacktrace}"
+ log.error(log_message)
+ if conf.get("api", "expose_stacktrace") == "True":
+ message = log_message
+ statement = str(exc.statement)
+ orig_error = str(exc.orig)
+ else:
+ message = (
+ "Serious error when handling your request. Check logs for more
details - "
+ f"you will find it in api server when you look for ID
{exception_id}"
+ )
+ statement = "hidden"
+ orig_error = "hidden"
+
+ raise HTTPException(
+ status_code=self.status_code,
+ detail={
+ "reason": self.reason,
+ "statement": statement,
+ "orig_error": orig_error,
+ "message": message,
+ },
+ )
+
+
+class _UniqueConstraintErrorHandler(_DatabaseErrorHandler[IntegrityError]):
+ """Translate a unique-constraint ``IntegrityError`` into a 409, matched
per database dialect."""
+
+ status_code = status.HTTP_409_CONFLICT
+ reason = "Unique constraint violation"
unique_constraint_error_prefix_dict: dict[_DatabaseDialect, str] = {
_DatabaseDialect.SQLITE: "UNIQUE constraint failed",
@@ -66,37 +116,8 @@ class
_UniqueConstraintErrorHandler(BaseErrorHandler[IntegrityError]):
super().__init__(IntegrityError)
self.dialect: _DatabaseDialect | None = None
- def exception_handler(self, request: Request, exc: IntegrityError):
- """Handle IntegrityError exception."""
- if self._is_dialect_matched(exc):
- exception_id = get_random_string()
- stacktrace = ""
- for tb in traceback.format_tb(exc.__traceback__):
- stacktrace += tb
-
- log_message = f"Error with id {exception_id}, statement:
{exc.statement}\n{stacktrace}"
- log.error(log_message)
- if conf.get("api", "expose_stacktrace") == "True":
- message = log_message
- statement = str(exc.statement)
- orig_error = str(exc.orig)
- else:
- message = (
- "Serious error when handling your request. Check logs for
more details - "
- f"you will find it in api server when you look for ID
{exception_id}"
- )
- statement = "hidden"
- orig_error = "hidden"
-
- raise HTTPException(
- status_code=status.HTTP_409_CONFLICT,
- detail={
- "reason": "Unique constraint violation",
- "statement": statement,
- "orig_error": orig_error,
- "message": message,
- },
- )
+ def _should_handle(self, exc: IntegrityError) -> bool:
+ return self._is_dialect_matched(exc)
def _is_dialect_matched(self, exc: IntegrityError) -> bool:
"""Check if the exception matches the unique constraint error message
for any dialect."""
@@ -108,51 +129,20 @@ class
_UniqueConstraintErrorHandler(BaseErrorHandler[IntegrityError]):
return False
-class DataErrorHandler(BaseErrorHandler[DataError]):
+class DataErrorHandler(_DatabaseErrorHandler[DataError]):
"""
- Translate ``sqlalchemy.exc.DataError`` into a 422.
+ Translate a ``sqlalchemy.exc.DataError`` into a 422.
- The database rejected a value that passed Pydantic validation (too long,
out
- of range, or the wrong type for its column), so it is a client error, not a
- 500. The statement and underlying DB error are logged under a lookup id and
- returned to the caller only when ``[api] expose_stacktrace`` is set,
mirroring
- ``_UniqueConstraintErrorHandler``.
+ The database rejected a value that passed Pydantic validation (too long,
out of
+ range, or the wrong type for its column), so it is a client error, not a
500.
"""
+ status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
+ reason = "Value rejected by database"
+
def __init__(self):
super().__init__(DataError)
- def exception_handler(self, request: Request, exc: DataError):
- """Handle DataError exception."""
- exception_id = get_random_string()
- stacktrace = ""
- for tb in traceback.format_tb(exc.__traceback__):
- stacktrace += tb
-
- log_message = f"Error with id {exception_id}, statement:
{exc.statement}\n{stacktrace}"
- log.error(log_message)
- if conf.get("api", "expose_stacktrace") == "True":
- message = log_message
- statement = str(exc.statement)
- orig_error = str(exc.orig)
- else:
- message = (
- "Serious error when handling your request. Check logs for more
details - "
- f"you will find it in api server when you look for ID
{exception_id}"
- )
- statement = "hidden"
- orig_error = "hidden"
-
- raise HTTPException(
- status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
- detail={
- "reason": "Value rejected by database",
- "statement": statement,
- "orig_error": orig_error,
- "message": message,
- },
- )
-
class DagErrorHandler(BaseErrorHandler[DeserializationError]):
"""Handler for Dag related errors."""