This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ae2fe0ce713 fix: Fix Triggered DAG button not visible during queued/running state (#67327)
ae2fe0ce713 is described below

commit ae2fe0ce71389a38325f620a185e66a59cef3811
Author: Sejal Gupta <[email protected]>
AuthorDate: Thu Jun 11 03:35:57 2026 +0530

    fix: Fix Triggered DAG button not visible during queued/running state (#67327)
---
 .../providers/standard/operators/trigger_dagrun.py | 19 +++++++++++++-
 .../unit/standard/operators/test_trigger_dagrun.py | 29 ++++++++++++++++++++++
 .../task_sdk/execution_time/test_task_runner.py    | 12 +++++++++
 3 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
index ff271b3c513..60d96cbe3f4 100644
--- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -21,7 +21,7 @@ import datetime
 import inspect
 import json
 import time
-from collections.abc import Sequence
+from collections.abc import Mapping, Sequence
 from json import JSONDecodeError
 from typing import TYPE_CHECKING, Any, cast, overload
 
@@ -325,6 +325,23 @@ class TriggerDagRunOperator(BaseOperator):
         if parsed_run_after and "run_after" in parameters:
             kwargs_accepted["run_after"] = parsed_run_after
 
+        if isinstance(context, Mapping):
+            from airflow.utils import helpers
+
+            try:
+                build_url_fn = getattr(helpers, "build_airflow_dagrun_url", 
None)
+                ti = context.get("task_instance") or context.get("ti")
+
+                if build_url_fn and ti and hasattr(ti, "xcom_push"):
+                    ti.xcom_push(
+                        key=TriggerDagRunLink().xcom_key,
+                        value=build_url_fn(dag_id=self.trigger_dag_id, 
run_id=run_id),
+                    )
+            except (AttributeError, KeyError, TypeError, AssertionError) as e:
+                self.log.debug(
+                    "Skipping TriggerDagRunLink XCom push due to mock or 
incomplete context: %s", e
+                )
+
         raise DagRunTriggerException(**kwargs_accepted)
 
     def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
index 972503e6f1e..2bce35574dc 100644
--- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
+++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py
@@ -203,6 +203,35 @@ class TestDagRunOperator:
         expected_url = f"{base_url}dags/{TRIGGERED_DAG_ID}/runs/test_run_id"
         assert link == expected_url, f"Expected {expected_url}, but got {link}"
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is 
different for Airflow 2 & 3")
+    def test_trigger_dagrun_pushes_extra_link_xcom_before_exception(self):
+        """
+        Eagerly push the "Triggered DAG" extra-link URL so the UI button is 
available
+        while the task is still running/deferred, not only after finalize() 
runs.
+        """
+        from airflow.providers.standard.operators.trigger_dagrun import 
TriggerDagRunLink
+        from airflow.utils import helpers
+
+        build_url_fn = getattr(helpers, "build_airflow_dagrun_url", None)
+        if not build_url_fn:
+            pytest.skip("Skipping because build_airflow_dagrun_url is not 
available in this Airflow version")
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            trigger_run_id="custom_run_id",
+        )
+
+        ti_mock = mock.MagicMock()
+        with pytest.raises(DagRunTriggerException):
+            task.execute(context={"task_instance": ti_mock})
+
+        expected_url = build_url_fn(dag_id=TRIGGERED_DAG_ID, 
run_id="custom_run_id")
+        ti_mock.xcom_push.assert_called_once_with(
+            key=TriggerDagRunLink().xcom_key,
+            value=expected_url,
+        )
+
     @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is 
different for Airflow 2 & 3")
     def test_trigger_dagrun_custom_run_id(self):
         task = TriggerDagRunOperator(
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index c71fbc85ce6..471541a6429 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -4987,6 +4987,8 @@ class TestTriggerDagRunOperator:
         mock_supervisor_comms.send.side_effect = [
             # Set RTIF
             None,
+            # Account for the extra link XCom message sent by TriggerDagRunLink
+            None,
             # Successful Dag Run trigger
             OKResponse(ok=True),
             # Set XCOM,
@@ -5005,6 +5007,16 @@ class TestTriggerDagRunOperator:
         assert msg.state == expected_task_state
 
         expected_calls = [
+            mock.call.send(
+                msg=SetXCom(
+                    key="_link_TriggerDagRunLink",
+                    value="/dags/test_dag/runs/test_run_id",
+                    dag_id="test_handle_trigger_dag_run_wait_for_completion",
+                    task_id="test_task",
+                    run_id="test_run",
+                    map_index=-1,
+                ),
+            ),
             mock.call.send(
                 msg=TriggerDagRun(
                     dag_id="test_dag",

Reply via email to