This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a11e752d38c Fix Operator link for TriggerDagRunOperator (#47051)
a11e752d38c is described below
commit a11e752d38cc5275968332d9a9a10446259b56c3
Author: vatsrahul1001 <[email protected]>
AuthorDate: Fri Feb 28 01:09:07 2025 +0530
Fix Operator link for TriggerDagRunOperator (#47051)
---
airflow/utils/helpers.py | 4 +--
.../src/airflow/sdk/execution_time/task_runner.py | 2 +-
tests/operators/test_trigger_dagrun.py | 34 +++++++++++++++++-----
tests/utils/test_helpers.py | 2 +-
4 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 371061069af..6196ec78028 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -256,10 +256,10 @@ def build_airflow_dagrun_url(dag_id: str, run_id: str) ->
str:
Build airflow dagrun url using base_url and provided dag_id and run_id.
For example:
-
http://localhost:9091/webapp/dags/hi/runs/manual__2025-02-23T18:27:39.051358+00:00_RZa1at4Q
+
http://localhost:9091/dags/hi/runs/manual__2025-02-23T18:27:39.051358+00:00_RZa1at4Q
"""
baseurl = conf.get("api", "base_url")
- return f"{baseurl}/webapp/dags/{dag_id}/runs/{run_id}"
+ return f"{baseurl}/dags/{dag_id}/runs/{run_id}"
# The 'template' argument is typed as Any because the jinja2.Template is too
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index 97ee45cded5..0e5ce160dc5 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -779,7 +779,7 @@ def finalize(
):
# Pushing xcom for each operator extra links defined on the operator only.
for oe in ti.task.operator_extra_links:
- link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti.id),
oe.xcom_key # type: ignore[arg-type]
+ link, xcom_key = oe.get_link(operator=ti.task, ti_key=ti), oe.xcom_key
# type: ignore[arg-type]
log.debug("Setting xcom for operator extra link", link=link,
xcom_key=xcom_key)
_xcom_push(ti, key=xcom_key, value=link)
diff --git a/tests/operators/test_trigger_dagrun.py
b/tests/operators/test_trigger_dagrun.py
index 0f65c42af45..e8464dde2b3 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -24,6 +24,7 @@ from unittest import mock
import pytest
import time_machine
+from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunAlreadyExists,
TaskDeferred
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
@@ -37,6 +38,7 @@ from airflow.utils.state import DagRunState, State,
TaskInstanceState
from airflow.utils.types import DagRunType
from tests_common.test_utils.db import parse_and_sync_to_db
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -95,14 +97,30 @@ class TestDagRunOperator:
.one()
)
- # This is equivalent of a task run calling this and pushing to xcom
- url = triggering_task.operator_extra_links[0].get_link(
- operator=triggering_task, ti_key=triggering_ti.key
- )
- expected_url = (
-
f"http://localhost:9091/webapp/dags/{triggered_dag_run.dag_id}/runs/{triggered_dag_run.run_id}"
- )
- assert url == expected_url
+ if AIRFLOW_V_3_0_PLUS:
+ base_url = conf.get_mandatory_value("api", "base_url").lower()
+ expected_url =
f"{base_url}/dags/{triggered_dag_run.dag_id}/runs/{triggered_dag_run.run_id}"
+
+ link = triggering_task.operator_extra_links[0].get_link(
+ operator=triggering_task, ti_key=triggering_ti.key
+ )
+
+ assert link == expected_url, f"Expected {expected_url}, but got
{link}"
+ else:
+ with mock.patch(
+
"airflow.providers.standard.operators.trigger_dagrun.build_airflow_url_with_query"
+ ) as mock_build_url:
+ # This is equivalent of a task run calling this and pushing to
xcom
+ triggering_task.operator_extra_links[0].get_link(
+ operator=triggering_task, ti_key=triggering_ti.key
+ )
+ assert mock_build_url.called
+ args, _ = mock_build_url.call_args
+ expected_args = {
+ "dag_id": triggered_dag_run.dag_id,
+ "dag_run_id": triggered_dag_run.run_id,
+ }
+ assert expected_args in args
def test_trigger_dagrun(self, dag_maker):
"""Test TriggerDagRunOperator."""
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 4cc7ddea72c..1784dd64afb 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -162,7 +162,7 @@ class TestHelpers:
assert merged == {"a": 1, "r": {"b": 0, "c": 3}}
def test_build_airflow_dagrun_url(self):
- expected_url = "http://localhost:9091/webapp/dags/somedag/runs/abc123"
+ expected_url = "http://localhost:9091/dags/somedag/runs/abc123"
assert build_airflow_dagrun_url(dag_id="somedag", run_id="abc123") ==
expected_url
@pytest.mark.parametrize(