philipmargeotab opened a new issue, #26116:
URL: https://github.com/apache/airflow/issues/26116

   I'm seeing unexpected behaviour with backfilling for a DAG using the 
Airflow CLI. After running:
   
   `gcloud composer environments run [company environment] --location 
[location] dags backfill -- --reset-dagruns -s 2022-05-05 -e 2022-05-18 
minimal_skip`
   
   I observe that 2022-05-05 is complete on the Airflow UI, but the next time 
it is run, I see 2022-05-07 rather than 2022-05-06 as expected. 
   
   ![Screenshot from 2022-05-19 
14-28-49](https://user-images.githubusercontent.com/93161537/169373989-968e75d8-de68-4d57-9903-08bfe9bf0d3f.png)
   
   ![Screenshot from 2022-05-19 
14-29-00](https://user-images.githubusercontent.com/93161537/169374000-c6b99da2-a829-4801-9c3f-a6042e37420b.png)
   
   I'm using Airflow 2.1.4 + composer
   
   ```
   import os
   import datetime
   
   import airflow
   from airflow.providers.google.cloud.operators.bigquery import 
BigQueryInsertJobOperator
   
   from airflow_dag_utilities import airflow_dag_utilities as dag_utils
   
   def main():
       dag_args = {
           # DAG
           # department-project_name-dag_description (inherit from definition)
           "dag_id": os.path.splitext(os.path.basename(__file__))[0],
           "schedule_interval": "0 0 * * *",
           "max_active_runs": 1,
           "dagrun_timeout": datetime.timedelta(minutes=59),
           "template_searchpath": ["/home/airflow/gcs/dags/minimal_skip/"],
           "catchup": False,  # must include because airflow default catchup is 
True
           # Operators
           "default_args": {
               # 'start_date': datetime.datetime(2020, 1, 13, 15), (year, m,  
d, hour)
               "start_date": None,  # time: UTC, uses 
dag_utils.suggest_start_date
               "owner": "xxxxxxx",
               "email": ["[email protected]"],
               "depends_on_past": False,  # new instance will not run if past 
job failed
               "retries": 1,
               "retry_delay": datetime.timedelta(seconds=30),
               "email_on_failure": True,
               "email_on_retry": False,
           },
       }
       dag_args["default_args"]["start_date"] = dag_utils.suggest_start_date(
           dag_args["schedule_interval"], intervals=-2
       )  # remove automatic start_date assignment when using your own
   
       create_dag(dag_args)
   
   
   def create_dag(dag_args):
       with airflow.DAG(**dag_args) as dag:
           """
           No logic outside of tasks(operators) or they are constantly run by 
scheduler
           """
   
           t_select_five = BigQueryInsertJobOperator(
               task_id='select_five',
               configuration={
                   'query': {
                       'query': "SELECT 5;",
                       'useLegacySql': False,
                   },
               },
           )
   
           t_select_ten = BigQueryInsertJobOperator(
               task_id='select_10',
               configuration={
                   'query': {
                       'query': "SELECT 10",
                       'useLegacySql': False,
                   },
               },
           )
           t_select_five >> t_select_ten
   
       globals()[dag.dag_id] = dag  # keep this
   
   
   main()  # keep this
   ```
   
   _Originally posted by @philipmargeotab in 
https://github.com/apache/airflow/discussions/23808_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to