This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ec32cca6547 Fix DAG run trigger to surface errors instead of
swallowing them (#64130)
ec32cca6547 is described below
commit ec32cca6547a9935a67c8e57b6a34d5bf5781500
Author: Daniel Standish <[email protected]>
AuthorDate: Mon May 11 10:26:21 2026 -0700
Fix DAG run trigger to surface errors instead of swallowing them (#64130)
The try/except ValueError block around DAG run creation converted every
ValueError raised inside it into a generic 400 response, hiding the
original exception context from logs and error tracking. Narrow the try
block to the create_dagrun() call so that errors raised elsewhere
propagate naturally and the full traceback is preserved.
Existing call sites (e.g. backfill _create_backfill) catch ValueError to
wrap param validation failures, and tests assert pytest.raises(ValueError).
Adding ValueError as a base class of ParamValidationError preserves that
behavior while still allowing narrow `except ParamValidationError`
handling.
FastAPI TestClient defaults to raise_server_exceptions=True, so a
RuntimeError from create_dagrun() surfaces directly rather than being
mapped to a 500. Assert the propagation with pytest.raises — propagation
is the behavior the PR protects; the regression it guards against is the
old code silently mapping any ValueError to 400.
---
.../api_fastapi/core_api/routes/public/dag_run.py | 21 +++++++++---------
airflow-core/src/airflow/exceptions.py | 4 ++++
.../src/airflow/serialization/definitions/param.py | 3 ++-
.../core_api/routes/public/test_dag_run.py | 25 ++++++++++++++++++----
task-sdk/src/airflow/sdk/exceptions.py | 2 +-
5 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index a40676f8e18..2e23ad1a171 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -99,6 +99,7 @@ from airflow.api_fastapi.core_api.security import (
)
from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
+from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
@@ -597,10 +598,10 @@ def trigger_dag_run(
else:
triggered_by = DagRunTriggeredByType.REST_API
- try:
- dag = get_latest_version_of_dag(dag_bag, dag_id, session)
- params = body.validate_context(dag)
+ dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+ params = body.validate_context(dag)
+ try:
dag_run = dag.create_dagrun(
run_id=params["run_id"],
logical_date=params["logical_date"],
@@ -614,14 +615,14 @@ def trigger_dag_run(
partition_key=params["partition_key"],
session=session,
)
+ except (ParamValidationError, ValueError) as e:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
- dag_run_note = body.note
- if dag_run_note:
- current_user_id = user.get_id()
- dag_run.note = (dag_run_note, current_user_id)
- return dag_run
- except ValueError as e:
- raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
+ dag_run_note = body.note
+ if dag_run_note:
+ current_user_id = user.get_id()
+ dag_run.note = (dag_run_note, current_user_id)
+ return dag_run
@dag_run_router.get(
diff --git a/airflow-core/src/airflow/exceptions.py
b/airflow-core/src/airflow/exceptions.py
index f9a891472a3..21798efee98 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -38,6 +38,7 @@ try:
AirflowRescheduleException as AirflowRescheduleException,
AirflowTimetableInvalid as AirflowTimetableInvalid,
NodeNotFound as NodeNotFound,
+ ParamValidationError as ParamValidationError,
TaskNotFound as TaskNotFound,
)
except ModuleNotFoundError:
@@ -79,6 +80,9 @@ except ModuleNotFoundError:
class AirflowOptionalProviderFeatureException(AirflowException): # type:
ignore[no-redef]
"""Raise by providers when imports are missing for optional provider
features."""
+ class ParamValidationError(AirflowException, ValueError): # type:
ignore[no-redef]
+ """Raise when DAG params fail validation."""
+
class AirflowBadRequest(AirflowException):
"""Raise when the application or server cannot handle the request."""
diff --git a/airflow-core/src/airflow/serialization/definitions/param.py
b/airflow-core/src/airflow/serialization/definitions/param.py
index 12470c5fdd2..69a779f710e 100644
--- a/airflow-core/src/airflow/serialization/definitions/param.py
+++ b/airflow-core/src/airflow/serialization/definitions/param.py
@@ -22,6 +22,7 @@ import collections.abc
import copy
from typing import TYPE_CHECKING, Any, Literal
+from airflow.exceptions import ParamValidationError
from airflow.serialization.definitions.notset import NOTSET, is_arg_set
if TYPE_CHECKING:
@@ -143,7 +144,7 @@ class SerializedParamsDict(collections.abc.Mapping[str,
Any]):
try:
return v.resolve(raises=True)
except Exception as e:
- raise ValueError(f"Invalid input for param {k}: {e}") from None
+ raise ParamValidationError(f"Invalid input for param {k}:
{e}") from None
return {k: _validate_one(k, v) for k, v in self.__dict.items()}
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ae89fe1e4b1..1c2e8b32196 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -2207,16 +2207,33 @@ class TestTriggerDagRun:
]
@mock.patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
- def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun,
test_client):
- now = timezone.utcnow().isoformat()
- error_message = "Encountered Error"
+ def test_dagrun_creation_param_validation_error_returns_400(self,
mock_create_dagrun, test_client):
+ from airflow.exceptions import ParamValidationError
- mock_create_dagrun.side_effect = ValueError(error_message)
+ now = timezone.utcnow().isoformat()
+ error_message = "Invalid input for param x"
+ mock_create_dagrun.side_effect = ParamValidationError(error_message)
response = test_client.post(f"/dags/{DAG1_ID}/dagRuns",
json={"logical_date": now})
assert response.status_code == 400
assert response.json() == {"detail": error_message}
+
@mock.patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun")
+ def test_dagrun_creation_non_validation_error_propagates(self,
mock_create_dagrun, test_client):
+ """
+ Non-ParamValidationError exceptions from create_dagrun() must not be
swallowed.
+
+ TestClient's default raise_server_exceptions=True surfaces server-side
+ exceptions to the caller; in production these would become a 500. The
+ regression we are guarding against is the old behavior where any
+ ValueError got silently converted to 400.
+ """
+ now = timezone.utcnow().isoformat()
+ mock_create_dagrun.side_effect = RuntimeError("boom")
+
+ with pytest.raises(RuntimeError, match="boom"):
+ test_client.post(f"/dags/{DAG1_ID}/dagRuns", json={"logical_date":
now})
+
def test_should_respond_404_if_a_dag_is_inactive(self, test_client,
session, testing_dag_bundle):
now = timezone.utcnow().isoformat()
self._dags_for_trigger_tests(session)
diff --git a/task-sdk/src/airflow/sdk/exceptions.py
b/task-sdk/src/airflow/sdk/exceptions.py
index ed3bb3f1493..b0ff82be293 100644
--- a/task-sdk/src/airflow/sdk/exceptions.py
+++ b/task-sdk/src/airflow/sdk/exceptions.py
@@ -305,7 +305,7 @@ class XComNotFound(AirflowException):
)
-class ParamValidationError(AirflowException):
+class ParamValidationError(AirflowException, ValueError):
"""Raise when DAG params is invalid."""