This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4637c9ed5c Avoid re-fetching DAG run in TriggerDagRunOperator (#27635)
4637c9ed5c is described below
commit 4637c9ed5cf1db2872f200c1a59b32edafaa65bc
Author: Aditya Malik <[email protected]>
AuthorDate: Wed Nov 16 09:21:33 2022 +0530
Avoid re-fetching DAG run in TriggerDagRunOperator (#27635)
---
airflow/api/common/trigger_dag.py | 4 +---
airflow/exceptions.py | 11 ++++++++++-
airflow/operators/trigger_dagrun.py | 2 +-
tests/operators/test_trigger_dagrun.py | 32 ++++++++++++++++++++++++++++++++
4 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py
index c9a8a1a592..2e02814a79 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -75,9 +75,7 @@ def _trigger_dag(
dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
if dag_run:
- raise DagRunAlreadyExists(
- f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
- )
+ raise DagRunAlreadyExists(dag_run=dag_run, execution_date=execution_date, run_id=run_id)
run_conf = None
if conf:
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index a06d8ae164..5815e47647 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -23,7 +23,10 @@ from __future__ import annotations
import datetime
import warnings
from http import HTTPStatus
-from typing import Any, NamedTuple, Sized
+from typing import TYPE_CHECKING, Any, NamedTuple, Sized
+
+if TYPE_CHECKING:
+ from airflow.models import DagRun
class AirflowException(Exception):
@@ -185,6 +188,12 @@ class DagRunNotFound(AirflowNotFoundException):
class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run entry."""
+ def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
+ super().__init__(
+ f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
+ )
+ self.dag_run = dag_run
+
class DagFileExists(AirflowBadRequest):
"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder."""
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 5c66e9410e..bfb1a32fee 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -156,7 +156,7 @@ class TriggerDagRunOperator(BaseOperator):
dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
- dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
+ dag_run = e.dag_run
else:
raise e
if dag_run is None:
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index 9b456afb19..fdaa7263b2 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -180,6 +180,38 @@ class TestDagRunOperator:
assert triggered_dag_run.execution_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)
+ def test_trigger_dagrun_with_scheduled_dag_run(self):
+ """Test TriggerDagRunOperator with custom execution_date and scheduled dag_run."""
+ utc_now = timezone.utcnow()
+ task = TriggerDagRunOperator(
+ task_id="test_trigger_dagrun_with_execution_date",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ execution_date=utc_now,
+ dag=self.dag,
+ poke_interval=1,
+ reset_dag_run=True,
+ wait_for_completion=True,
+ )
+ run_id = f"scheduled__{utc_now.isoformat()}"
+ with create_session() as session:
+ dag_run = DagRun(
+ dag_id=TRIGGERED_DAG_ID,
+ execution_date=utc_now,
+ state=State.SUCCESS,
+ run_type="scheduled",
+ run_id=run_id,
+ )
+ session.add(dag_run)
+ session.commit()
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
+ assert len(dagruns) == 1
+ triggered_dag_run = dagruns[0]
+ assert triggered_dag_run.external_trigger
+ assert triggered_dag_run.execution_date == utc_now
+ self.assert_extra_link(triggered_dag_run, task, session)
+
def test_trigger_dagrun_with_templated_execution_date(self):
"""Test TriggerDagRunOperator with templated execution_date."""
task = TriggerDagRunOperator(