GitHub user Frost3281 edited a discussion: ExternalTaskSensor (only for last
dag runs) in Airflow 3
In Airflow 2, I used the following sensor to check the status of the latest run
of external DAGs. In Airflow 3, this code no longer works; it fails with:
`Direct database access via the ORM is not allowed in Airflow 3.0`
I have tried many options, including an Engine / Connection and other methods of
database access. I also tried going through the API, but always get "connection
refused". Is there a proper way in Airflow 3 to achieve the same functionality
with this sensor?
```python
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, cast
import pytz # type: ignore
from airflow.exceptions import TaskNotFound
from airflow.models import DagBag, DagModel
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.db import provide_session # type: ignore
from sqlalchemy import select
from sqlalchemy.orm import Session
# NOTE(review): the name suggests a mapping of DAG id -> external-task docs,
# but nothing in this snippet reads or writes it. Presumably it is used
# elsewhere in the original module; confirm, or remove it if it is dead.
DAGS_TO_EXTERNAL_TASK_DOCS: dict[str, str] = {}
class LastDagRunSensor(ExternalTaskSensor):
    """Sensor that checks the most recent run of an external DAG.

    Instead of matching the external DAG run by a date derived from the
    sensing DAG, it looks up the execution date of the external DAG's latest
    run (externally triggered runs included) and checks that run.

    NOTE(review): the lookup reads the metadata database through the ORM
    (``provide_session`` + ``DagModel``). Airflow 3 rejects direct ORM access
    from task code ("Direct database access via the ORM is not allowed in
    Airflow 3.0"), so this class works only on Airflow 2.x as written.
    """

    def __init__(
        self,
        external_dag_id: str,
        external_task_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Create the sensor.

        Args:
            external_dag_id: ID of the DAG whose latest run is checked.
            external_task_id: Optional task in that DAG to check; ``None`` is
                passed through to ``ExternalTaskSensor`` unchanged.
            **kwargs: Forwarded unchanged to ``ExternalTaskSensor``.
        """

        def dag_last_exec(crnt_dttm: datetime) -> datetime:  # noqa: ARG001
            # The datetime handed in by the base sensor is deliberately
            # ignored: the target is always the external DAG's latest run,
            # resolved lazily each time this function is called.
            return self.get_dag_last_execution_date(self.external_dag_id)  # type: ignore

        super().__init__(
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            **kwargs,
        )
        # Assigned after the base initializer, so it replaces any
        # ``execution_date_fn`` that may have been supplied through kwargs.
        self.execution_date_fn = dag_last_exec

    @provide_session
    def get_dag_last_execution_date(
        self,
        dag_id: str,
        session: Session,
    ) -> datetime:
        """Return the execution date of the latest run of ``dag_id``.

        Args:
            dag_id: ID of the DAG to inspect.
            session: ORM session; ``provide_session`` supplies one when the
                caller omits it.

        Returns:
            The ``execution_date`` of the DAG's last run, with externally
            triggered runs included. If the DAG is not found, or exists but
            has never run, returns the current Europe/Moscow time minus one
            day (``_yesterday()``).
        """
        dag = _get_dag_by_id(session, dag_id)
        if dag is None:
            return _yesterday()
        last_run = dag.get_last_dagrun(include_externally_triggered=True)
        if last_run is None:
            # The DAG is registered but has no runs yet. Fall back instead
            # of failing on ``None.execution_date``.
            return _yesterday()
        return last_run.execution_date
def _get_dag_by_id(session: Session, dag_id: str) -> DagModel | None:
    """Fetch the ``DagModel`` row whose ``dag_id`` matches, or ``None``.

    Args:
        session: ORM session used to run the query.
        dag_id: ID of the DAG to look up.

    Returns:
        The first matching ``DagModel``, or ``None`` when no row matches.
    """
    statement = select(DagModel).where(DagModel.dag_id == dag_id)
    result = session.execute(statement)
    matches = result.scalars()
    return matches.first()
def _yesterday() -> datetime:
    """Return a timezone-aware "now" in Europe/Moscow, shifted back one day."""
    moscow_tz = pytz.timezone('Europe/Moscow')
    now_in_moscow = datetime.now(tz=moscow_tz)
    return now_in_moscow - timedelta(days=1)
```
GitHub link: https://github.com/apache/airflow/discussions/49997
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]