ThiagoLuka opened a new issue, #36178:
URL: https://github.com/apache/airflow/issues/36178

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I was running some tests to figure out how `data_interval_start ` and 
`data_interval_end` (among other date values) work while I was trying to make a 
custom version of data intervals based on them. I'm not really sure if that is 
a good idea or not, but I did find weird when a DagRun was referenced while not 
being the last executed run.
   
   I've set up a dag to run only one operator which received and printed a 
bunch of dates from the context and printed. The schedule was set to every 5 
minutes and I've triggered the DAG manually a bunch of times while I was 
testing it. The two relevante ones were 
   
   - manual__2023-12-11T19:51:13.326018+00:00
   - manual__2023-12-11T19:56:29.214134+00:00
   
   Here's a print of how the second manually triggered run showed:
   
   ![Screenshot 2023-12-11 at 19 24 
39](https://github.com/apache/airflow/assets/7564094/c4b6f6e1-5c30-4572-8abf-be6dd6ed49fa)
   
   The problem is that the next scheduled dagrun 
(scheduled__2023-12-11T19:55:00+00:00) references the first manually triggered 
run (manual__2023-12-11T19:51:13.326018+00:00) as being its previous one, not 
the latter (manual__2023-12-11T19:56:29.214134+00:00) which I've shown. Here's 
the print:
   
   ![Screenshot 2023-12-11 at 19 26 
48](https://github.com/apache/airflow/assets/7564094/81a1936b-fc2c-48a3-8674-2c18fe13e2fe)
   
   Is this the expected behaviour?
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Manually trigger the DAG through the UI between some of the runs. This was 
the code used:
   
   ```
   from typing import Sequence, Union
   import pendulum
   
   from airflow import DAG
   from airflow.models import DagRun
   from airflow.models.baseoperator import BaseOperator
   
   def custom_data_interval(
       date_type: str,
       airflow_data_interval_start: str,
       airflow_data_interval_end: str,
       dag_start_date,
       last_dag_run: Union[DagRun, None],
   ) -> str:
   
     if date_type == 'start_date':
       if last_dag_run is None:  # first run case
         return dag_start_date
       return airflow_data_interval_start
   
     if date_type == 'end_date':
       return airflow_data_interval_end
   
     return ''
   
   
   
   class TestOperator(BaseOperator):
   
     template_fields: Sequence[str] = ["custom_params"]
   
     def __init__(self, custom_params: dict, **kwargs) -> None:
       super().__init__(**kwargs)
       self.custom_params = custom_params
   
     def execute(self, context):
       title = ' TESTE '
       print(f'{title:=^104}')
       for param_key, param_value in self.custom_params.items():
         print(f'{param_key:>50}: {param_value}')
   
   
   with DAG(
     dag_id="test_dag",
     schedule='*/5 * * * *',
     start_date=pendulum.datetime(year=2023, month=1, day=1),
     catchup=False,
     user_defined_macros={'custom_data_interval': custom_data_interval},
   ) as dag:
   
     custom_data_interval_airflow_vars = [
       "airflow_data_interval_start = data_interval_start",
       "airflow_data_interval_end = data_interval_end",
       "dag_start_date = dag.start_date",
       "last_dag_run = dag_run.get_previous_dagrun()"
     ]
     custom_data_interval_start = "{{ 
custom_data_interval(date_type='start_date'," + ', 
'.join(custom_data_interval_airflow_vars) + ") }}"
     custom_data_interval_end  = "{{ 
custom_data_interval(date_type='end_date'," + ', 
'.join(custom_data_interval_airflow_vars) + ") }}"
   
     test_task = TestOperator(
       task_id='test_task',
       custom_params={
         "data_interval_start": "{{ data_interval_start }}",
         "data_interval_end": "{{ data_interval_end }}",
         "ds": "{{ ds }}",
         "ds_nodash": "{{ ds_nodash }}",
         "logical_date": "{{ logical_date }}",
         "execution_date": "{{ execution_date }}",
         "prev_data_interval_start_success": "{{ 
prev_data_interval_start_success }}",
         "prev_data_interval_end_success": "{{ prev_data_interval_end_success 
}}",
         "prev_start_date_success": "{{ prev_start_date_success }}",
         "run_id": "{{ run_id }}",
         "ts": "{{ ts }}",
         "ts_nodash": "{{ ts_nodash }}",
         "ts_nodash_with_tz": "{{ ts_nodash_with_tz }}",
         "last_dag_run": "{{ dag_run.get_previous_dagrun() }}",
         "dag_start_date": "{{ dag.start_date }}",
         "custom_data_interval_start": custom_data_interval_start,
         "custom_data_interval_end": custom_data_interval_end,
       },
     )
   ```
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to