ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791408220
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ num_occupied_slots = sum(executor.slots_occupied for executor in
self.job.executors)
+ max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+ if max_callbacks <= 0:
+ self.log.debug("No available slots for callbacks; all executors at
capacity")
+ return
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(max_callbacks)
+ ).all()
+
+ if not queued_callbacks:
+ return
+
+ # Route callbacks to executors using the generalized routing method
+ executor_to_callbacks = self._executor_to_workloads(queued_callbacks,
session)
+
+ # Enqueue callbacks for each executor
+ for executor, callbacks in executor_to_callbacks.items():
+ for callback in callbacks:
+ if not isinstance(callback, ExecutorCallback):
+ # Can't happen since we queried ExecutorCallback, but
satisfies mypy.
+ continue
+ dag_run = None
Review Comment:
Just me being too defensive and planning for an unknown future.
As it stands, a deadline is the only way to have a Callback workload, and a
deadline callback has the DAG run ID embedded in the callback.data dict, but I
was thinking of some unknown future case (like moving the
on_(success|failure)_callback over to this workload paradigm) where that may not
be the case.
I've simplified it for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]