ephraimbuddy commented on code in PR #58807:
URL: https://github.com/apache/airflow/pull/58807#discussion_r2580428737


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,40 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+        :param dag_runs: Collection of Dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        if (skipped_backfills := backfill_ids - locked_backfills.keys()):

Review Comment:
   ```suggestion
           if skipped_backfills := backfill_ids - locked_backfills.keys():
   ```
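
   For context, a minimal sketch (not taken from the PR) of why the outer parentheses can be dropped: an assignment expression is allowed unparenthesized as the whole condition of an `if` statement, so both forms bind the same value. The integer IDs and string values below are hypothetical stand-ins for the real `Backfill` rows.

   ```python
   # Hypothetical stand-in data; the real code maps backfill IDs to Backfill ORM objects.
   backfill_ids = {1, 2, 3}
   locked_backfills = {1: "backfill-1", 3: "backfill-3"}  # ID 2 was skipped by the lock

   # Parenthesized form, as written in the diff.
   if (skipped_with_parens := backfill_ids - locked_backfills.keys()):
       print("skipped (parenthesized):", skipped_with_parens)

   # Unparenthesized form, as in the suggestion.
   if skipped_without_parens := backfill_ids - locked_backfills.keys():
       print("skipped (unparenthesized):", skipped_without_parens)

   # Both forms compute the same set difference between the requested and locked IDs.
   assert skipped_with_parens == skipped_without_parens
   ```

   The parentheses are still needed in the earlier `if not (backfill_ids := ...)` line, because there the assignment expression is an operand of `not` rather than the whole condition.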



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.