ephraimbuddy commented on code in PR #58807:
URL: https://github.com/apache/airflow/pull/58807#discussion_r2571011929
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1916,11 +1916,42 @@ def _should_update_dag_next_dagruns(
return False
return True
+ def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session)
-> dict[int, Backfill]:
+ """
+ Lock Backfill rows to prevent race conditions when multiple schedulers
run concurrently.
+
+ :param dag_runs: Collection of dag runs to process
+ :param session: DB session
+ :return: Dict mapping backfill_id to locked Backfill objects
+ """
+ backfill_ids = {dr.backfill_id for dr in dag_runs if dr.backfill_id is
not None}
+ if not backfill_ids:
+ return {}
+
+ locked_backfills = {
+ b.id: b
+ for b in session.scalars(
+
select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+ )
+ }
+
+ skipped_backfills = backfill_ids - locked_backfills.keys()
+ if skipped_backfills:
+ self.log.debug(
+ "Skipping backfill runs for backfill_ids=%s - locked by
another scheduler",
+ skipped_backfills,
+ )
+
+ return locked_backfills
+
@add_debug_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running
state."""
- # added all() to save runtime, otherwise query is executed more than
once
Review Comment:
This comment is outdated: the code it referred to was refactored long ago.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]