This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0df76f5727f79da0547b9d1cfd813fa11d1be0e9
Author: Fred Thomsen <[email protected]>
AuthorDate: Fri Jun 21 22:26:28 2024 -0500

    Fix TriggeredDagRunOperator triggered link (#40336)
    
    * Fix TriggeredDagRunOperator triggered link
    
    Link to the specific dag run id that was triggered, as opposed to the
    logical date, such that the instance that was triggered on the DAG grid
    UI.
    
    * Add comment regarding triggered dag xcom
    
    (cherry picked from commit 806bb80ce43dcd79ab57b040151a06ff679f2cf1)
---
 airflow/operators/trigger_dagrun.py    | 12 ++++++++----
 tests/operators/test_trigger_dagrun.py |  4 ++--
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py 
b/airflow/operators/trigger_dagrun.py
index f8cfa5256a..d8a1da7624 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -63,10 +63,13 @@ class TriggerDagRunLink(BaseOperatorLink):
     name = "Triggered DAG"
 
     def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> 
str:
-        # Fetch the correct execution date for the triggerED dag which is
+        # Fetch the correct dag_run_id for the triggerED dag which is
         # stored in xcom during execution of the triggerING task.
-        when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
-        query = {"dag_id": cast(TriggerDagRunOperator, 
operator).trigger_dag_id, "base_date": when}
+        triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID)
+        query = {
+            "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id,
+            "dag_run_id": triggered_dag_run_id,
+        }
         return build_airflow_url_with_query(query)
 
 
@@ -199,8 +202,9 @@ class TriggerDagRunOperator(BaseOperator):
                 raise e
         if dag_run is None:
             raise RuntimeError("The dag_run should be set here!")
-        # Store the execution date from the dag run (either created or found 
above) to
+        # Store the run id from the dag run (either created or found above) to
         # be used when creating the extra link on the webserver.
+        # TODO: Logical date as xcom stored only for backwards compatibility. 
Remove in Airflow 3.0
         ti = context["task_instance"]
         ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, 
value=dag_run.logical_date.isoformat())
         ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index 9eed9b786e..267bfeaa01 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -91,7 +91,7 @@ class TestDagRunOperator:
         """
         Asserts whether the correct extra links url will be created.
 
-        Specifically it tests whether the correct dag id and date are passed to
+        Specifically it tests whether the correct dag id and run id are passed 
to
         the method which constructs the final url.
         Note: We can't run that method to generate the url itself because the 
Flask app context
         isn't available within the test logic, so it is mocked here.
@@ -110,7 +110,7 @@ class TestDagRunOperator:
         args, _ = mock_build_url.call_args
         expected_args = {
             "dag_id": triggered_dag_run.dag_id,
-            "base_date": triggered_dag_run.logical_date.isoformat(),
+            "dag_run_id": triggered_dag_run.run_id,
         }
         assert expected_args in args
 

Reply via email to