GitHub user Frost3281 edited a discussion: ExternalTaskSensor (only for last 
dag runs) in Airflow 3

In Airflow 2, I used the following Sensor to check the status of the latest run 
of external DAGs. In Airflow 3, this code no longer works.

"`Direct database access via the ORM is not allowed in Airflow 3.0`"

I have tried many options: Engine / Connection and other methods of database 
access. I also tried going through the API, but always got "connection 
refused". Is there a proper way in Airflow 3 to achieve the same functionality 
with this operator?

```python
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, cast

import pytz  # type: ignore
from airflow.exceptions import TaskNotFound
from airflow.models import DagBag, DagModel
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.db import provide_session  # type: ignore
from sqlalchemy import select
from sqlalchemy.orm import Session

# Module-level str -> str mapping, empty at import time. Nothing in this
# snippet reads or writes it.
# NOTE(review): presumably maps DAG ids to external-task documentation text;
# confirm against the code that populates it.
DAGS_TO_EXTERNAL_TASK_DOCS: dict[str, str] = {}


class LastDagRunSensor(ExternalTaskSensor):
    """Sensor that targets the most recent run of an external DAG.

    After the parent initializer has run, ``execution_date_fn`` is replaced
    with a callable that ignores the current run's date and instead returns
    the date of the latest run of ``external_dag_id`` (including externally
    triggered runs), looked up through an ORM session.

    NOTE(review): this relies on direct ORM access via ``provide_session``;
    it is reported to fail on Airflow 3 with "Direct database access via the
    ORM is not allowed in Airflow 3.0".
    """

    def __init__(
        self,
        external_dag_id: str,
        external_task_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the sensor and install the last-run date callback.

        Args:
            external_dag_id: Id of the DAG whose latest run is inspected.
            external_task_id: Optional task id within that DAG; forwarded
                unchanged to ``ExternalTaskSensor``.
            **kwargs: Any further ``ExternalTaskSensor`` arguments.
        """

        def dag_last_exec(crnt_dttm: datetime) -> datetime:  # noqa: ARG001
            # The incoming date is deliberately ignored: the sensor always
            # targets the latest run of the external DAG.
            return self.get_dag_last_execution_date(  # type: ignore
                self.external_dag_id,
            )

        super().__init__(
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            **kwargs,
        )
        # Assigned after the parent initializer so it overrides whatever
        # ``execution_date_fn`` the parent stored.
        self.execution_date_fn = dag_last_exec

    @provide_session
    def get_dag_last_execution_date(
        self,
        dag_id: str,  # noqa: ARG002
        session: Session,
    ) -> datetime:
        """Get the last execution date of the external DAG.

        Args:
            dag_id: Currently unused; the lookup always uses
                ``self.external_dag_id``.
            session: ORM session, injected by ``provide_session`` when the
                caller does not pass one.

        Returns:
            The ``execution_date`` of the DAG's last run. Falls back to
            ``_yesterday()`` when the DAG is not found or has no runs yet.
        """
        dag = _get_dag_by_id(session, self.external_dag_id)
        if not dag:
            return _yesterday()
        last_run = dag.get_last_dagrun(include_externally_triggered=True)
        if last_run is None:
            # A registered DAG that has never run has no last run; use the
            # same fallback as for an unknown DAG instead of raising
            # AttributeError on ``None.execution_date``.
            return _yesterday()
        return last_run.execution_date

def _get_dag_by_id(session: Session, dag_id: str) -> DagModel | None:
    """Fetch the first ``DagModel`` row whose ``dag_id`` matches, or ``None``."""
    matches_dag_id = DagModel.dag_id == dag_id
    statement = select(DagModel).where(matches_dag_id)
    rows = session.execute(statement).scalars()
    return rows.first()


def _yesterday() -> datetime:
    """Return the current Europe/Moscow time shifted back by one day."""
    moscow_tz = pytz.timezone('Europe/Moscow')
    one_day = timedelta(days=1)
    return datetime.now(tz=moscow_tz) - one_day


```

GitHub link: https://github.com/apache/airflow/discussions/49997

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to