webPato commented on issue #33994:
URL: https://github.com/apache/airflow/issues/33994#issuecomment-2125424390
> ### Apache Airflow version
>
> Other Airflow 2 version (please specify below)
> ### What happened
>
> I have a DAG that generates start and end dates for a query to be executed on BigQuery. I'm using dynamic task mapping to determine the number of days in the past that the query should take into consideration: e.g., one dynamically mapped task takes the data from 3 days before, another one from 4 days before, 5, and so on (sketched below).
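>
> A minimal sketch of that offset-to-date mapping (values here are illustrative only, not taken from the DAG below):
>
> ```python
> # Sketch only: how each mapped offset becomes a single query day.
> import pendulum
>
> data_interval_end = pendulum.datetime(2023, 8, 1, 6, 0)
> for days_back in (3, 4, 5):  # one mapped task instance per offset
>     day = data_interval_end.subtract(days=days_back).date()
>     print(day.isoformat())  # the single day that mapped task queries
> ```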
>
> This seems to work fine on regular runs, but the backfill job detects a deadlock that I don't see.
>
> I'm using Airflow 2.6.1+astro2 and Postgres 13.11. The issue happens with LocalExecutor during development.
> ### What you think should happen instead
>
> The backfill should not detect a deadlock in this scenario; the backfill job should finish successfully.
> ### How to reproduce
>
> Here is an example I used to reproduce the issue using PythonOperators:
>
> ```python
> from typing import Dict
>
> import pendulum
> from airflow import DAG
> from airflow.models import DagRun
> from airflow.operators.python import PythonOperator
> from airflow.utils.types import DagRunType
>
> from dags.include.helpers.dag_helpers import merge_with_default_args
>
> _QUERY_INTERVAL_START_OFFSET = 14
> _QUERY_INTERVAL_END_OFFSET = 2
>
>
> def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
>     if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
>         start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
>         end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
>
>         return [
>             {
>                 "start_date": start_date.isoformat(),
>                 "end_date": end_date.isoformat(),
>             }
>         ]
>
>     return [
>         {
>             "start_date": data_interval_end.subtract(days=i).date().isoformat(),
>             "end_date": data_interval_end.subtract(days=i).date().isoformat(),
>         }
>         for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
>     ]
>
>
> def _cleanup(start_date: str, end_date: str, current_date: str):
>     print(start_date)
>     print(end_date)
>     print(current_date)
>
>
> def _insert(run_opts: Dict[str, str]):
>     print(run_opts)
>
>
> def _get_insert_run_data(dag_run: DagRun, data_interval_end: pendulum.DateTime):
>     current_date = data_interval_end.date().isoformat()
>     return [
>         {"current_date": current_date, **dates}
>         for dates in _get_start_end_dates(dag_run, data_interval_end)
>     ]
>
>
> with DAG(
>     dag_id="deadlock_reprod",
>     catchup=False,
>     start_date=pendulum.datetime(2023, 6, 7),
>     template_searchpath=[
>         "/usr/local/airflow/dags/include",
>         "/usr/local/airflow/dags/sem",
>     ],
>     default_args=merge_with_default_args(),
>     schedule="0 6 * * *",  # At 06:00 UTC every day
>     max_active_runs=8,
>     max_active_tasks=8,
> ):
>     get_run_data = PythonOperator(
>         task_id="get_run_data",
>         python_callable=_get_insert_run_data,
>     )
>
>     cleanup = PythonOperator.partial(
>         task_id="cleanup",
>         python_callable=_cleanup,
>     ).expand(op_args=get_run_data.output)
>
>     insert = PythonOperator.partial(
>         task_id="insert",
>         python_callable=_insert,
>     ).expand(op_args=get_run_data.output)
>
>     cleanup >> insert
> ```
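>
> The essential shape is two tasks expanded over the same upstream XCom, with a cross dependency between the mapped tasks. An untested, stripped-down sketch of that shape (task names and offsets are illustrative, and the project-specific import is dropped on the assumption it is not needed to trigger the behaviour):
>
> ```python
> import pendulum
> from airflow import DAG
> from airflow.operators.python import PythonOperator
>
>
> def _make_args(data_interval_end: pendulum.DateTime):
>     # One single-element op_args list per mapped task instance.
>     return [[data_interval_end.subtract(days=i).date().isoformat()] for i in range(2, 15)]
>
>
> def _echo(day: str):
>     print(day)
>
>
> with DAG(
>     dag_id="deadlock_reprod_min",
>     start_date=pendulum.datetime(2023, 6, 7),
>     schedule="0 6 * * *",
>     catchup=False,
> ):
>     src = PythonOperator(task_id="src", python_callable=_make_args)
>     # Two mapped tasks fed by the same upstream XCom...
>     first = PythonOperator.partial(task_id="first", python_callable=_echo).expand(op_args=src.output)
>     second = PythonOperator.partial(task_id="second", python_callable=_echo).expand(op_args=src.output)
>     # ...with a dependency between the mapped tasks.
>     first >> second
> ```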
>
> Then I let the first DAG run complete (the one that starts automatically when the DAG is enabled).
>
> Then I exec into the scheduler's Docker container and run: `airflow dags backfill deadlock_reprod --start-date 2023-07-01 --end-date 2023-08-01 -B`
>
> Here is the log:
>
> ```
> WARNING - Deadlock discovered for ti_status.to_run=dict_values([<TaskInstance: deadlock_reprod.insert backfill__2023-07-25T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-28T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-24T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-27T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-30T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-26T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-29T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance: deadlock_reprod.insert backfill__2023-07-31T06:00:00+00:00 map_index=0 [scheduled]>])
> [2023-09-01T06:54:37.183+0000] {backfill_job_runner.py:408} INFO - [backfill progress] | finished run 0 of 31 | tasks waiting: 0 | succeeded: 8 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 8 | not ready: 8
> [2023-09-01T06:54:37.309+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
> Traceback (most recent call last):
>   File "/home/astro/.local/bin/airflow", line 8, in <module>
>     sys.exit(main())
>   File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
>     args.func(args)
>   File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
>     return f(*args, **kwargs)
>   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
>     _run_dag_backfill(dags, args)
>   File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
>     dag.run(
>   File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
>     run_job(job=job, execute_callable=job_runner._execute)
>   File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
>     return func(*args, session=session, **kwargs)
>   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
>     return execute_job(job, execute_callable=execute_callable)
>   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
>     ret = execute_callable()
>   File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
>     return func(*args, session=session, **kwargs)
>   File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 925, in _execute
>     raise BackfillUnfinished(err, ti_status)
> airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
> These tasks have succeeded:
> DAG ID           Task ID       Run ID                                 Try number
> ---------------  ------------  -----------------------------------  ------------
> deadlock_reprod  get_run_data  backfill__2023-07-24T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-25T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-26T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-27T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-28T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-29T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-30T06:00:00+00:00              1
> deadlock_reprod  get_run_data  backfill__2023-07-31T06:00:00+00:00              1
>
> These tasks are running:
> DAG ID    Task ID    Run ID    Try number
> --------  ---------  --------  ------------
>
> These tasks have failed:
> DAG ID    Task ID    Run ID    Try number
> --------  ---------  --------  ------------
>
> These tasks are skipped:
> DAG ID    Task ID    Run ID    Try number
> --------  ---------  --------  ------------
>
> These tasks are deadlocked:
> DAG ID           Task ID    Run ID                                 Map Index    Try number
> ---------------  ---------  -----------------------------------  -----------  ------------
> deadlock_reprod  insert     backfill__2023-07-24T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-25T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-26T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-27T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-28T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-29T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-30T06:00:00+00:00            1             0
> deadlock_reprod  insert     backfill__2023-07-31T06:00:00+00:00            1             0
> ```
>
> ### Operating System
>
> OS X (Linux in Docker container)
> ### Versions of Apache Airflow Providers
>
> _No response_
> ### Deployment
>
> Issue occurs locally in Docker
> ### Deployment details
>
> Docker Engine: 24.0.2, Compose: v2.19.1
>
> Docker desktop: Version 4.21.1 (114176)
>
> Astro CLI Version: 1.17.1
> ### Anything else
>
> _No response_
> ### Are you willing to submit PR?
>
> * [ ] Yes I am willing to submit a PR!
>
>
> ### Code of Conduct
>
> * [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
The same thing happened to me. Were you able to resolve it?