ThiagoLuka opened a new issue, #36178: URL: https://github.com/apache/airflow/issues/36178
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

I was running some tests to figure out how `data_interval_start` and `data_interval_end` (among other date values) work while trying to build a custom version of data intervals on top of them. I'm not sure whether that is a good idea or not, but I did find it odd that a DagRun was referenced as the previous run when it was not the last executed one.

I set up a DAG that runs a single operator, which receives a bunch of dates from the context and prints them. The schedule was set to every 5 minutes, and I triggered the DAG manually a number of times while testing. The two relevant runs were:

- manual__2023-12-11T19:51:13.326018+00:00
- manual__2023-12-11T19:56:29.214134+00:00

Here's how the output of the second manually triggered run looked (screenshot omitted).

The problem is that the next scheduled dagrun (scheduled__2023-12-11T19:55:00+00:00) references the first manually triggered run (manual__2023-12-11T19:51:13.326018+00:00) as its previous one, not the latter one (manual__2023-12-11T19:56:29.214134+00:00) shown above. Here's its output (screenshot omitted).

Is this the expected behaviour?

### What you think should happen instead

_No response_

### How to reproduce

Manually trigger the DAG through the UI between some of the scheduled runs.
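For what it's worth, the behaviour looks consistent with the previous run being selected by logical (execution) date rather than by wall-clock trigger time; that is an assumption on my part, but a small stand-alone sketch of that ordering reproduces what I saw:

```python
from datetime import datetime, timezone

# Hypothetical reproduction of the ordering, assuming "previous dagrun" means:
# the run with the greatest logical date strictly *before* the current run's
# logical date (not the most recently executed run).
runs = {
    "manual__2023-12-11T19:51:13": datetime(2023, 12, 11, 19, 51, 13, tzinfo=timezone.utc),
    "manual__2023-12-11T19:56:29": datetime(2023, 12, 11, 19, 56, 29, tzinfo=timezone.utc),
}

# Logical date of scheduled__2023-12-11T19:55:00+00:00.
current_logical_date = datetime(2023, 12, 11, 19, 55, tzinfo=timezone.utc)

previous = max(
    (run_id for run_id, d in runs.items() if d < current_logical_date),
    key=lambda run_id: runs[run_id],
)
print(previous)  # → manual__2023-12-11T19:51:13
```

Under that assumption the 19:56:29 manual run is not a candidate at all, because its logical date sorts *after* the scheduled run's 19:55:00 logical date, even though it was executed earlier in wall-clock terms.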
This was the code used:

```python
from typing import Sequence, Union

import pendulum

from airflow import DAG
from airflow.models import DagRun
from airflow.models.baseoperator import BaseOperator


def custom_data_interval(
    date_type: str,
    airflow_data_interval_start: str,
    airflow_data_interval_end: str,
    dag_start_date,
    last_dag_run: Union[DagRun, None],
) -> str:
    if date_type == 'start_date':
        if last_dag_run is None:  # first run case
            return dag_start_date
        return airflow_data_interval_start
    if date_type == 'end_date':
        return airflow_data_interval_end
    return ''


class TestOperator(BaseOperator):
    template_fields: Sequence[str] = ["custom_params"]

    def __init__(self, custom_params: dict, **kwargs) -> None:
        super().__init__(**kwargs)
        self.custom_params = custom_params

    def execute(self, context):
        title = ' TESTE '
        print(f'{title:=^104}')
        for param_key, param_value in self.custom_params.items():
            print(f'{param_key:>50}: {param_value}')


with DAG(
    dag_id="test_dag",
    schedule='*/5 * * * *',
    start_date=pendulum.datetime(year=2023, month=1, day=1),
    catchup=False,
    user_defined_macros={'custom_data_interval': custom_data_interval},
) as dag:
    custom_data_interval_airflow_vars = [
        "airflow_data_interval_start = data_interval_start",
        "airflow_data_interval_end = data_interval_end",
        "dag_start_date = dag.start_date",
        "last_dag_run = dag_run.get_previous_dagrun()",
    ]
    custom_data_interval_start = (
        "{{ custom_data_interval(date_type='start_date',"
        + ', '.join(custom_data_interval_airflow_vars)
        + ") }}"
    )
    custom_data_interval_end = (
        "{{ custom_data_interval(date_type='end_date',"
        + ', '.join(custom_data_interval_airflow_vars)
        + ") }}"
    )

    test_task = TestOperator(
        task_id='test_task',
        custom_params={
            "data_interval_start": "{{ data_interval_start }}",
            "data_interval_end": "{{ data_interval_end }}",
            "ds": "{{ ds }}",
            "ds_nodash": "{{ ds_nodash }}",
            "logical_date": "{{ logical_date }}",
            "execution_date": "{{ execution_date }}",
            "prev_data_interval_start_success": "{{ prev_data_interval_start_success }}",
            "prev_data_interval_end_success": "{{ prev_data_interval_end_success }}",
            "prev_start_date_success": "{{ prev_start_date_success }}",
            "run_id": "{{ run_id }}",
            "ts": "{{ ts }}",
            "ts_nodash": "{{ ts_nodash }}",
            "ts_nodash_with_tz": "{{ ts_nodash_with_tz }}",
            "last_dag_run": "{{ dag_run.get_previous_dagrun() }}",
            "dag_start_date": "{{ dag.start_date }}",
            "custom_data_interval_start": custom_data_interval_start,
            "custom_data_interval_end": custom_data_interval_end,
        },
    )
```

### Operating System

Amazon Linux 2

### Versions of Apache Airflow Providers

_No response_

### Deployment

Amazon (AWS) MWAA

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
