This is an automated email from the ASF dual-hosted git repository.
rom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d0f433b0245 Fix TriggerDagRunOperator extra_link when trigger_dag_id is templated (#42810)
d0f433b0245 is described below
commit d0f433b0245e910ec568aaf255f28499e296ffce
Author: Fred Thomsen <[email protected]>
AuthorDate: Thu Nov 21 09:30:41 2024 -0500
Fix TriggerDagRunOperator extra_link when trigger_dag_id is templated (#42810)
---
.../providers/standard/operators/trigger_dagrun.py | 19 ++++++++++++++++++-
tests/operators/test_trigger_dagrun.py | 21 +++++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
index 30818923912..dbe9425a1c1 100644
--- a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -67,11 +67,28 @@ class TriggerDagRunLink(BaseOperatorLink):
name = "Triggered DAG"
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
+ from airflow.models.renderedtifields import RenderedTaskInstanceFields
+ from airflow.models.taskinstance import TaskInstance
+
+ ti = TaskInstance.get_task_instance(
+ dag_id=ti_key.dag_id, run_id=ti_key.run_id,
task_id=ti_key.task_id, map_index=ti_key.map_index
+ )
+ if TYPE_CHECKING:
+ assert ti is not None
+
+ template_fields = RenderedTaskInstanceFields.get_templated_fields(ti)
+ untemplated_trigger_dag_id = cast(TriggerDagRunOperator,
operator).trigger_dag_id
+ if template_fields:
+ trigger_dag_id = template_fields.get("trigger_dag_id",
untemplated_trigger_dag_id)
+ else:
+ trigger_dag_id = untemplated_trigger_dag_id
+
# Fetch the correct dag_run_id for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID)
+
query = {
- "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id,
+ "dag_id": trigger_dag_id,
"dag_run_id": triggered_dag_run_id,
}
return build_airflow_url_with_query(query)
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 85daeaed275..3100f62d66d 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -271,6 +271,27 @@ class TestDagRunOperator:
assert triggered_dag_run.logical_date == DEFAULT_DATE
self.assert_extra_link(triggered_dag_run, task, session)
+ def test_trigger_dagrun_with_templated_trigger_dag_id(self, dag_maker):
+ """Test TriggerDagRunOperator with templated trigger dag id."""
+ with dag_maker(
+ TEST_DAG_ID, default_args={"owner": "airflow", "start_date":
DEFAULT_DATE}, serialized=True
+ ) as dag:
+ task = TriggerDagRunOperator(
+
task_id="__".join(["test_trigger_dagrun_with_templated_trigger_dag_id",
TRIGGERED_DAG_ID]),
+ trigger_dag_id="{{ ti.task_id.rsplit('.',
1)[-1].split('__')[-1] }}",
+ )
+ self.re_sync_triggered_dag_to_db(dag, dag_maker)
+ dag_maker.create_dagrun()
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+
+ with create_session() as session:
+ dagruns = session.query(DagRun).filter(DagRun.dag_id ==
TRIGGERED_DAG_ID).all()
+ assert len(dagruns) == 1
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ assert triggered_dag_run.dag_id == TRIGGERED_DAG_ID
+ self.assert_extra_link(triggered_dag_run, task, session)
+
def test_trigger_dagrun_operator_conf(self, dag_maker):
"""Test passing conf to the triggered DagRun."""
with dag_maker(