ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2766710378
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ # Query for QUEUED ExecutorCallback instances
+ from airflow.models.callback import CallbackType
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(conf.getint("scheduler", "max_callback_workloads_per_loop",
fallback=100))
+ ).all()
+
+ if not queued_callbacks:
+ return
+
+ # Group callbacks by executor (based on callback executor attribute or
default executor)
+ executor_to_callbacks: dict[BaseExecutor, list[ExecutorCallback]] =
defaultdict(list)
+
+ for callback in queued_callbacks:
+ # Get the executor name from callback data if specified
+ executor_name = None
+ if isinstance(callback.data, dict):
+ executor_name = callback.data.get("executor")
+
+ # Find the appropriate executor
+ executor = None
+ if executor_name:
+ # Find executor by name - try multiple matching strategies
+ for exec in self.job.executors:
+ # Match by class name (e.g., "CeleryExecutor")
+ if exec.__class__.__name__ == executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "name") and exec.name and str(exec.name)
== executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "executor_name") and exec.executor_name
== executor_name:
+ executor = exec
+ break
+
+ # Default to first executor if no specific executor found
+ if executor is None:
+ executor = self.job.executors[0] if self.job.executors else
None
+
+ if executor is None:
+ self.log.warning("No executor available for callback %s",
callback.id)
+ continue
Review Comment:
This should be addressed with the generalization of `_executor_to_workloads`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]