This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e9c82d9319c0f27d40f49aca612eb1713e06f4ea
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Aug 9 13:00:48 2023 +0200

    Workaround failing deadlock when running backfill (#32991)

    The dask_executor backfill tests recently started to fail more often
    with backfill deadlock exceptions. The likely cause is that execution
    is now better parallelised, so triggering deadlocks through contention
    between dag_run state updates and task state updates has become much
    easier.

    While this PR does not fix the underlying issue, it catches the
    operational error raised when the deadlock occurs during the backfill
    and rolls back the operation. This **should** be safe: backfill has a
    built-in mechanism to loop and retry failed tasks, and with this fix
    applied the test passed multiple times, completing the backfill.

    The failure was not easy to reproduce locally, occurring roughly once
    every 20-30 runs. With extra logging added, it was always connected to
    an OperationalError raised (and caught) right after _per_task_process.
    The same exception was observed a few times after the rollback was
    added, and despite it the backfill job retried and completed the
    process successfully every time.

    We also keep the exception logs and add reassuring messages that make
    it clear that if the backfill completes, the exceptions can be
    ignored, as the updates will be retried by the backfill job.

    Fixes: #32778
    (cherry picked from commit f616ee8dff8e6ba9b37cbce0d22235dc47d4fc93)
---
 airflow/jobs/backfill_job_runner.py                | 117 +++++++++++++--------
 tests/providers/daskexecutor/test_dask_executor.py |   1 -
 2 files changed, 73 insertions(+), 45 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 162f103642..3d2c20c612 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -588,61 +588,83 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
         try:
             for task in self.dag.topological_sort(include_subdag_tasks=True):
                 for key, ti in list(ti_status.to_run.items()):
-                    if task.task_id != ti.task_id:
-                        continue
-
-                    pool = session.scalar(
-                        select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
-                    )
-                    if not pool:
-                        raise PoolNotFound(f"Unknown pool: {task.pool}")
-
-                    open_slots = pool.open_slots(session=session)
-                    if open_slots <= 0:
-                        raise NoAvailablePoolSlot(
-                            f"Not scheduling since there are {open_slots} open slots in pool {task.pool}"
-                        )
-
-                    num_running_task_instances_in_dag = DAG.get_num_task_instances(
-                        self.dag_id,
-                        states=self.STATES_COUNT_AS_RUNNING,
-                        session=session,
-                    )
-
-                    if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
-                        raise DagConcurrencyLimitReached(
-                            "Not scheduling since DAG max_active_tasks limit is reached."
+                    # Attempt to workaround deadlock on backfill by attempting to commit the transaction
+                    # state update few times before giving up
+                    max_attempts = 5
+                    for i in range(max_attempts):
+                        if task.task_id != ti.task_id:
+                            continue
+
+                        pool = session.scalar(
+                            select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                         )
+                        if not pool:
+                            raise PoolNotFound(f"Unknown pool: {task.pool}")
+
+                        open_slots = pool.open_slots(session=session)
+                        if open_slots <= 0:
+                            raise NoAvailablePoolSlot(
+                                f"Not scheduling since there are {open_slots} "
+                                f"open slots in pool {task.pool}"
+                            )

-                    if task.max_active_tis_per_dag is not None:
-                        num_running_task_instances_in_task = DAG.get_num_task_instances(
-                            dag_id=self.dag_id,
-                            task_ids=[task.task_id],
+                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
+                            self.dag_id,
                             states=self.STATES_COUNT_AS_RUNNING,
                             session=session,
                         )

-                        if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
-                            raise TaskConcurrencyLimitReached(
-                                "Not scheduling since Task concurrency limit is reached."
+                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
+                            raise DagConcurrencyLimitReached(
+                                "Not scheduling since DAG max_active_tasks limit is reached."
                             )

-                    if task.max_active_tis_per_dagrun is not None:
-                        num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
-                            dag_id=self.dag_id,
-                            run_id=ti.run_id,
-                            task_ids=[task.task_id],
-                            states=self.STATES_COUNT_AS_RUNNING,
-                            session=session,
-                        )
+                        if task.max_active_tis_per_dag is not None:
+                            num_running_task_instances_in_task = DAG.get_num_task_instances(
+                                dag_id=self.dag_id,
+                                task_ids=[task.task_id],
+                                states=self.STATES_COUNT_AS_RUNNING,
+                                session=session,
+                            )

-                        if num_running_task_instances_in_task_dagrun >= task.max_active_tis_per_dagrun:
-                            raise TaskConcurrencyLimitReached(
-                                "Not scheduling since Task concurrency per DAG run limit is reached."
+                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
+                                raise TaskConcurrencyLimitReached(
+                                    "Not scheduling since Task concurrency limit is reached."
+                                )
+
+                        if task.max_active_tis_per_dagrun is not None:
+                            num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
+                                dag_id=self.dag_id,
+                                run_id=ti.run_id,
+                                task_ids=[task.task_id],
+                                states=self.STATES_COUNT_AS_RUNNING,
+                                session=session,
                             )

-                    _per_task_process(key, ti, session)
-                    session.commit()
+                            if (
+                                num_running_task_instances_in_task_dagrun
+                                >= task.max_active_tis_per_dagrun
+                            ):
+                                raise TaskConcurrencyLimitReached(
+                                    "Not scheduling since Task concurrency per DAG run limit is reached."
+                                )
+
+                        _per_task_process(key, ti, session)
+                        try:
+                            session.commit()
+                            # break the retry loop
+                            break
+                        except OperationalError:
+                            self.log.error(
+                                "Failed to commit task state due to operational error. "
+                                "The job will retry this operation so if your backfill succeeds, "
+                                "you can safely ignore this message.",
+                                exc_info=True,
+                            )
+                            session.rollback()
+                            if i == max_attempts - 1:
+                                raise
+                            # retry the loop
         except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
             self.log.debug(e)

@@ -939,6 +961,13 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
             # TODO: we will need to terminate running task instances and set the
             # state to failed.
             self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
+        except OperationalError:
+            self.log.error(
+                "Backfill job dead-locked. The job will retry the job so it is likely "
+                "to heal itself. If your backfill succeeds you can ignore this exception.",
+                exc_info=True,
+            )
+            raise
         finally:
             session.commit()
             executor.end()
diff --git a/tests/providers/daskexecutor/test_dask_executor.py b/tests/providers/daskexecutor/test_dask_executor.py
index 2d559eaa40..c5773bd83d 100644
--- a/tests/providers/daskexecutor/test_dask_executor.py
+++ b/tests/providers/daskexecutor/test_dask_executor.py
@@ -104,7 +104,6 @@ class TestDaskExecutor(TestBaseDask):
     # This test is quarantined because it became rather flaky on our CI in July 2023 and reason for this
     # is unknown. An issue for that was created: https://github.com/apache/airflow/issues/32778 and the
     # marker should be removed while (possibly) the reason for flaky behaviour is found and fixed.
-    @pytest.mark.quarantined
     @pytest.mark.execution_timeout(180)
     def test_backfill_integration(self):
         """
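For readers who want the pattern without the surrounding scheduler logic: the
heart of the workaround is a bounded commit-retry loop. A database deadlock
surfaces through SQLAlchemy as an OperationalError; the session is rolled back
to discard the failed transaction, the per-task updates are re-applied, and the
commit is attempted again, up to five times. The sketch below distills that
pattern from the diff above; it is an illustration, not the Airflow code, and
commit_with_retry / apply_updates are names invented here.

import logging

from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session

log = logging.getLogger(__name__)

MAX_ATTEMPTS = 5  # same bound as max_attempts in the diff above


def commit_with_retry(session: Session, apply_updates) -> None:
    """Run apply_updates(session) and commit, retrying if the DB deadlocks.

    A deadlock is reported by the database driver as an OperationalError.
    The session must be rolled back before the next attempt so that it
    runs in a fresh transaction.
    """
    for attempt in range(MAX_ATTEMPTS):
        apply_updates(session)  # stands in for _per_task_process(...)
        try:
            session.commit()
            return  # committed cleanly, stop retrying
        except OperationalError:
            log.error(
                "Commit failed (likely a deadlock), attempt %d of %d. "
                "If the backfill eventually succeeds, this can be ignored.",
                attempt + 1,
                MAX_ATTEMPTS,
                exc_info=True,
            )
            session.rollback()  # required before the session is usable again
            if attempt == MAX_ATTEMPTS - 1:
                raise  # out of attempts, let the caller handle it

Note that, as in the diff, the rollback happens before deciding whether to
re-raise: after a failed commit SQLAlchemy requires a rollback before the
session can be used again, on the last attempt as well as on earlier ones.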
