This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e9c82d9319c0f27d40f49aca612eb1713e06f4ea
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Aug 9 13:00:48 2023 +0200

    Workaround failing deadlock when running backfill (#32991)
    
    The dask_executor backfill tests recently started to fail more often due
    to a backfill exception. The likely cause is that execution is now
    better parallelised, so triggering deadlocks through contention
    between the dag_run state update and the task state update has
    become much easier.
    
    While this PR does not fix the underlying issue, it catches the
    operational error raised when the deadlock occurs during the backfill
    and rolls back the operation.
    
    This **should** be safe. Backfill has a built-in mechanism to loop and
    retry failed tasks, and the test passed multiple times, completing the
    backfill after this fix was applied. The failure was not easy to
    reproduce locally, but it occurred roughly once every 20-30 runs. When
    extra logging was added, it was always connected to an OperationalError
    raised (and caught) right after _per_task_process. The same exception
    was observed a few times after the rollback was added, and despite it
    the backfill job retried and completed the process successfully every
    time. We also leave the logs with exceptions and add reassuring messages
    that should make it clear that, if the backfill completes, the
    exceptions can be ignored because the updates will be retried by the
    backfill job.
    
    Fixes: #32778
    (cherry picked from commit f616ee8dff8e6ba9b37cbce0d22235dc47d4fc93)
---
 airflow/jobs/backfill_job_runner.py                | 117 +++++++++++++--------
 tests/providers/daskexecutor/test_dask_executor.py |   1 -
 2 files changed, 73 insertions(+), 45 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 162f103642..3d2c20c612 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -588,61 +588,83 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
             try:
                 for task in 
self.dag.topological_sort(include_subdag_tasks=True):
                     for key, ti in list(ti_status.to_run.items()):
-                        if task.task_id != ti.task_id:
-                            continue
-
-                        pool = session.scalar(
-                            select(models.Pool).where(models.Pool.pool == 
task.pool).limit(1)
-                        )
-                        if not pool:
-                            raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                        open_slots = pool.open_slots(session=session)
-                        if open_slots <= 0:
-                            raise NoAvailablePoolSlot(
-                                f"Not scheduling since there are {open_slots} 
open slots in pool {task.pool}"
-                            )
-
-                        num_running_task_instances_in_dag = 
DAG.get_num_task_instances(
-                            self.dag_id,
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
-
-                        if num_running_task_instances_in_dag >= 
self.dag.max_active_tasks:
-                            raise DagConcurrencyLimitReached(
-                                "Not scheduling since DAG max_active_tasks 
limit is reached."
+                        # Attempt to workaround deadlock on backfill by 
attempting to commit the transaction
+                        # state update few times before giving up
+                        max_attempts = 5
+                        for i in range(max_attempts):
+                            if task.task_id != ti.task_id:
+                                continue
+
+                            pool = session.scalar(
+                                select(models.Pool).where(models.Pool.pool == 
task.pool).limit(1)
                             )
+                            if not pool:
+                                raise PoolNotFound(f"Unknown pool: 
{task.pool}")
+
+                            open_slots = pool.open_slots(session=session)
+                            if open_slots <= 0:
+                                raise NoAvailablePoolSlot(
+                                    f"Not scheduling since there are 
{open_slots} "
+                                    f"open slots in pool {task.pool}"
+                                )
 
-                        if task.max_active_tis_per_dag is not None:
-                            num_running_task_instances_in_task = 
DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                task_ids=[task.task_id],
+                            num_running_task_instances_in_dag = 
DAG.get_num_task_instances(
+                                self.dag_id,
                                 states=self.STATES_COUNT_AS_RUNNING,
                                 session=session,
                             )
 
-                            if num_running_task_instances_in_task >= 
task.max_active_tis_per_dag:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency 
limit is reached."
+                            if num_running_task_instances_in_dag >= 
self.dag.max_active_tasks:
+                                raise DagConcurrencyLimitReached(
+                                    "Not scheduling since DAG max_active_tasks 
limit is reached."
                                 )
 
-                        if task.max_active_tis_per_dagrun is not None:
-                            num_running_task_instances_in_task_dagrun = 
DAG.get_num_task_instances(
-                                dag_id=self.dag_id,
-                                run_id=ti.run_id,
-                                task_ids=[task.task_id],
-                                states=self.STATES_COUNT_AS_RUNNING,
-                                session=session,
-                            )
+                            if task.max_active_tis_per_dag is not None:
+                                num_running_task_instances_in_task = 
DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
+                                )
 
-                            if num_running_task_instances_in_task_dagrun >= 
task.max_active_tis_per_dagrun:
-                                raise TaskConcurrencyLimitReached(
-                                    "Not scheduling since Task concurrency per 
DAG run limit is reached."
+                                if num_running_task_instances_in_task >= 
task.max_active_tis_per_dag:
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency 
limit is reached."
+                                    )
+
+                            if task.max_active_tis_per_dagrun is not None:
+                                num_running_task_instances_in_task_dagrun = 
DAG.get_num_task_instances(
+                                    dag_id=self.dag_id,
+                                    run_id=ti.run_id,
+                                    task_ids=[task.task_id],
+                                    states=self.STATES_COUNT_AS_RUNNING,
+                                    session=session,
                                 )
 
-                        _per_task_process(key, ti, session)
-                        session.commit()
+                                if (
+                                    num_running_task_instances_in_task_dagrun
+                                    >= task.max_active_tis_per_dagrun
+                                ):
+                                    raise TaskConcurrencyLimitReached(
+                                        "Not scheduling since Task concurrency 
per DAG run limit is reached."
+                                    )
+
+                            _per_task_process(key, ti, session)
+                            try:
+                                session.commit()
+                                # break the retry loop
+                                break
+                            except OperationalError:
+                                self.log.error(
+                                    "Failed to commit task state due to 
operational error. "
+                                    "The job will retry this operation so if 
your backfill succeeds, "
+                                    "you can safely ignore this message.",
+                                    exc_info=True,
+                                )
+                                session.rollback()
+                                if i == max_attempts - 1:
+                                    raise
+                                # retry the loop
             except (NoAvailablePoolSlot, DagConcurrencyLimitReached, 
TaskConcurrencyLimitReached) as e:
                 self.log.debug(e)
 
@@ -939,6 +961,13 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
             # TODO: we will need to terminate running task instances and set 
the
             # state to failed.
             self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
+        except OperationalError:
+            self.log.error(
+                "Backfill job dead-locked. The job will retry the job so it is 
likely "
+                "to heal itself. If your backfill succeeds you can ignore this 
exception.",
+                exc_info=True,
+            )
+            raise
         finally:
             session.commit()
             executor.end()
diff --git a/tests/providers/daskexecutor/test_dask_executor.py 
b/tests/providers/daskexecutor/test_dask_executor.py
index 2d559eaa40..c5773bd83d 100644
--- a/tests/providers/daskexecutor/test_dask_executor.py
+++ b/tests/providers/daskexecutor/test_dask_executor.py
@@ -104,7 +104,6 @@ class TestDaskExecutor(TestBaseDask):
     # This test is quarantined because it became rather flaky on our CI in 
July 2023 and reason for this
     # is unknown. An issue for that was created: 
https://github.com/apache/airflow/issues/32778 and the
     # marker should be removed while (possibly) the reason for flaky behaviour 
is found and fixed.
-    @pytest.mark.quarantined
     @pytest.mark.execution_timeout(180)
     def test_backfill_integration(self):
         """

Reply via email to