ephraimbuddy commented on code in PR #58807:
URL: https://github.com/apache/airflow/pull/58807#discussion_r2580428737
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,40 @@ def _should_update_dag_next_dagruns(
return False
return True
+ def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session)
-> dict[int, Backfill]:
+ """
+ Lock Backfill rows to prevent race conditions when multiple schedulers
run concurrently.
+
+ :param dag_runs: Collection of Dag runs to process
+ :param session: DB session
+ :return: Dict mapping backfill_id to locked Backfill objects
+ """
+ if not (backfill_ids := {dr.backfill_id for dr in dag_runs if
dr.backfill_id is not None}):
+ return {}
+
+ locked_backfills = {
+ b.id: b
+ for b in session.scalars(
+
select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+ )
+ }
+
+ if (skipped_backfills := backfill_ids - locked_backfills.keys()):
Review Comment:
```suggestion
if skipped_backfills := backfill_ids - locked_backfills.keys():
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]