This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 806bb80ce4 Fix TriggeredDagRunOperator triggered link (#40336)
806bb80ce4 is described below
commit 806bb80ce43dcd79ab57b040151a06ff679f2cf1
Author: Fred Thomsen <[email protected]>
AuthorDate: Fri Jun 21 22:26:28 2024 -0500
Fix TriggeredDagRunOperator triggered link (#40336)
* Fix TriggeredDagRunOperator triggered link
Link to the specific dag run id that was triggered, as opposed to the
logical date, such that the instance that was triggered on the DAG grid
UI.
* Add comment regarding triggered dag xcom
---
airflow/operators/trigger_dagrun.py | 12 ++++++++----
tests/operators/test_trigger_dagrun.py | 4 ++--
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index be0e99db1f..19cc0562cc 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -69,10 +69,13 @@ class TriggerDagRunLink(BaseOperatorLink):
name = "Triggered DAG"
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
- # Fetch the correct execution date for the triggerED dag which is
+ # Fetch the correct dag_run_id for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
- when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
- query = {"dag_id": cast(TriggerDagRunOperator,
operator).trigger_dag_id, "base_date": when}
+ triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID)
+ query = {
+ "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id,
+ "dag_run_id": triggered_dag_run_id,
+ }
return build_airflow_url_with_query(query)
@@ -218,8 +221,9 @@ class TriggerDagRunOperator(BaseOperator):
raise e
if dag_run is None:
raise RuntimeError("The dag_run should be set here!")
- # Store the execution date from the dag run (either created or found
above) to
+ # Store the run id from the dag run (either created or found above) to
# be used when creating the extra link on the webserver.
+ # TODO: Logical date as xcom stored only for backwards compatibility.
Remove in Airflow 3.0
ti = context["task_instance"]
ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO,
value=dag_run.logical_date.isoformat())
ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index 7bee42243b..f369b70582 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -92,7 +92,7 @@ class TestDagRunOperator:
"""
Asserts whether the correct extra links url will be created.
- Specifically it tests whether the correct dag id and date are passed to
+ Specifically it tests whether the correct dag id and run id are passed
to
the method which constructs the final url.
Note: We can't run that method to generate the url itself because the
Flask app context
isn't available within the test logic, so it is mocked here.
@@ -111,7 +111,7 @@ class TestDagRunOperator:
args, _ = mock_build_url.call_args
expected_args = {
"dag_id": triggered_dag_run.dag_id,
- "base_date": triggered_dag_run.logical_date.isoformat(),
+ "dag_run_id": triggered_dag_run.run_id,
}
assert expected_args in args