This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f616ee8dff Workaround failing deadlock when running backfill (#32991)
f616ee8dff is described below
commit f616ee8dff8e6ba9b37cbce0d22235dc47d4fc93
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Aug 9 13:00:48 2023 +0200
Workaround failing deadlock when running backfill (#32991)
The dask_executor backfill tests recently started to fail more often due
to a backfill exception. The likely cause is that execution is now
better parallelised, which makes it much easier to trigger the deadlocks
caused by contention between dag_run state updates and task state
updates.
While this PR does not fix the underlying issue, it catches the
operational error raised when the deadlock occurred during the backfill
and rolls back the operation.
This **should** be safe. Backfill has a built-in mechanism to loop and
retry failed tasks and the test passed multiple times, completing the
backfill after this fix was applied. It was not easy to reproduce it
locally, but it failed roughly once every 20-30 runs. When extra logging was added,
it was always connected to OperationalException raised (and caught)
right after _per_task_process. The same exception was observed a few times
when rollback was added, and despite it backfill job retried and
completed the process successfully every time. We also leave the logs
with exceptions and add reassuring messages that should make it clear
that in case backfill completes, the exceptions can be ignored as
the updates will be retried by the backfill job.
Fixes: #32778
---
airflow/jobs/backfill_job_runner.py | 117 +++++++++++++--------
tests/providers/daskexecutor/test_dask_executor.py | 1 -
2 files changed, 73 insertions(+), 45 deletions(-)
diff --git a/airflow/jobs/backfill_job_runner.py
b/airflow/jobs/backfill_job_runner.py
index 162f103642..3d2c20c612 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -588,61 +588,83 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
try:
for task in
self.dag.topological_sort(include_subdag_tasks=True):
for key, ti in list(ti_status.to_run.items()):
- if task.task_id != ti.task_id:
- continue
-
- pool = session.scalar(
- select(models.Pool).where(models.Pool.pool ==
task.pool).limit(1)
- )
- if not pool:
- raise PoolNotFound(f"Unknown pool: {task.pool}")
-
- open_slots = pool.open_slots(session=session)
- if open_slots <= 0:
- raise NoAvailablePoolSlot(
- f"Not scheduling since there are {open_slots}
open slots in pool {task.pool}"
- )
-
- num_running_task_instances_in_dag =
DAG.get_num_task_instances(
- self.dag_id,
- states=self.STATES_COUNT_AS_RUNNING,
- session=session,
- )
-
- if num_running_task_instances_in_dag >=
self.dag.max_active_tasks:
- raise DagConcurrencyLimitReached(
- "Not scheduling since DAG max_active_tasks
limit is reached."
+ # Attempt to workaround deadlock on backfill by
attempting to commit the transaction
+ # state update few times before giving up
+ max_attempts = 5
+ for i in range(max_attempts):
+ if task.task_id != ti.task_id:
+ continue
+
+ pool = session.scalar(
+ select(models.Pool).where(models.Pool.pool ==
task.pool).limit(1)
)
+ if not pool:
+ raise PoolNotFound(f"Unknown pool:
{task.pool}")
+
+ open_slots = pool.open_slots(session=session)
+ if open_slots <= 0:
+ raise NoAvailablePoolSlot(
+ f"Not scheduling since there are
{open_slots} "
+ f"open slots in pool {task.pool}"
+ )
- if task.max_active_tis_per_dag is not None:
- num_running_task_instances_in_task =
DAG.get_num_task_instances(
- dag_id=self.dag_id,
- task_ids=[task.task_id],
+ num_running_task_instances_in_dag =
DAG.get_num_task_instances(
+ self.dag_id,
states=self.STATES_COUNT_AS_RUNNING,
session=session,
)
- if num_running_task_instances_in_task >=
task.max_active_tis_per_dag:
- raise TaskConcurrencyLimitReached(
- "Not scheduling since Task concurrency
limit is reached."
+ if num_running_task_instances_in_dag >=
self.dag.max_active_tasks:
+ raise DagConcurrencyLimitReached(
+ "Not scheduling since DAG max_active_tasks
limit is reached."
)
- if task.max_active_tis_per_dagrun is not None:
- num_running_task_instances_in_task_dagrun =
DAG.get_num_task_instances(
- dag_id=self.dag_id,
- run_id=ti.run_id,
- task_ids=[task.task_id],
- states=self.STATES_COUNT_AS_RUNNING,
- session=session,
- )
+ if task.max_active_tis_per_dag is not None:
+ num_running_task_instances_in_task =
DAG.get_num_task_instances(
+ dag_id=self.dag_id,
+ task_ids=[task.task_id],
+ states=self.STATES_COUNT_AS_RUNNING,
+ session=session,
+ )
- if num_running_task_instances_in_task_dagrun >=
task.max_active_tis_per_dagrun:
- raise TaskConcurrencyLimitReached(
- "Not scheduling since Task concurrency per
DAG run limit is reached."
+ if num_running_task_instances_in_task >=
task.max_active_tis_per_dag:
+ raise TaskConcurrencyLimitReached(
+ "Not scheduling since Task concurrency
limit is reached."
+ )
+
+ if task.max_active_tis_per_dagrun is not None:
+ num_running_task_instances_in_task_dagrun =
DAG.get_num_task_instances(
+ dag_id=self.dag_id,
+ run_id=ti.run_id,
+ task_ids=[task.task_id],
+ states=self.STATES_COUNT_AS_RUNNING,
+ session=session,
)
- _per_task_process(key, ti, session)
- session.commit()
+ if (
+ num_running_task_instances_in_task_dagrun
+ >= task.max_active_tis_per_dagrun
+ ):
+ raise TaskConcurrencyLimitReached(
+ "Not scheduling since Task concurrency
per DAG run limit is reached."
+ )
+
+ _per_task_process(key, ti, session)
+ try:
+ session.commit()
+ # break the retry loop
+ break
+ except OperationalError:
+ self.log.error(
+ "Failed to commit task state due to
operational error. "
+ "The job will retry this operation so if
your backfill succeeds, "
+ "you can safely ignore this message.",
+ exc_info=True,
+ )
+ session.rollback()
+ if i == max_attempts - 1:
+ raise
+ # retry the loop
except (NoAvailablePoolSlot, DagConcurrencyLimitReached,
TaskConcurrencyLimitReached) as e:
self.log.debug(e)
@@ -939,6 +961,13 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
# TODO: we will need to terminate running task instances and set
the
# state to failed.
self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
+ except OperationalError:
+ self.log.error(
+ "Backfill job dead-locked. The job will retry the job so it is
likely "
+ "to heal itself. If your backfill succeeds you can ignore this
exception.",
+ exc_info=True,
+ )
+ raise
finally:
session.commit()
executor.end()
diff --git a/tests/providers/daskexecutor/test_dask_executor.py
b/tests/providers/daskexecutor/test_dask_executor.py
index 2d559eaa40..c5773bd83d 100644
--- a/tests/providers/daskexecutor/test_dask_executor.py
+++ b/tests/providers/daskexecutor/test_dask_executor.py
@@ -104,7 +104,6 @@ class TestDaskExecutor(TestBaseDask):
# This test is quarantined because it became rather flaky on our CI in
July 2023 and reason for this
# is unknown. An issue for that was created:
https://github.com/apache/airflow/issues/32778 and the
# marker should be removed while (possibly) the reason for flaky behaviour
is found and fixed.
- @pytest.mark.quarantined
@pytest.mark.execution_timeout(180)
def test_backfill_integration(self):
"""