ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2738604506


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        # Query for QUEUED ExecutorCallback instances
+        from airflow.models.callback import CallbackType
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100))

Review Comment:
   Ah, I think I see what you are saying; I missed that. I may need to chat
offline to clarify, but I'll look into it. Thanks for catching that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

