This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0df76f5727f79da0547b9d1cfd813fa11d1be0e9 Author: Fred Thomsen <[email protected]> AuthorDate: Fri Jun 21 22:26:28 2024 -0500 Fix TriggeredDagRunOperator triggered link (#40336) * Fix TriggeredDagRunOperator triggered link Link to the specific dag run id that was triggered, as opposed to the logical date, so that the instance that was triggered is the one shown on the DAG grid UI. * Add comment regarding triggered dag xcom (cherry picked from commit 806bb80ce43dcd79ab57b040151a06ff679f2cf1) --- airflow/operators/trigger_dagrun.py | 12 ++++++++---- tests/operators/test_trigger_dagrun.py | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index f8cfa5256a..d8a1da7624 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -63,10 +63,13 @@ class TriggerDagRunLink(BaseOperatorLink): name = "Triggered DAG" def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: - # Fetch the correct execution date for the triggerED dag which is + # Fetch the correct dag_run_id for the triggerED dag which is # stored in xcom during execution of the triggerING task. - when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO) - query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when} + triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID) + query = { + "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, + "dag_run_id": triggered_dag_run_id, + } return build_airflow_url_with_query(query) @@ -199,8 +202,9 @@ class TriggerDagRunOperator(BaseOperator): raise e if dag_run is None: raise RuntimeError("The dag_run should be set here!") - # Store the execution date from the dag run (either created or found above) to + # Store the run id from the dag run (either created or found above) to # be used when creating the extra link on the webserver. 
+ # TODO: Logical date as xcom stored only for backwards compatibility. Remove in Airflow 3.0 ti = context["task_instance"] ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat()) ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 9eed9b786e..267bfeaa01 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -91,7 +91,7 @@ class TestDagRunOperator: """ Asserts whether the correct extra links url will be created. - Specifically it tests whether the correct dag id and date are passed to + Specifically it tests whether the correct dag id and run id are passed to the method which constructs the final url. Note: We can't run that method to generate the url itself because the Flask app context isn't available within the test logic, so it is mocked here. @@ -110,7 +110,7 @@ class TestDagRunOperator: args, _ = mock_build_url.call_args expected_args = { "dag_id": triggered_dag_run.dag_id, - "base_date": triggered_dag_run.logical_date.isoformat(), + "dag_run_id": triggered_dag_run.run_id, } assert expected_args in args
