This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1d234aa2ab Fix HTTP 500 Internal Server Error if DAG is triggered with bad params (#39409)
1d234aa2ab is described below
commit 1d234aa2ab8b300055bb3ec51bdd1168651b7a47
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 4 23:27:12 2024 +0200
Fix HTTP 500 Internal Server Error if DAG is triggered with bad params (#39409)
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 3 ++-
tests/api_connexion/endpoints/test_dag_run_endpoint.py | 13 ++++++++++++-
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 99f3ef2893..96fdd42fa0 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -59,6 +59,7 @@ from airflow.api_connexion.schemas.task_instance_schema import (
task_instance_reference_collection_schema,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
+from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
@@ -356,7 +357,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
current_user_id = get_auth_manager().get_user_id()
dag_run.note = (dag_run_note, current_user_id)
return dagrun_schema.dump(dag_run)
- except ValueError as ve:
+ except (ValueError, ParamValidationError) as ve:
raise BadRequest(detail=str(ve))
if dagrun_instance.execution_date == logical_date:
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 5182ef427e..2d7c6ac054 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -28,6 +28,7 @@ from airflow.datasets import Dataset
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetEvent, DatasetModel
+from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils import timezone
@@ -127,7 +128,7 @@ class TestDagRunEndpoint:
dag_instance.is_active = True
with create_session() as session:
session.add(dag_instance)
- dag = DAG(dag_id=dag_id, schedule=None)
+ dag = DAG(dag_id=dag_id, schedule=None, params={"validated_number": Param(1, minimum=1, maximum=10)})
self.app.dag_bag.bag_dag(dag, root_dag=dag)
return dag_instance
@@ -1320,6 +1321,16 @@ class TestPostDagRun(TestDagRunEndpoint):
"type": EXCEPTIONS_LINK_MAP[400],
}
+ def test_raises_validation_error_for_invalid_params(self):
+ self._create_dag("TEST_DAG_ID")
+ response = self.client.post(
+ "api/v1/dags/TEST_DAG_ID/dagRuns",
+ json={"conf": {"validated_number": 5000}}, # DAG param must be between 1 and 10
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 400
+ assert "Invalid input for param" in response.json["detail"]
+
@mock.patch("airflow.api_connexion.endpoints.dag_run_endpoint.get_airflow_app")
def test_dagrun_creation_exception_is_handled(self, mock_get_app, session):
self._create_dag("TEST_DAG_ID")