potiuk opened a new issue, #39858:
URL: https://github.com/apache/airflow/issues/39858
### Body
The test `test_backfill_failed_dag_with_upstream_failed_task` in tests/jobs/test_backfill_job.py often fails because a database deadlock is detected. Here is a typical stack trace/error:
```python
______ TestBackfillJob.test_backfill_failed_dag_with_upstream_failed_task ______

self = <tests.jobs.test_backfill_job.TestBackfillJob object at 0x7f0f6532ef10>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f0f3bfbe9d0>
def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
    self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
    dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")

    # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
    # the original one. However, instead of using the original version of perform_heartbeat,
    # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
    # but it can be replaced with checking the state of the LocalTaskJob.
    def fake_perform_heartbeat(*args, **kwargs):
        import time

        time.sleep(1)

    with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
        job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
        job_runner = BackfillJobRunner(
            job=job,
            dag=dag,
            start_date=DEFAULT_DATE,
            end_date=DEFAULT_DATE,
            rerun_failed_tasks=True,
        )
        with pytest.raises(BackfillUnfinished):
>           run_job(job=job, execute_callable=job_runner._execute)
tests/jobs/test_backfill_job.py:2142:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/session.py:84: in wrapper
    return func(*args, session=session, **kwargs)
airflow/jobs/job.py:410: in run_job
    return execute_job(job, execute_callable=execute_callable)
airflow/jobs/job.py:439: in execute_job
    ret = execute_callable()
airflow/utils/session.py:84: in wrapper
    return func(*args, session=session, **kwargs)
airflow/jobs/backfill_job_runner.py:980: in _execute
    self._execute_dagruns(
airflow/utils/session.py:81: in wrapper
    return func(*args, **kwargs)
airflow/jobs/backfill_job_runner.py:869: in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
airflow/jobs/backfill_job_runner.py:698: in _process_backfill_task_instances
    _per_task_process(key, ti, session)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)
ti = <TaskInstance: test_backfill_with_upstream_failed_task.downstream_task backfill__2016-01-01T00:00:00+00:00 [upstream_failed]>
session = <sqlalchemy.orm.session.Session object at 0x7f0f3bd93c10>
def _per_task_process(key, ti: TaskInstance, session):
    ti.refresh_from_db(lock_for_update=True, session=session)

    task = self.dag.get_task(ti.task_id, include_subdags=True)
    ti.task = task

    self.log.debug("Task instance to run %s state %s", ti, ti.state)

    # The task was already marked successful or skipped by a
    # different Job. Don't rerun it.
    if ti.state == TaskInstanceState.SUCCESS:
        ti_status.succeeded.add(key)
        self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
        ti_status.to_run.pop(key)
        if key in ti_status.running:
            ti_status.running.pop(key)
        return
    elif ti.state == TaskInstanceState.SKIPPED:
        ti_status.skipped.add(key)
        self.log.debug("Task instance %s skipped. Don't rerun.", ti)
        ti_status.to_run.pop(key)
        if key in ti_status.running:
            ti_status.running.pop(key)
        return

    if self.rerun_failed_tasks:
        # Rerun failed tasks or upstreamed failed tasks
        if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
            self.log.error("Task instance %s with state %s", ti, ti.state)
            if key in ti_status.running:
                ti_status.running.pop(key)
            # Reset the failed task in backfill to scheduled state
            ti.try_number += 1
            ti.set_state(TaskInstanceState.SCHEDULED, session=session)
            if ti.dag_run not in ti_status.active_runs:
                ti_status.active_runs.add(ti.dag_run)
    else:
        # Default behaviour which works for subdag.
        if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
            self.log.error("Task instance %s with state %s", ti, ti.state)
            ti_status.failed.add(key)
            ti_status.to_run.pop(key)
            if key in ti_status.running:
                ti_status.running.pop(key)
            return

    if self.ignore_first_depends_on_past:
        dagrun = ti.get_dagrun(session=session)
        ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
    else:
        ignore_depends_on_past = False

    backfill_context = DepContext(
        deps=BACKFILL_QUEUED_DEPS,
        ignore_depends_on_past=ignore_depends_on_past,
        ignore_task_deps=self.ignore_task_deps,
        wait_for_past_depends_before_skipping=False,
        flag_upstream_failed=True,
    )

    # Is the task runnable? -- then run it
    # the dependency checker can change states of tis
    if ti.are_dependencies_met(
        dep_context=backfill_context, session=session, verbose=self.verbose
    ):
        if executor.has_task(ti):
            self.log.debug("Task Instance %s already in executor waiting for queue to clear", ti)
        else:
            self.log.debug("Sending %s to executor", ti)
            # Skip scheduled state, we are executing immediately
            if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
                # i am not sure why this is necessary.
                # seemingly a quirk of backfill runner.
                # it should be handled elsewhere i think.
                # seems the leaf tasks are set SCHEDULED but others not.
                # but i am not going to look too closely since we need
                # to nuke the current backfill approach anyway.
                ti.try_number += 1
            ti.state = TaskInstanceState.QUEUED
            ti.queued_by_job_id = self.job.id
            ti.queued_dttm = timezone.utcnow()
            session.merge(ti)
            try:
                session.commit()
            except OperationalError:
                self.log.exception("Failed to commit task state change due to operational error")
                session.rollback()
                # early exit so the outer loop can retry
                return

            cfg_path = None
            if executor.is_local:
                cfg_path = tmp_configuration_copy()

            executor.queue_task_instance(
                ti,
                mark_success=self.mark_success,
                pickle_id=pickle_id,
                ignore_task_deps=self.ignore_task_deps,
                ignore_depends_on_past=ignore_depends_on_past,
                wait_for_past_depends_before_skipping=False,
                pool=self.pool,
                cfg_path=cfg_path,
            )
            ti_status.running[key] = ti
            ti_status.to_run.pop(key)
        return

    if ti.state == TaskInstanceState.UPSTREAM_FAILED:
        self.log.error("Task instance %s upstream failed", ti)
        ti_status.failed.add(key)
>       ti_status.to_run.pop(key)
E       KeyError: TaskInstanceKey(dag_id='test_backfill_with_upstream_failed_task', task_id='downstream_task', run_id='backfill__2016-01-01T00:00:00+00:00', try_number=0, map_index=-1)

airflow/jobs/backfill_job_runner.py:609: KeyError
---------------------------- Captured stderr setup -----------------------------
INFO [airflow.models.dagbag.DagBag] Filling up the DagBag from /dev/null
------------------------------ Captured log setup ------------------------------
INFO airflow.models.dagbag.DagBag:dagbag.py:574 Filling up the DagBag from /dev/null
----------------------------- Captured stderr call -----------------------------
INFO [airflow.executors.local_executor.LocalExecutor] Adding to queue: ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
INFO [airflow.executors.local_executor.QueuedLocalWorker] QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_backfill_with_upstream_failed_task', 'failing_task', 'backfill__2016-01-01T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/test_backfill_with_upstream_failed_task.py', '--cfg-path', '/tmp/tmpbhrfampg']
INFO [airflow.jobs.backfill_job_runner.BackfillJobRunner] [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 0 | running: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
INFO [airflow.models.dagbag.DagBag] Filling up the DagBag from /opt/airflow/tests/dags/test_backfill_with_upstream_failed_task.py
INFO [airflow.cli.commands.task_command] Running <TaskInstance: test_backfill_with_upstream_failed_task.failing_task backfill__2016-01-01T00:00:00+00:00 [queued]> on host baa783c46d36
Traceback (most recent call last):
  File "/opt/airflow/airflow/jobs/backfill_job_runner.py", line 700, in _process_backfill_task_instances
    session.commit()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.11/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
[parameters: (None, datetime.datetime(2024, 5, 26, 18, 4, 3, 111505), 150)]
```
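
For reference, `_per_task_process` already handles this class of error ad hoc: its own `session.commit()` sits in a `try`/`except OperationalError` that rolls back and returns early so the outer loop can retry, whereas the deadlock above surfaces at the `session.commit()` on line 700 of `_process_backfill_task_instances`. Below is a minimal sketch of that same retry-on-deadlock idea factored into a helper. The `commit_with_deadlock_retry` name and the retry/backoff parameters are purely illustrative (not an existing Airflow API), and this is not claimed to be the right fix for the flakiness, only an illustration of reacting to MySQL error 1213:

```python
import logging
import time

from sqlalchemy.exc import OperationalError

log = logging.getLogger(__name__)

# MySQL error code seen in the trace above: "Deadlock found when trying to get lock"
MYSQL_DEADLOCK_ERROR = 1213


def commit_with_deadlock_retry(session, max_attempts: int = 3, wait_seconds: float = 0.5) -> None:
    """Commit the session, retrying a few times when MySQL reports a deadlock (hypothetical helper)."""
    for attempt in range(1, max_attempts + 1):
        try:
            session.commit()
            return
        except OperationalError as err:
            session.rollback()
            # err.orig is the underlying MySQLdb exception; its first argument is the MySQL error code.
            code = err.orig.args[0] if err.orig is not None and err.orig.args else None
            if code != MYSQL_DEADLOCK_ERROR or attempt == max_attempts:
                raise
            log.warning("Deadlock on commit (attempt %s/%s), retrying", attempt, max_attempts)
            time.sleep(wait_seconds)
```

Whether retrying like this, or reworking the backfill runner altogether, is the better route is left open here; the comments in `_per_task_process` already note that the current backfill approach is due for a rewrite.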
### Committer
- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow
project.