webPato commented on issue #33994:
URL: https://github.com/apache/airflow/issues/33994#issuecomment-2125424390

   > ### Apache Airflow version
   > 
   > Other Airflow 2 version (please specify below)
   > ### What happened
   > 
   > I have a DAG that generates start and end dates for a query to be executed on BigQuery. I'm using dynamic task mapping to determine the number of days in the past that the query should take into consideration: for example, one dynamically mapped task takes the data from 3 days back, another from 4 days back, another from 5, and so on.
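   > 
   > Roughly the pattern in play, as a simplified, hypothetical sketch (placeholder DAG and task names; the actual DAG is included in full below): an upstream task emits one entry per day-offset, and a mapped task runs once per entry.
   > 
   > ```python
   > # Hypothetical, simplified sketch of the mapping pattern, not the real DAG (which follows below).
   > import pendulum
   > from airflow import DAG
   > from airflow.decorators import task
   > 
   > with DAG(
   >     dag_id="offsets_sketch",  # placeholder name
   >     start_date=pendulum.datetime(2023, 6, 7),
   >     schedule=None,
   >     catchup=False,
   > ):
   > 
   >     @task
   >     def day_offsets():
   >         # one list entry per number of days to look back
   >         return [3, 4, 5]
   > 
   >     @task
   >     def run_query(days_back: int):
   >         print(f"querying the window ending {days_back} days ago")
   > 
   >     # expand() creates one mapped run_query task instance per offset
   >     run_query.expand(days_back=day_offsets())
   > ```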
   > 
   > This seems to work fine on regular runs, but the backfill job detects a deadlock that I don't see.
   > 
   > I'm using Airflow 2.6.1+astro2 and Postgres 13.11. The issue happens with the LocalExecutor during development.
   > ### What you think should happen instead
   > 
   > The backfill should not detect a deadlock in this scenario; the backfill job should finish successfully.
   > ### How to reproduce
   > 
   > Here is an example I used to reproduce the issue using PythonOperators:
   > 
   > ```python
   > from typing import Dict
   > 
   > import pendulum
   > from airflow import DAG
   > from airflow.models import DagRun
   > from airflow.operators.python import PythonOperator
   > from airflow.utils.types import DagRunType
   > 
   > from dags.include.helpers.dag_helpers import merge_with_default_args
   > 
   > _QUERY_INTERVAL_START_OFFSET = 14
   > _QUERY_INTERVAL_END_OFFSET = 2
   > 
   > 
   > def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
   >     if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
   >         start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
   >         end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
   > 
   >         return [
   >             {
   >                 "start_date": start_date.isoformat(),
   >                 "end_date": end_date.isoformat(),
   >             }
   >         ]
   > 
   >     return [
   >         {
   >             "start_date": data_interval_end.subtract(days=i).date().isoformat(),
   >             "end_date": data_interval_end.subtract(days=i).date().isoformat(),
   >         }
   >         for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
   >     ]
   > 
   > 
   > def _cleanup(start_date: str, end_date: str, current_date: str):
   >     print(start_date)
   >     print(end_date)
   >     print(current_date)
   > 
   > 
   > def _insert(run_opts: Dict[str, str]):
   >     print(run_opts)
   > 
   > def _get_insert_run_data(
   >     dag_run: DagRun, data_interval_end: pendulum.DateTime
   > ):
   >     current_date = data_interval_end.date().isoformat()
   >     return [
   >         {"current_date": current_date, **dates}
   >         for dates in _get_start_end_dates(dag_run, data_interval_end)
   >     ]
   > 
   > 
   > with DAG(
   >     dag_id="deadlock_reprod",
   >     catchup=False,
   >     start_date=pendulum.datetime(2023, 6, 7),
   >     template_searchpath=[
   >         "/usr/local/airflow/dags/include",
   >         "/usr/local/airflow/dags/sem",
   >     ],
   >     default_args=merge_with_default_args(),
   >     schedule="0 6 * * *",  # At 06:00 UTC every day
   >     max_active_runs=8,
   >     max_active_tasks=8,
   > ):
   >     get_run_data = PythonOperator(
   >         task_id="get_run_data",
   >         python_callable=_get_insert_run_data,
   >     )
   > 
   >     cleanup = PythonOperator.partial(
   >         task_id="cleanup",
   >         python_callable=_cleanup
   >     ).expand(op_args=get_run_data.output)
   > 
   >     insert = PythonOperator.partial(
   >         task_id="insert",
   >         python_callable=_cleanup
   >     ).expand(op_args=get_run_data.output)
   > 
   >     cleanup >> insert
   > ```
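   > 
   > As a side note on the example itself (an observation about the repro code, not about the deadlock): `get_run_data` returns a list of dicts whose keys match `_cleanup`'s parameters, so the same mapping could be expressed with `op_kwargs`, which passes each dict by keyword instead of unpacking it positionally through `op_args`. A minimal sketch of that variant, as a drop-in replacement for the `cleanup` assignment in the DAG above:
   > 
   > ```python
   > # Hypothetical variant of the mapped "cleanup" task using op_kwargs: each dict
   > # emitted by get_run_data ({"current_date": ..., "start_date": ..., "end_date": ...})
   > # becomes the keyword arguments of one mapped task instance.
   > cleanup = PythonOperator.partial(
   >     task_id="cleanup",
   >     python_callable=_cleanup,
   > ).expand(op_kwargs=get_run_data.output)
   > ```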
   > 
   > Then I let the first DAG run finish (the one that starts automatically when the DAG is enabled).
   > 
   > Then I exec into the Docker container (scheduler) and run: `airflow dags backfill deadlock_reprod --start-date 2023-07-01 --end-date 2023-08-01 -B`
   > 
   > Here is the log:
   > 
   > ```
   > WARNING - Deadlock discovered for ti_status.to_run=dict_values([<TaskInstance: deadlock_reprod.insert backfill__2023-07-25T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-28T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-24T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-27T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-30T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-26T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-29T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-31T06:00:00+00:00 map_index=0 [scheduled]>])
   > [2023-09-01T06:54:37.183+0000] {backfill_job_runner.py:408} INFO - [backfill progress] | finished run 0 of 31 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 8 | not ready: 8
   > [2023-09-01T06:54:37.309+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
   > Traceback (most recent call last):
   >   File "/home/astro/.local/bin/airflow", line 8, in <module>
   >     sys.exit(main())
   >   File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
   >     args.func(args)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
   >     return func(*args, **kwargs)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
   >     return f(*args, **kwargs)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
   >     _run_dag_backfill(dags, args)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
   >     dag.run(
   >   File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
   >     run_job(job=job, execute_callable=job_runner._execute)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
   >     return func(*args, session=session, **kwargs)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
   >     return execute_job(job, execute_callable=execute_callable)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
   >     ret = execute_callable()
   >   File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
   >     return func(*args, session=session, **kwargs)
   >   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 925, in _execute
   >     raise BackfillUnfinished(err, ti_status)
   > airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
   > These tasks have succeeded:
   > DAG ID           Task ID       Run ID                                 Try number
   > ---------------  ------------  -----------------------------------  ------------
   > deadlock_reprod  get_run_data  backfill__2023-07-24T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-25T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-26T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-27T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-28T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-29T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-30T06:00:00+00:00             1
   > deadlock_reprod  get_run_data  backfill__2023-07-31T06:00:00+00:00             1
   > 
   > These tasks are running:
   > DAG ID    Task ID    Run ID    Try number
   > --------  ---------  --------  ------------
   > 
   > These tasks have failed:
   > DAG ID    Task ID    Run ID    Try number
   > --------  ---------  --------  ------------
   > 
   > These tasks are skipped:
   > DAG ID    Task ID    Run ID    Try number
   > --------  ---------  --------  ------------
   > 
   > These tasks are deadlocked:
   > DAG ID           Task ID    Run ID                                 Map Index    Try number
   > ---------------  ---------  -----------------------------------  -----------  ------------
   > deadlock_reprod  insert     backfill__2023-07-24T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-25T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-26T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-27T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-28T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-29T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-30T06:00:00+00:00            1             0
   > deadlock_reprod  insert     backfill__2023-07-31T06:00:00+00:00            1             0
   > ```
   > 
   > ### Operating System
   > 
   > OS X (Linux in Docker container)
   > ### Versions of Apache Airflow Providers
   > 
   > _No response_
   > ### Deployment
   > 
   > Issue occurs locally in Docker
   > ### Deployment details
   > 
   > Docker Engine: 24.0.2, Compose: v2.19.1
   > 
   > Docker desktop: Version 4.21.1 (114176)
   > 
   > Astro CLI Version: 1.17.1
   > ### Anything else
   > 
   > _No response_
   > ### Are you willing to submit PR?
   > 
   >     * [ ]  Yes I am willing to submit a PR!
   > 
   > 
   > ### Code of Conduct
   > 
   >     * [x]  I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   
   The same thing happened to me. Were you able to resolve it?

