matthewblock opened a new issue, #28016:
URL: https://github.com/apache/airflow/issues/28016

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Using Airflow Version 2.4.2
   
   Manually triggered DAG runs interspersed between scheduled DAG runs cause 
the scheduled DAG runs' [template 
variables](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html)
 — in my specific use case, `prev_data_interval_start_success` and 
`prev_data_interval_end_success` — to no longer behave as expected. This 
appears to be due to how we determine a previous DAG run.
   
   When we have mixed manually triggered and scheduled DAG runs, the 
`logical_date` are not equivalent - For manually triggered DAG runs, the 
`logical_date` is the exact timestamp the DAG was triggered.
   
   The following table shows mixed manually triggered and scheduled DAG runs, 
and some of the templated timestamps. Notice that this table is ordered by the 
`Actual execution date`, which is the exact timestamp the DAG executed, and 
that the `logical_date` is out of order for manually triggered DAGs.
   
   **Bolded** are the template variables that IMO do not behave as predicted:
   
   | DAG Run | Triggered | `logical_date` | Actual execution date | 
`data_interval`<br />`_start` | `data_interval`<br />`_end` | `prev_data_`<br 
/>`interval_`<br />`start_success` | `prev_data_`<br />`interval_`<br 
/>`end_success` |
   | ------- | --------- | ------------- | --------------------- | 
--------------------- | ------------------- | 
------------------------------------ | ---------------------------------- |
   | 1       | Scheduled | 1:05:00       | 1:10:29               | 1:05:00      
         | 1:10:00             | None                                 | None    
                           |
   | 2       | Manual    | 1:11:26       | 1:11:26               | 1:05:00      
         | 1:10:00             | 1:05:00                              | 1:10:00 
                           |
   | 3       | Scheduled | 1:10:00       | 1:15:00               | 1:10:00      
         | 1:15:00             | 1:05:00                              | 1:10:00 
                           |
   | 4       | Manual    | 1:17:51       | 1:17:52               | 1:10:00      
         | 1:15:00             | 1:05:00                              | 1:10:00 
                           |
   | 5       | Scheduled | 1:15:00       | 1:20:01               | 1:15:00      
         | 1:20:00             | **1:05:00**                              | 
**1:10:00**                            |
   | 6       | Scheduled | 1:20:00       | 1:25:00               | 1:20:00      
         | 1:25:00             | **1:10:00**                              | 
**1:15:00**                            |
   | 7       | Scheduled | 1:25:00       | 1:30:01               | 1:25:00      
         | 1:30:00             | 1:20:00                              | 1:25:00 
                           |
   
   For the bolded scheduled DAG runs, note that the 
`prev_data_interval_start_success` and `prev_data_interval_end_success` both 
point to the data interval that is technically **earlier** than the most recent 
successful DAG run's data interval.
   
   #### Causes
   
   I believe this is due to the way Airflow calculates the **previous DAG 
run**. If we order by `logical_date`, this function will prioritize manually 
triggered DAG runs, because:
   - Manually triggered DAG runs' `logical_date` (`==execution_date`) are 
_later_ than `data_interval_end`
   - Scheduled DAG runs' `logical_date` are _equal to_ `data_interval_start`, 
which occurs **before** `data_interval_end` for an equivalent "trigger" 
timestamp
   
   See in 
[`airflow/models/dagrun.py`](https://github.com/apache/airflow/blob/430e930902792fc37cdd2c517783f7dd544fbebf/airflow/models/dagrun.py#L503):
   
   ```
       @provide_session
       def get_previous_dagrun(
           self, state: DagRunState | None = None, session: Session = 
NEW_SESSION
       ) -> DagRun | None:
           """The previous DagRun, if there is one"""
           filters = [
               DagRun.dag_id == self.dag_id,
               DagRun.execution_date < self.execution_date, <-- THIS LINE
           ]
           if state is not None:
               filters.append(DagRun.state == state)
           return 
session.query(DagRun).filter(*filters).order_by(DagRun.execution_date.desc()).first()
   ```
   
   ### What you think should happen instead
   
   IMO this is a bug - DAG Run 6 in the table above should identify DAG Run 5 
as the most recent successful DAG run, and the template variables should 
reflect this accordingly. 
   
   A deeper issue in how Airflow draws the line between scheduled DAG runs and 
manually triggered DAG runs.
   
   There have been 
[discussions](https://github.com/apache/airflow/discussions/22232) on keeping 
manually triggered DAGs and scheduled DAGs entirely separate - best practice 
being don't ever manually trigger scheduled DAG runs.
   
   In our use case, we want to be able to easily trigger a DAG run when there 
is a gap in execution, and that DAG run should have a data interval that covers 
any gaps.
   
   **Possible Solutions**
   
   - For DAGs that have a time-based schedule (not `null` and not a `Dataset`), 
manually triggered DAG runs set `logical_date` to `data_interval_start`
   - Enhance backfill feature by enabling it via UI, not just CLI
       - Backfill should be much more accessible to Airflow users
   
   ### How to reproduce
   
   Sample DAG that prints template variables every 5 minutes.
   
   To reproduce, manually trigger the DAG between scheduled DAG runs and note 
the lag in `prev_data_interval_start_success` and 
`prev_data_interval_end_success`.
   
   ```
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2020, 1, 1),
       "retries": 0,
       "retry_delay": timedelta(minutes=5),
   }
   
   
   def print_data_intervals(
       data_interval_start: datetime,
       data_interval_end: datetime,
       prev_data_interval_start_success: datetime,
       prev_data_interval_end_success: datetime,
       logical_date: datetime,
   ):
       print(f"{data_interval_start=}")
       print(f"{data_interval_end=}")
       print(f"{prev_data_interval_start_success=}")
       print(f"{prev_data_interval_end_success=}")
       print(f"{logical_date=}")
   
   
   with DAG(
       dag_id="test_data_intervals",
       default_args=default_args,
       schedule="*/5 * * * *",
       catchup=False,
   ):
       task = PythonOperator(
           task_id="print_data_intervals",
           python_callable=print_data_intervals,
       )
   
       task
   
   ```
   
   ### Operating System
   
   debian
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to