s0rV opened a new issue, #31740:
URL: https://github.com/apache/airflow/issues/31740

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We discovered a bug in the backfill feature of Airflow (version 2.4.3 and
above). When we backfill a DAG (say Dag_1) while another DAG (say dag_2) in the
same environment has an end date in the past (for example 2022-09-07), the
backfill of Dag_1 silently picks up the end date of dag_2 and deadlocks. The
backfill fails with:
   airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
   The debug logs show:
   'Execution Date' PASSED: False, The execution date is
2023-06-05T10:00:00+00:00 but this is after the task's end date
2022-09-07T00:00:00+00:00.
   However, we never set an end date on Dag_1.
   We also got to know why this happens: the end date isn't specified in the
DAG itself but comes in through default args shared by several DAGs. Unlike the
scheduler, where each file is parsed in a separate process, backfill loads all
files within one process, so it is possible for one file to corrupt shared
objects used in another file.
   ref link:
https://github.com/GoogleCloudPlatform/composer-airflow/blob/2.4.3/airflow/jobs/backfill_job.py
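
   The shared-object corruption described above can be reproduced without
Airflow at all. The sketch below (the `parse_dag_*` function names and dict
shapes are illustrative, not Airflow APIs) shows how two "DAG files" that
mutate the same imported dict in one process see each other's changes:

   ```python
   # Minimal illustration of the shared-dict corruption: both "DAG files"
   # mutate the same default_args object, so whichever file is parsed first
   # leaks its end_date into the other.

   default_args = {"owner": "airflow"}  # stands in for base_dag.default_args

   def parse_dag_2(args):
       # dag_2.py mutates the imported dict in place...
       args["end_date"] = "2022-09-07"
       return {"dag_id": "dag_2", **args}

   def parse_dag_1(args):
       # ...so when dag_1.py is parsed later in the SAME process (as
       # backfill does), it inherits dag_2's end_date despite never
       # setting one itself.
       return {"dag_id": "Dag_1", **args}

   dag_2 = parse_dag_2(default_args)
   dag_1 = parse_dag_1(default_args)
   print("end_date" in dag_1)  # True: Dag_1 picked up dag_2's end_date
   ```

   In the scheduler each file gets a fresh process, so the mutation is
harmless there; only single-process parsing (backfill) exposes it.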
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   In $AIRFLOW_HOME/dags/dags_new add the files below (note the imports
expect base_dag.py under dags_new/base/):
   
   base_dag.py
   ```python
   from datetime import datetime, timedelta

   from airflow.operators.bash import BashOperator  # canonical Airflow 2 path
   from airflow.operators.email import EmailOperator

   # Shared defaults imported by several DAG files; dag_1.py and dag_2.py
   # both mutate this dict in place.
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2018, 1, 1),
       "retries": 1,
       "email": ["[email protected]"],
       "email_on_failure": True,
       "email_on_retry": False,
       "retry_delay": timedelta(minutes=10),
   }


   class Task_1:  # octopus base task
       def __init__(self, dag, task_name):
           self.dag = dag
           self.task_name = task_name

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id="do-{}".format(self.task_name),
           )


   class EmailNotificationTask:
       def __init__(self, dag):
           self.dag = dag

       def arm(self):
           return EmailOperator(
               dag=self.dag,
               task_id="send_email",
               to=default_args["email"],
               subject="subject",
               html_content="""Success Email for Process""",
           )


   class Task_2:  # PubSubPullSensorTask
       def __init__(self, dag):
           self.dag = dag

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id="pubsub_pull_sensor",
           )


   class Task_3:  # SnitchNotificationTask
       def __init__(self, dag, task_id):
           self.dag = dag
           self.task_id = task_id

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id=self.task_id,
           )


   class Task_4:  # BQPartitionSensorTask
       def __init__(self, dag, task_name):
           self.dag = dag
           self.task_name = task_name

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id=self.task_name,
           )


   class Task_5:  # BQDeleteDatasetTask
       def __init__(self, dag, task_name="Task_5"):
           self.dag = dag
           self.task_name = task_name

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id=self.task_name,
           )


   class Task_6:  # Dataflow
       def __init__(self, dag, task_name="Task_6"):
           self.dag = dag
           self.task_name = task_name

       def arm(self):
           return BashOperator(
               bash_command="echo 1",
               dag=self.dag,
               task_id=self.task_name,
           )
   ```
   dag_1.py
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from dags_new.base.base_dag import EmailNotificationTask, Task_1, default_args

   # Mutating the imported dict in place: these edits are visible to every
   # other file that imports default_args from base_dag.
   default_args['retry_delay'] = timedelta(minutes=5)
   default_args['dagrun_timeout_minutes'] = timedelta(minutes=420)
   default_args['retries'] = 5
   default_args['start_date'] = datetime(2023, 4, 19)

   schedule_interval = '0 8 * * *'
   dag = DAG(
       dag_id='Dag_1',
       default_args=default_args,
       max_active_runs=2,
       schedule_interval=schedule_interval,
       concurrency=3,
       catchup=False,
   )

   Task_A = Task_1(dag=dag, task_name='Task_A').arm()
   Task_B = Task_1(dag=dag, task_name='Task_B').arm()
   Task_C = Task_1(dag=dag, task_name='Task_C').arm()
   Task_D = Task_1(dag=dag, task_name='Task_D').arm()
   Task_E = Task_1(dag=dag, task_name='Task_E').arm()
   Task_F = Task_1(dag=dag, task_name='Task_F').arm()
   Task_G = Task_1(dag=dag, task_name='Task_G').arm()
   Task_H = EmailNotificationTask(dag=dag).arm()

   Task_A.set_downstream(Task_C)
   Task_A.set_downstream(Task_D)
   Task_A.set_downstream(Task_F)
   Task_A.set_downstream(Task_G)
   Task_B.set_downstream(Task_H)
   Task_C.set_downstream(Task_H)
   Task_D.set_downstream(Task_H)
   Task_E.set_downstream(Task_H)
   Task_F.set_downstream(Task_H)
   Task_G.set_downstream(Task_H)
   ```
   
   dag_2.py
   ```python
   from datetime import datetime

   from airflow import DAG
   from dags_new.base.base_dag import Task_1, Task_3, default_args

   # These in-place mutations are what leak into Dag_1 during backfill:
   # both files share the same default_args object.
   default_args['start_date'] = datetime(2022, 8, 1)
   default_args['end_date'] = datetime(2022, 9, 7)

   schedule_interval = '15 14 * * *'
   dag = DAG(
       dag_id='dag_2',
       default_args=default_args,
       max_active_runs=1,
       schedule_interval=schedule_interval,
       catchup=True,
   )

   task_a = Task_1(dag=dag, task_name='task_a').arm()
   task_b = Task_3(dag=dag, task_id='task_b').arm()

   task_a.set_downstream(task_b)
   ```
   Then run a backfill on Dag_1. Note that when we tried the same setup
without the custom wrapper classes around the operators, the backfill worked
fine.
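
   One possible mitigation (our own suggestion, not something the Airflow
docs prescribe) is to give each DAG file a private copy of the shared
defaults instead of mutating the import in place. A minimal sketch, with
`shared_defaults` standing in for `base_dag.default_args`:

   ```python
   # Hypothetical fix for the files above: each DAG file takes a deep copy
   # of the shared defaults before editing, so one file's end_date can never
   # leak into another even when backfill parses them in one process.
   import copy
   from datetime import datetime

   shared_defaults = {"owner": "airflow", "start_date": datetime(2018, 1, 1)}

   # dag_2.py-style module edits only its private copy...
   dag_2_args = copy.deepcopy(shared_defaults)
   dag_2_args["end_date"] = datetime(2022, 9, 7)

   # ...so a dag_1.py-style module parsed afterwards sees clean defaults.
   dag_1_args = copy.deepcopy(shared_defaults)
   print("end_date" in dag_1_args)  # False: nothing leaked
   ```

   This keeps the convenience of one shared defaults module while making
each file's edits local to that file.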
   
   ### Operating System
   
   Container-Optimized OS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

