Lee-W commented on code in PR #58807:
URL: https://github.com/apache/airflow/pull/58807#discussion_r2580036927


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,42 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) 
-> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers 
run concurrently.
+
+        :param dag_runs: Collection of dag runs to process

Review Comment:
   ```suggestion
           :param dag_runs: Collection of Dag runs to process
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,42 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) 
-> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers 
run concurrently.
+
+        :param dag_runs: Collection of dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        backfill_ids = {dr.backfill_id for dr in dag_runs if dr.backfill_id is 
not None}
+        if not backfill_ids:
+            return {}

Review Comment:
   ```suggestion
           if not (backfill_ids := {dr.backfill_id for dr in dag_runs if 
dr.backfill_id is not None}):
               return {}
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,42 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) 
-> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers 
run concurrently.
+
+        :param dag_runs: Collection of dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        backfill_ids = {dr.backfill_id for dr in dag_runs if dr.backfill_id is 
not None}
+        if not backfill_ids:
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                
select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        skipped_backfills = backfill_ids - locked_backfills.keys()
+        if skipped_backfills:
+            self.log.debug(
+                "Skipping backfill runs for backfill_ids=%s - locked by 
another scheduler",
+                skipped_backfills,
+            )

Review Comment:
   ```suggestion
           if (skipped_backfills := backfill_ids - locked_backfills.keys()):
               self.log.debug(
                   "Skipping backfill runs for backfill_ids=%s - locked by 
another scheduler",
                   skipped_backfills,
               )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to