This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4637c9ed5c Avoid re-fetching DAG run in TriggerDagRunOperator (#27635)
4637c9ed5c is described below

commit 4637c9ed5cf1db2872f200c1a59b32edafaa65bc
Author: Aditya Malik <[email protected]>
AuthorDate: Wed Nov 16 09:21:33 2022 +0530

    Avoid re-fetching DAG run in TriggerDagRunOperator (#27635)
---
 airflow/api/common/trigger_dag.py      |  4 +---
 airflow/exceptions.py                  | 11 ++++++++++-
 airflow/operators/trigger_dagrun.py    |  2 +-
 tests/operators/test_trigger_dagrun.py | 32 ++++++++++++++++++++++++++++++++
 4 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/airflow/api/common/trigger_dag.py 
b/airflow/api/common/trigger_dag.py
index c9a8a1a592..2e02814a79 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -75,9 +75,7 @@ def _trigger_dag(
     dag_run = DagRun.find_duplicate(dag_id=dag_id, 
execution_date=execution_date, run_id=run_id)
 
     if dag_run:
-        raise DagRunAlreadyExists(
-            f"A Dag Run already exists for dag id {dag_id} at {execution_date} 
with run id {run_id}"
-        )
+        raise DagRunAlreadyExists(dag_run=dag_run, 
execution_date=execution_date, run_id=run_id)
 
     run_conf = None
     if conf:
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index a06d8ae164..5815e47647 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -23,7 +23,10 @@ from __future__ import annotations
 import datetime
 import warnings
 from http import HTTPStatus
-from typing import Any, NamedTuple, Sized
+from typing import TYPE_CHECKING, Any, NamedTuple, Sized
+
+if TYPE_CHECKING:
+    from airflow.models import DagRun
 
 
 class AirflowException(Exception):
@@ -185,6 +188,12 @@ class DagRunNotFound(AirflowNotFoundException):
 class DagRunAlreadyExists(AirflowBadRequest):
     """Raise when creating a DAG run for DAG which already has DAG run 
entry."""
 
+    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, 
run_id: str) -> None:
+        super().__init__(
+            f"A DAG Run already exists for DAG {dag_run.dag_id} at 
{execution_date} with run id {run_id}"
+        )
+        self.dag_run = dag_run
+
 
 class DagFileExists(AirflowBadRequest):
     """Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG 
folder."""
diff --git a/airflow/operators/trigger_dagrun.py 
b/airflow/operators/trigger_dagrun.py
index 5c66e9410e..bfb1a32fee 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -156,7 +156,7 @@ class TriggerDagRunOperator(BaseOperator):
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, 
read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
                 dag.clear(start_date=parsed_execution_date, 
end_date=parsed_execution_date)
-                dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
+                dag_run = e.dag_run
             else:
                 raise e
         if dag_run is None:
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index 9b456afb19..fdaa7263b2 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -180,6 +180,38 @@ class TestDagRunOperator:
             assert triggered_dag_run.execution_date == utc_now
             self.assert_extra_link(triggered_dag_run, task, session)
 
+    def test_trigger_dagrun_with_scheduled_dag_run(self):
+        """Test TriggerDagRunOperator with custom execution_date and scheduled 
dag_run."""
+        utc_now = timezone.utcnow()
+        task = TriggerDagRunOperator(
+            task_id="test_trigger_dagrun_with_execution_date",
+            trigger_dag_id=TRIGGERED_DAG_ID,
+            execution_date=utc_now,
+            dag=self.dag,
+            poke_interval=1,
+            reset_dag_run=True,
+            wait_for_completion=True,
+        )
+        run_id = f"scheduled__{utc_now.isoformat()}"
+        with create_session() as session:
+            dag_run = DagRun(
+                dag_id=TRIGGERED_DAG_ID,
+                execution_date=utc_now,
+                state=State.SUCCESS,
+                run_type="scheduled",
+                run_id=run_id,
+            )
+            session.add(dag_run)
+            session.commit()
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+
+            dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
+            assert len(dagruns) == 1
+            triggered_dag_run = dagruns[0]
+            assert triggered_dag_run.external_trigger
+            assert triggered_dag_run.execution_date == utc_now
+            self.assert_extra_link(triggered_dag_run, task, session)
+
     def test_trigger_dagrun_with_templated_execution_date(self):
         """Test TriggerDagRunOperator with templated execution_date."""
         task = TriggerDagRunOperator(

Reply via email to