This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a11e752d38c Fix Operator link for TriggerDagRunOperator (#47051)
a11e752d38c is described below

commit a11e752d38cc5275968332d9a9a10446259b56c3
Author: vatsrahul1001 <[email protected]>
AuthorDate: Fri Feb 28 01:09:07 2025 +0530

    Fix Operator link for TriggerDagRunOperator (#47051)
---
 airflow/utils/helpers.py                           |  4 +--
 .../src/airflow/sdk/execution_time/task_runner.py  |  2 +-
 tests/operators/test_trigger_dagrun.py             | 34 +++++++++++++++++-----
 tests/utils/test_helpers.py                        |  2 +-
 4 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 371061069af..6196ec78028 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -256,10 +256,10 @@ def build_airflow_dagrun_url(dag_id: str, run_id: str) -> 
str:
     Build airflow dagrun url using base_url and provided dag_id and run_id.
 
     For example:
-    
http://localhost:9091/webapp/dags/hi/runs/manual__2025-02-23T18:27:39.051358+00:00_RZa1at4Q
+    
http://localhost:9091/dags/hi/runs/manual__2025-02-23T18:27:39.051358+00:00_RZa1at4Q
     """
     baseurl = conf.get("api", "base_url")
-    return f"{baseurl}/webapp/dags/{dag_id}/runs/{run_id}"
+    return f"{baseurl}/dags/{dag_id}/runs/{run_id}"
 
 
 # The 'template' argument is typed as Any because the jinja2.Template is too
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index 97ee45cded5..0e5ce160dc5 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -779,7 +779,7 @@ def finalize(
 ):
     # Pushing xcom for each operator extra links defined on the operator only.
     for oe in ti.task.operator_extra_links:
-        link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti.id), 
oe.xcom_key  # type: ignore[arg-type]
+        link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti), oe.xcom_key 
 # type: ignore[arg-type]
         log.debug("Setting xcom for operator extra link", link=link, 
xcom_key=xcom_key)
         _xcom_push(ti, key=xcom_key, value=link)
 
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index 0f65c42af45..e8464dde2b3 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -24,6 +24,7 @@ from unittest import mock
 import pytest
 import time_machine
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunAlreadyExists, 
TaskDeferred
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
@@ -37,6 +38,7 @@ from airflow.utils.state import DagRunState, State, 
TaskInstanceState
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.db import parse_and_sync_to_db
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
@@ -95,14 +97,30 @@ class TestDagRunOperator:
             .one()
         )
 
-        # This is equivalent of a task run calling this and pushing to xcom
-        url = triggering_task.operator_extra_links[0].get_link(
-            operator=triggering_task, ti_key=triggering_ti.key
-        )
-        expected_url = (
-            
f"http://localhost:9091/webapp/dags/{triggered_dag_run.dag_id}/runs/{triggered_dag_run.run_id}";
-        )
-        assert url == expected_url
+        if AIRFLOW_V_3_0_PLUS:
+            base_url = conf.get_mandatory_value("api", "base_url").lower()
+            expected_url = 
f"{base_url}/dags/{triggered_dag_run.dag_id}/runs/{triggered_dag_run.run_id}"
+
+            link = triggering_task.operator_extra_links[0].get_link(
+                operator=triggering_task, ti_key=triggering_ti.key
+            )
+
+            assert link == expected_url, f"Expected {expected_url}, but got 
{link}"
+        else:
+            with mock.patch(
+                
"airflow.providers.standard.operators.trigger_dagrun.build_airflow_url_with_query"
+            ) as mock_build_url:
+                # This is equivalent of a task run calling this and pushing to 
xcom
+                triggering_task.operator_extra_links[0].get_link(
+                    operator=triggering_task, ti_key=triggering_ti.key
+                )
+                assert mock_build_url.called
+            args, _ = mock_build_url.call_args
+            expected_args = {
+                "dag_id": triggered_dag_run.dag_id,
+                "dag_run_id": triggered_dag_run.run_id,
+            }
+            assert expected_args in args
 
     def test_trigger_dagrun(self, dag_maker):
         """Test TriggerDagRunOperator."""
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 4cc7ddea72c..1784dd64afb 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -162,7 +162,7 @@ class TestHelpers:
         assert merged == {"a": 1, "r": {"b": 0, "c": 3}}
 
     def test_build_airflow_dagrun_url(self):
-        expected_url = "http://localhost:9091/webapp/dags/somedag/runs/abc123";
+        expected_url = "http://localhost:9091/dags/somedag/runs/abc123";
         assert build_airflow_dagrun_url(dag_id="somedag", run_id="abc123") == 
expected_url
 
     @pytest.mark.parametrize(

Reply via email to