s0rV opened a new issue, #31740: URL: https://github.com/apache/airflow/issues/31740
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

We discovered a bug in the backfill feature of Airflow (version 2.4.3 and above). When backfilling a DAG (say DAG_1) while another DAG (say DAG_2) in the same environment has an end date in the past (for example 2022-09-07), the backfill on DAG_1 deadlocks because it picks up the end date of DAG_2. The backfill fails with the error:

```
airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
```

When we analysed the debug logs, we found:

```
'Execution Date' PASSED: False, The execution date is 2023-06-05T10:00:00+00:00 but this is after the task's end date 2022-09-07T00:00:00+00:00.
```

However, we never set an end date on DAG_1. We also learned that the end date is not specified in the DAG itself but through default args shared by multiple DAGs: unlike the scheduler, where each file is parsed in a separate process, backfill loads all files within one process, so it is possible for one file to corrupt shared objects used in another file.
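For context, the leak can be demonstrated without Airflow at all: both DAG files import the same module-level `default_args` object, so an in-place mutation in one file is visible in the other whenever both files are parsed in a single process, which is what backfill does. A minimal sketch with plain dicts (the variable names are illustrative, not Airflow APIs):

```python
import copy
from datetime import datetime

# Shared module ("base_dag.py"): one dict object imported by every DAG file.
base_defaults = {"start_date": datetime(2018, 1, 1)}

# What the repro does: both DAG files alias the same object.
dag_1_args = base_defaults  # dag_1.py: from base_dag import default_args
dag_2_args = base_defaults  # dag_2.py: same import, same object
dag_2_args["end_date"] = datetime(2022, 9, 7)  # dag_2 sets its end date

# dag_1 never set an end_date, yet it now has one, because the in-place
# mutation above changed the single shared dict:
assert dag_1_args.get("end_date") == datetime(2022, 9, 7)

# Possible workaround: copy the defaults in each DAG file instead of
# mutating the shared object.
fresh_defaults = {"start_date": datetime(2018, 1, 1)}
dag_1_safe = copy.deepcopy(fresh_defaults)
dag_2_safe = copy.deepcopy(fresh_defaults)
dag_2_safe["end_date"] = datetime(2022, 9, 7)
assert "end_date" not in dag_1_safe  # dag_1's copy stays unaffected
```

Copying the defaults per DAG file (`copy.deepcopy`, or `dict(...)` for flat dicts) avoids the cross-DAG leakage, though it does not change the single-process parsing behaviour of backfill itself.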
ref link: https://github.com/GoogleCloudPlatform/composer-airflow/blob/2.4.3/airflow/jobs/backfill_job.py

### What you think should happen instead

_No response_

### How to reproduce

In `$AIRFLOW_HOME/dags/dags_new` add the files below (the DAG files import `dags_new.base.base_dag`, so `base_dag.py` goes under a `base/` subpackage):

base_dag.py

```python
from base64 import b64encode
from collections import namedtuple
from datetime import datetime, timedelta

from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2018, 1, 1),
    "retries": 1,
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=10),
}


class Task_1:  # octopus base task
    def __init__(self, dag, task_name):
        self.dag = dag
        self.task_name = task_name

    def arm(self):
        return BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id="do-{}".format(self.task_name),
        )


class EmailNotificationTask:
    def __init__(self, dag):
        self.dag = dag

    def arm(self):
        return EmailOperator(
            dag=self.dag,
            task_id="send_email",
            to=default_args["email"],
            subject="subject",
            html_content="""Success Email for Process""",
        )


class Task_2:  # PubSubPullSensorTask
    def __init__(self, dag):
        self.dag = dag

    def arm(self):
        return BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id="pubsub_pull_sensor",
        )


class Task_3:  # SnitchNotificationTask
    def __init__(self, dag, task_id):
        self.dag = dag
        self.task_id = task_id

    def arm(self):
        return BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id=self.task_id,
        )


class Task_4:  # BQPartitionSensorTask
    def __init__(self, dag, task_name):
        self.dag = dag
        self.task_name = task_name

    def arm(self):
        op = BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id=self.task_name,
        )
        return op


class Task_5:  # BQDeleteDatasetTask
    def __init__(self, dag, task_name="Task_5"):
        self.dag = dag
        self.task_name = task_name

    def arm(self):
        return BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id=self.task_name,
        )


class Task_6:  # Dataflow
    def __init__(self, dag, task_name="Task_6"):
        self.dag = dag
        self.task_name = task_name

    def arm(self):
        return BashOperator(
            bash_command="echo 1",
            dag=self.dag,
            task_id=self.task_name,
        )
```

dag_1.py

```python
from airflow import DAG
from dags_new.base.base_dag import Task_1
from dags_new.base.base_dag import EmailNotificationTask
from dags_new.base.base_dag import Task_2
from dags_new.base.base_dag import Task_3
from dags_new.base.base_dag import Task_4
from dags_new.base.base_dag import Task_5
from dags_new.base.base_dag import Task_6
from dags_new.base.base_dag import default_args
from datetime import timedelta, datetime
from airflow.models import Variable

default_args['retry_delay'] = timedelta(minutes=5)
default_args['dagrun_timeout_minutes'] = timedelta(minutes=420)
default_args['retries'] = 5
default_args['start_date'] = datetime(2023, 4, 19)

schedule_interval = '0 8 * * *'

dag = DAG(
    dag_id='Dag_1',
    default_args=default_args,
    max_active_runs=2,
    schedule_interval=schedule_interval,
    concurrency=3,
    catchup=False,
)

Task_A = Task_1(dag=dag, task_name='Task_A').arm()
Task_B = Task_1(dag=dag, task_name='Task_B').arm()
Task_C = Task_1(dag=dag, task_name='Task_C').arm()
Task_D = Task_1(dag=dag, task_name='Task_D').arm()
Task_E = Task_1(dag=dag, task_name='Task_E').arm()
Task_F = Task_1(dag=dag, task_name='Task_F').arm()
Task_G = Task_1(dag=dag, task_name='Task_G').arm()
Task_H = EmailNotificationTask(dag=dag).arm()

Task_A.set_downstream(Task_C)
Task_E.set_downstream(Task_H)
Task_A.set_downstream(Task_F)
Task_F.set_downstream(Task_H)
Task_A.set_downstream(Task_G)
Task_G.set_downstream(Task_H)
Task_A.set_downstream(Task_C)
Task_C.set_downstream(Task_H)
Task_A.set_downstream(Task_D)
Task_D.set_downstream(Task_H)
Task_B.set_downstream(Task_H)
```

dag_2.py

```python
from airflow import DAG
from dags_new.base.base_dag import Task_1
from dags_new.base.base_dag import EmailNotificationTask
from dags_new.base.base_dag import Task_2
from dags_new.base.base_dag import Task_3
from dags_new.base.base_dag import Task_4
from dags_new.base.base_dag import Task_5
from dags_new.base.base_dag import Task_6
from dags_new.base.base_dag import default_args
from datetime import timedelta, datetime
from airflow.models import Variable

default_args['start_date'] = datetime(2022, 8, 1)
default_args['end_date'] = datetime(2022, 9, 7)

schedule_interval = '15 14 * * *'

dag = DAG(
    dag_id='dag_2',
    default_args=default_args,
    max_active_runs=1,
    schedule_interval=schedule_interval,
    catchup=True,
)

task_a = Task_1(dag=dag, task_name='task_a').arm()
task_b = Task_3(dag=dag, task_id='task_b').arm()
task_a.set_downstream(task_b)
```

Then perform a backfill on `Dag_1`. When we tried the same setup without the custom wrapper classes for the operators, the backfill worked fine.

### Operating System

Container-Optimized OS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
