mrn-aglic opened a new issue, #33994:
URL: https://github.com/apache/airflow/issues/33994
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
I have a DAG that generates start and end dates for a query to be executed on
BigQuery. I'm using dynamic task mapping to determine how many days in the past
the query should take into consideration: e.g. one dynamically mapped task
takes the data from 3 days before, another from 4 days before, another from 5,
and so on.
This works fine on regular runs, but the backfill job reports a deadlock that
I don't see.
I'm using Airflow 2.6.1+astro2 and Postgres 13.11. The issue happens with
LocalExecutor during development.
### What you think should happen instead
The backfill should not detect a deadlock in this scenario; the backfill job
should finish successfully.
### How to reproduce
Here is an example I used to reproduce the issue using PythonOperators:
```python
from typing import Dict

import pendulum

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.types import DagRunType

from dags.include.helpers.dag_helpers import merge_with_default_args

_QUERY_INTERVAL_START_OFFSET = 14
_QUERY_INTERVAL_END_OFFSET = 2


def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
    if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
        start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
        end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
        return [
            {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            }
        ]
    return [
        {
            "start_date": data_interval_end.subtract(days=i).date().isoformat(),
            "end_date": data_interval_end.subtract(days=i).date().isoformat(),
        }
        for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
    ]


def _cleanup(start_date: str, end_date: str, current_date: str):
    print(start_date)
    print(end_date)
    print(current_date)


def _insert(run_opts: Dict[str, str]):
    print(run_opts)


def _get_insert_run_data(dag_run: DagRun, data_interval_end: pendulum.DateTime):
    current_date = data_interval_end.date().isoformat()
    return [
        {"current_date": current_date, **dates}
        for dates in _get_start_end_dates(dag_run, data_interval_end)
    ]


with DAG(
    dag_id="deadlock_reprod",
    catchup=False,
    start_date=pendulum.datetime(2023, 6, 7),
    template_searchpath=[
        "/usr/local/airflow/dags/include",
        "/usr/local/airflow/dags/sem",
    ],
    default_args=merge_with_default_args(),
    schedule="0 6 * * *",  # At 06:00 UTC every day
    max_active_runs=8,
    max_active_tasks=8,
):
    get_run_data = PythonOperator(
        task_id="get_run_data",
        python_callable=_get_insert_run_data,
    )
    cleanup = PythonOperator.partial(
        task_id="cleanup",
        python_callable=_cleanup,
    ).expand(op_args=get_run_data.output)
    insert = PythonOperator.partial(
        task_id="insert",
        python_callable=_insert,
    ).expand(op_args=get_run_data.output)
    cleanup >> insert
```
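For clarity, here is a minimal stdlib-only sketch (my own, using `datetime` in place of `pendulum` and omitting the `DagRun`/run-type branching) of the list that `get_run_data` maps over on a regular scheduled run:

```python
from datetime import date, timedelta

# Stand-ins for the constants in the DAG file above.
_QUERY_INTERVAL_START_OFFSET = 14
_QUERY_INTERVAL_END_OFFSET = 2


def get_run_data(data_interval_end: date) -> list:
    """Mirror of _get_insert_run_data for a scheduled (non-backfill) run:
    one {"current_date", "start_date", "end_date"} dict per day offset,
    with start_date == end_date (each mapped task covers a single day)."""
    current = data_interval_end.isoformat()
    return [
        {
            "current_date": current,
            "start_date": (data_interval_end - timedelta(days=i)).isoformat(),
            "end_date": (data_interval_end - timedelta(days=i)).isoformat(),
        }
        for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
    ]


rows = get_run_data(date(2023, 7, 24))
print(len(rows))  # 13 mapped task instances on a scheduled run
print(rows[0])    # offsets run from 2 days back to 14 days back
```

On a backfill or manual run the real DAG instead returns a single dict (offset 2 only), so each backfill run maps exactly one `cleanup` and one `insert` task instance, which matches the `map_index=0` entries in the log.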
Then I let the first DAG run execute (the one that starts automatically when
the DAG is enabled).
Then I exec into the scheduler's Docker container and run:
`airflow dags backfill deadlock_reprod --start-date 2023-07-01 --end-date 2023-08-01 -B`
Here is the log:
```
WARNING - Deadlock discovered for
ti_status.to_run=dict_values([<TaskInstance: deadlock_reprod.insert
backfill__2023-07-25T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance:
deadlock_reprod.insert backfill__2023-07-28T06:00:00+00:00 map_index=0
[scheduled]>, <TaskInstance: deadlock_reprod.insert
backfill__2023-07-24T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance:
deadlock_reprod.insert backfill__2023-07-27T06:00:00+00:00 map_index=0
[scheduled]>, <TaskInstance: deadlock_reprod.insert
backfill__2023-07-30T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance:
deadlock_reprod.insert backfill__2023-07-26T06:00:00+00:00 map_index=0
[scheduled]>, <TaskInstance: deadlock_reprod.insert
backfill__2023-07-29T06:00:00+00:00 map_index=0 [scheduled]>, <TaskInstance:
deadlock_reprod.insert backfill__2023-07-31T06:00:00+00:00 map_index=0
[scheduled]>])
[2023-09-01T06:54:37.183+0000] {backfill_job_runner.py:408} INFO - [backfill
progress] | finished run 0 of 31 | tasks waiting: 0 | succeeded: 8 | running: 0
| failed: 0 | skipped: 0 | deadlocked: 8 | not ready: 8
[2023-09-01T06:54:37.309+0000] {local_executor.py:400} INFO - Shutting down
LocalExecutor; waiting for running tasks to finish. Signal again if you don't
want to wait.
Traceback (most recent call last):
File "/home/astro/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line
48, in main
args.func(args)
File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py",
line 51, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line
112, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py",
line 139, in dag_backfill
_run_dag_backfill(dags, args)
File
"/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py",
line 92, in _run_dag_backfill
dag.run(
File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line
2490, in run
run_job(job=job, execute_callable=job_runner._execute)
File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py",
line 76, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line
284, in run_job
return execute_job(job, execute_callable=execute_callable)
File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line
313, in execute_job
ret = execute_callable()
File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py",
line 76, in wrapper
return func(*args, session=session, **kwargs)
File
"/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py",
line 925, in _execute
raise BackfillUnfinished(err, ti_status)
airflow.exceptions.BackfillUnfinished: BackfillJob is deadlocked.
These tasks have succeeded:
DAG ID           Task ID       Run ID                               Try number
---------------  ------------  -----------------------------------  ----------
deadlock_reprod  get_run_data  backfill__2023-07-24T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-25T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-26T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-27T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-28T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-29T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-30T06:00:00+00:00  1
deadlock_reprod  get_run_data  backfill__2023-07-31T06:00:00+00:00  1
These tasks are running:
DAG ID Task ID Run ID Try number
-------- --------- -------- ------------
These tasks have failed:
DAG ID Task ID Run ID Try number
-------- --------- -------- ------------
These tasks are skipped:
DAG ID Task ID Run ID Try number
-------- --------- -------- ------------
These tasks are deadlocked:
DAG ID           Task ID  Run ID                               Map Index  Try number
---------------  -------  -----------------------------------  ---------  ----------
deadlock_reprod  insert   backfill__2023-07-24T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-25T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-26T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-27T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-28T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-29T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-30T06:00:00+00:00  1          0
deadlock_reprod  insert   backfill__2023-07-31T06:00:00+00:00  1          0
```
### Operating System
OS X (Linux in Docker container)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
Docker:
- Engine: 24.0.2
- Compose: v2.19.1
Docker Desktop: 4.21.1 (114176)
Astro CLI: 1.17.1
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]