This is an automated email from the ASF dual-hosted git repository.

rom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0f433b0245 Fix TriggerDagRunOperator extra_link when trigger_dag_id is templated (#42810)
d0f433b0245 is described below

commit d0f433b0245e910ec568aaf255f28499e296ffce
Author: Fred Thomsen <[email protected]>
AuthorDate: Thu Nov 21 09:30:41 2024 -0500

    Fix TriggerDagRunOperator extra_link when trigger_dag_id is templated (#42810)
---
 .../providers/standard/operators/trigger_dagrun.py  | 19 ++++++++++++++++++-
 tests/operators/test_trigger_dagrun.py              | 21 +++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
index 30818923912..dbe9425a1c1 100644
--- a/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
+++ b/providers/src/airflow/providers/standard/operators/trigger_dagrun.py
@@ -67,11 +67,28 @@ class TriggerDagRunLink(BaseOperatorLink):
     name = "Triggered DAG"
 
     def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields
+        from airflow.models.taskinstance import TaskInstance
+
+        ti = TaskInstance.get_task_instance(
+            dag_id=ti_key.dag_id, run_id=ti_key.run_id, task_id=ti_key.task_id, map_index=ti_key.map_index
+        )
+        if TYPE_CHECKING:
+            assert ti is not None
+
+        template_fields = RenderedTaskInstanceFields.get_templated_fields(ti)
+        untemplated_trigger_dag_id = cast(TriggerDagRunOperator, operator).trigger_dag_id
+        if template_fields:
+            trigger_dag_id = template_fields.get("trigger_dag_id", untemplated_trigger_dag_id)
+        else:
+            trigger_dag_id = untemplated_trigger_dag_id
+
         # Fetch the correct dag_run_id for the triggerED dag which is
         # stored in xcom during execution of the triggerING task.
         triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID)
+
         query = {
-            "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id,
+            "dag_id": trigger_dag_id,
             "dag_run_id": triggered_dag_run_id,
         }
         return build_airflow_url_with_query(query)
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 85daeaed275..3100f62d66d 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -271,6 +271,27 @@ class TestDagRunOperator:
             assert triggered_dag_run.logical_date == DEFAULT_DATE
             self.assert_extra_link(triggered_dag_run, task, session)
 
+    def test_trigger_dagrun_with_templated_trigger_dag_id(self, dag_maker):
+        """Test TriggerDagRunOperator with templated trigger dag id."""
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="__".join(["test_trigger_dagrun_with_templated_trigger_dag_id", TRIGGERED_DAG_ID]),
+                trigger_dag_id="{{ ti.task_id.rsplit('.', 1)[-1].split('__')[-1] }}",
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        with create_session() as session:
+            dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
+            assert len(dagruns) == 1
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.dag_id == TRIGGERED_DAG_ID
+            self.assert_extra_link(triggered_dag_run, task, session)
+
     def test_trigger_dagrun_operator_conf(self, dag_maker):
         """Test passing conf to the triggered DagRun."""
         with dag_maker(

Reply via email to