mrn-aglic opened a new issue, #33994:
URL: https://github.com/apache/airflow/issues/33994

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I have a DAG that generates start and end dates for a query to be executed on BigQuery. I'm using dynamic task mapping to fan out over the number of days in the past the query should take into consideration: e.g. one dynamically mapped task takes the data from 3 days before, the next from 4 days before, then 5, and so on.
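   
   As a plain-`datetime` sketch (no Airflow involved, names are mine) of the per-day windows described above:
   
   ```python
   from datetime import date, timedelta

   def day_windows(anchor: date, start_offset: int = 14, end_offset: int = 2):
       """One single-day (start, end) window per offset, from end_offset
       up to start_offset days before the anchor date."""
       return [
           (anchor - timedelta(days=i), anchor - timedelta(days=i))
           for i in range(end_offset, start_offset + 1)
       ]

   windows = day_windows(date(2023, 7, 10))
   print(len(windows))    # 13 windows, one mapped task each
   print(windows[0][0])   # 2023-07-08, i.e. 2 days before the anchor
   ```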
   
   This seems to work fine on regular scheduled runs, but the backfill job reports a deadlock that I cannot see in the DAG structure.
   
   I'm using Airflow 2.6.1+astro2 and Postgres 13.11. The issue happens with the LocalExecutor during development.
   
   ### What you think should happen instead
   
   The backfill should not detect a deadlock in this scenario; the backfill job should finish successfully.
   
   ### How to reproduce
   
   Here is an example I used to reproduce the issue using PythonOperators:
   
   ```python
   from typing import Dict
   
   import pendulum
   from airflow import DAG
   from airflow.models import DagRun
   from airflow.operators.python import PythonOperator
   from airflow.utils.types import DagRunType
   
   from dags.include.helpers.dag_helpers import merge_with_default_args
   
   _QUERY_INTERVAL_START_OFFSET = 14
   _QUERY_INTERVAL_END_OFFSET = 2
   
   
    def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
        if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
            # Backfill/manual runs collapse the whole range into one window.
            start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_START_OFFSET).date()
            end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
    
            return [
                {
                    "start_date": start_date.isoformat(),
                    "end_date": end_date.isoformat(),
                }
            ]
    
        # Scheduled runs: one single-day window per offset.
        return [
            {
                "start_date": data_interval_end.subtract(days=i).date().isoformat(),
                "end_date": data_interval_end.subtract(days=i).date().isoformat(),
            }
            for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
        ]
   
   
   def _cleanup(start_date: str, end_date: str, current_date: str):
       print(start_date)
       print(end_date)
       print(current_date)
   
   
    def _insert(start_date: str, end_date: str, current_date: str):
        print(start_date, end_date, current_date)
    
    
    def _get_insert_run_data(dag_run: DagRun, data_interval_end: pendulum.DateTime):
        current_date = data_interval_end.date().isoformat()
        return [
            {"current_date": current_date, **dates}
            for dates in _get_start_end_dates(dag_run, data_interval_end)
        ]
    
    
    with DAG(
        dag_id="deadlock_reprod",
        catchup=False,
        start_date=pendulum.datetime(2023, 6, 7),
        template_searchpath=[
            "/usr/local/airflow/dags/include",
            "/usr/local/airflow/dags/sem",
        ],
        default_args=merge_with_default_args(),
        schedule="0 6 * * *",  # at 06:00 UTC every day
        max_active_runs=8,
        max_active_tasks=8,
    ):
        get_run_data = PythonOperator(
            task_id="get_run_data",
            python_callable=_get_insert_run_data,
        )
    
        # Each mapped dict is passed to the callable as keyword arguments.
        cleanup = PythonOperator.partial(
            task_id="cleanup",
            python_callable=_cleanup,
        ).expand(op_kwargs=get_run_data.output)
    
        insert = PythonOperator.partial(
            task_id="insert",
            python_callable=_insert,
        ).expand(op_kwargs=get_run_data.output)
    
        cleanup >> insert
   
   ```
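   
   The branch behaviour can be checked without Airflow. The following is a hypothetical standalone re-implementation of the date-window logic (run types stood in by plain strings, assuming the backfill branch is meant to produce a single window spanning the whole range): backfill/manual runs yield 1 mapped task while scheduled runs yield 13.
   
   ```python
   from datetime import date, timedelta

   START_OFFSET = 14
   END_OFFSET = 2

   def get_start_end_dates(run_type: str, data_interval_end: date):
       # Backfill/manual runs: one window covering the whole range.
       if run_type in ("backfill", "manual"):
           return [{
               "start_date": (data_interval_end - timedelta(days=START_OFFSET)).isoformat(),
               "end_date": (data_interval_end - timedelta(days=END_OFFSET)).isoformat(),
           }]
       # Scheduled runs: one single-day window per offset.
       return [
           {
               "start_date": (data_interval_end - timedelta(days=i)).isoformat(),
               "end_date": (data_interval_end - timedelta(days=i)).isoformat(),
           }
           for i in range(END_OFFSET, START_OFFSET + 1)
       ]

   print(len(get_start_end_dates("backfill", date(2023, 7, 10))))   # 1
   print(len(get_start_end_dates("scheduled", date(2023, 7, 10))))  # 13
   ```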
   
   Then I let the first DAG run complete (the one that starts automatically when the DAG is enabled).
   
   Then I exec into the scheduler's Docker container and run:
    `airflow dags backfill deadlock_reprod --start-date 2023-07-01 --end-date 2023-08-01 -B`
    
   
   
   Here is the log:
   ```
    WARNING - Deadlock discovered for ti_status.to_run=dict_values([<TaskInstance: deadlock_reprod.insert backfill__2023-07-25T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-28T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-24T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-27T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-30T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-26T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-29T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-31T06:00:00+00:00 map_index=0 [scheduled]>])
    [2023-09-01T06:54:37.183+0000] {backfill_job_runner.py:408} INFO - [backfill progress] | finished run 0 of 31 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 8 | not ready: 8
    [2023-09-01T06:54:37.309+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
    Traceback (most recent call last):
      File "/home/astro/.local/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
        args.func(args)
      File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
        return f(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
        _run_dag_backfill(dags, args)
      File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
        dag.run(
      File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
        run_job(job=job, execute_callable=job_runner._execute)
      File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, session=session, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
        return execute_job(job, execute_callable=execute_callable)
      File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
        ret = execute_callable()
      File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
        return func(*args, session=session, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 925, in _execute
        raise BackfillUnfinished(err, ti_status)
    airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
   These tasks have succeeded:
    DAG ID           Task ID       Run ID                               Try number
    ---------------  ------------  -----------------------------------  ----------
    deadlock_reprod  get_run_data  backfill__2023-07-24T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-25T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-26T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-27T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-28T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-29T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-30T06:00:00+00:00           1
    deadlock_reprod  get_run_data  backfill__2023-07-31T06:00:00+00:00           1
   
   These tasks are running:
   DAG ID    Task ID    Run ID    Try number
   --------  ---------  --------  ------------
   
   These tasks have failed:
   DAG ID    Task ID    Run ID    Try number
   --------  ---------  --------  ------------
   
   These tasks are skipped:
   DAG ID    Task ID    Run ID    Try number
   --------  ---------  --------  ------------
   
   These tasks are deadlocked:
    DAG ID           Task ID    Run ID                                Map Index  Try number
    ---------------  ---------  -----------------------------------  ----------  ----------
    deadlock_reprod  insert     backfill__2023-07-24T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-25T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-26T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-27T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-28T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-29T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-30T06:00:00+00:00           1           0
    deadlock_reprod  insert     backfill__2023-07-31T06:00:00+00:00           1           0
   ```
   
   ### Operating System
   
   macOS (Linux in a Docker container)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Docker Engine: 24.0.2
   Docker Compose: v2.19.1
   Docker Desktop: 4.21.1 (114176)
   Astro CLI: 1.17.1
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

