This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9891eef2abdade85c2956f99e29405fa7ecc960b
Author: Vedant Lodha <[email protected]>
AuthorDate: Thu Feb 2 01:07:39 2023 +0530

    Fix dag run trigger with a note. (#29228)

    * Fix dag run trigger with a note.

    Currently, triggering dag run with note gives 400.
    This PR fixes it.

    Closes: #28825

    * make flask_login.current_user a global import

    * precommit error fixes.

    * add test coverage for the fix

    (cherry picked from commit b94f36bf563f5c8372086cec63b74eadef638ef8)
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py    |  7 +++++--
 airflow/api_connexion/schemas/dag_run_schema.py        |  2 +-
 tests/api_connexion/endpoints/test_dag_run_endpoint.py | 14 +++++++-------
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index b566ba2df7..2b3ee16b2f 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -21,6 +21,7 @@ from http import HTTPStatus
 import pendulum
 from connexion import NoContent
 from flask import g
+from flask_login import current_user
 from marshmallow import ValidationError
 from sqlalchemy import or_
 from sqlalchemy.orm import Query, Session
@@ -319,6 +320,10 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
                 dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
                 session=session,
             )
+            dag_run_note = post_body.get("note")
+            if dag_run_note:
+                current_user_id = getattr(current_user, "id", None)
+                dag_run.note = (dag_run_note, current_user_id)
             return dagrun_schema.dump(dag_run)
         except ValueError as ve:
             raise BadRequest(detail=str(ve))
@@ -438,8 +443,6 @@ def set_dag_run_note(*, dag_id: str, dag_run_id: str, session: Session = NEW_SES
     except ValidationError as err:
         raise BadRequest(detail=str(err))
 
-    from flask_login import current_user
-
     current_user_id = getattr(current_user, "id", None)
     if dag_run.dag_run_note is None:
         dag_run.note = (new_note, current_user_id)
diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py
index 7ca857951b..999b533728 100644
--- a/airflow/api_connexion/schemas/dag_run_schema.py
+++ b/airflow/api_connexion/schemas/dag_run_schema.py
@@ -73,7 +73,7 @@ class DAGRunSchema(SQLAlchemySchema):
     data_interval_end = auto_field(dump_only=True)
     last_scheduling_decision = auto_field(dump_only=True)
     run_type = auto_field(dump_only=True)
-    note = auto_field(dump_only=True)
+    note = auto_field(dump_only=False)
 
     @pre_load
     def autogenerate(self, data, **kwargs):
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 8b02bb321f..b645e601e9 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1018,14 +1018,14 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
 class TestPostDagRun(TestDagRunEndpoint):
     @pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
     @pytest.mark.parametrize(
-        "dag_run_id, logical_date",
+        "dag_run_id, logical_date, note",
         [
-            pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", id="both-present"),
-            pytest.param(None, "2020-06-11T18:00:00+00:00", id="only-date"),
-            pytest.param(None, None, id="both-missing"),
+            pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
+            pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
+            pytest.param(None, None, None, id="all-missing"),
         ],
     )
-    def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date):
+    def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date, note):
         self._create_dag("TEST_DAG_ID")
 
         # We'll patch airflow.utils.timezone.utcnow to always return this so we
@@ -1037,7 +1037,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             request_json[logical_date_field_name] = logical_date
         if dag_run_id is not None:
             request_json["dag_run_id"] = dag_run_id
-
+        request_json["note"] = note
         with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
             response = self.client.post(
                 "api/v1/dags/TEST_DAG_ID/dagRuns",
@@ -1068,7 +1068,7 @@ class TestPostDagRun(TestDagRunEndpoint):
             "data_interval_start": expected_logical_date,
             "last_scheduling_decision": None,
             "run_type": "manual",
-            "note": None,
+            "note": note,
         }
 
     def test_should_respond_400_if_a_dag_has_import_errors(self, session):
