ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2791464599
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ num_occupied_slots = sum(executor.slots_occupied for executor in
self.job.executors)
+ max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+ if max_callbacks <= 0:
+ self.log.debug("No available slots for callbacks; all executors at
capacity")
+ return
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(max_callbacks)
+ ).all()
+
+ if not queued_callbacks:
+ return
+
+ # Route callbacks to executors using the generalized routing method
+ executor_to_callbacks = self._executor_to_workloads(queued_callbacks,
session)
+
+ # Enqueue callbacks for each executor
+ for executor, callbacks in executor_to_callbacks.items():
+ for callback in callbacks:
+ if not isinstance(callback, ExecutorCallback):
+ # Can't happen since we queried ExecutorCallback, but
satisfies mypy.
+ continue
+ dag_run = None
+ if isinstance(callback.data, dict) and "dag_run_id" in
callback.data:
+ dag_run_id = callback.data["dag_run_id"]
+ dag_run = session.get(DagRun, dag_run_id)
+ elif isinstance(callback.data, dict) and "dag_id" in
callback.data:
+ # Fallback: try to find the latest dag_run for the dag_id
+ dag_id = callback.data["dag_id"]
+ dag_run = session.scalars(
+ select(DagRun)
+ .where(DagRun.dag_id == dag_id)
+ .order_by(DagRun.execution_date.desc())
+ .limit(1)
+ ).first()
+
+ if dag_run is None:
+ self.log.warning("Could not find DagRun for callback %s",
callback.id)
+ continue
+
+ workload = workloads.ExecuteCallback.make(
+ callback=callback,
+ dag_run=dag_run,
+ generator=executor.jwt_generator,
+ )
+
+ executor.queue_workload(workload, session=session)
+ callback.state = CallbackState.RUNNING
Review Comment:
I'm not sure it's behind, but the callbacks are missing a QUEUED state:
- PENDING in the Callback.init()
- QUEUED by Deadline.handle_miss
- RUNNING here, after we've queued it... yeah, alright, I see what you
mean.
I'll add a SCHEDULED state and realign these.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]