o-nikolas commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2830069032


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3201,22 +3313,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
                     # No executor found for that team, fall back to global default
                     executor = self.executor
         else:
-            # An executor is specified on the TaskInstance (as a str), so we need to find it in the list of executors
+            # An executor is specified on the workload (as a str), so we need to find it in the list of executors
             for _executor in self.executors:
-                if _executor.name and ti.executor in (_executor.name.alias, _executor.name.module_path):
+                if _executor.name and workload.get_executor_name() in (
+                    _executor.name.alias,
+                    _executor.name.module_path,
+                ):
                     # The executor must either match the team or be global (i.e. team_name is None)
                     if team_name and _executor.team_name == team_name or _executor.team_name is None:
                         executor = _executor
 
         if executor is not None:
-            self.log.debug("Found executor %s for task %s (team: %s)", executor.name, ti, team_name)
+            self.log.debug("Found executor %s for workload %s (team: %s)", executor.name, workload, team_name)
         else:
             # This case should not happen unless some (as of now unknown) edge case occurs or direct DB
             # modification, since the DAG parser will validate the tasks in the DAG and ensure the executor
             # they request is available and if not, disallow the DAG to be scheduled.
             # Keeping this exception handling because this is a critical issue if we do somehow find
             # ourselves here and the user should get some feedback about that.
-            self.log.warning("Executor, %s, was not found but a Task was configured to use it", ti.executor)
+            self.log.warning(
+                "Executor, %s, was not found but a workload was configured to use it",

Review Comment:
   Maybe "Task or Callback" instead of "workload"? Will users know what a 
workload is? Here and elsewhere, anytime it's user facing I'm not sure we want 
to use "workload", at least not without a considered marketing push to make 
users aware of what it is? Or maybe they already are and I'm being too cautious 
😆 
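
   For illustration, a minimal sketch of that rewording (hypothetical, not from the PR; the `%s` argument is an assumption, since the hunk above is truncated before the arguments):

   ```python
   # Hypothetical rewording: "Task or Callback" instead of the internal term
   # "workload" in the user-facing warning. The argument passed for %s is
   # assumed from the earlier hunk; it is not shown in this diff excerpt.
   self.log.warning(
       "Executor, %s, was not found but a Task or Callback was configured to use it",
       workload.get_executor_name(),
   )
   ```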



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -979,6 +985,70 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        num_occupied_slots = sum(executor.slots_occupied for executor in self.job.executors)
+        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+
+        if max_callbacks <= 0:
+            self.log.debug("No available slots for callbacks; all executors at capacity")
+            return
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(max_callbacks)
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Route callbacks to executors using the generalized routing method
+        executor_to_callbacks = self._executor_to_workloads(queued_callbacks, session)
+
+        # Enqueue callbacks for each executor
+        for executor, callbacks in executor_to_callbacks.items():
+            for callback in callbacks:
+                if not isinstance(callback, ExecutorCallback):
+                    # Can't happen since we queried ExecutorCallback, but satisfies mypy.
+                    continue
+                dag_run = None
+                if isinstance(callback.data, dict) and "dag_run_id" in callback.data:
+                    dag_run_id = callback.data["dag_run_id"]
+                    dag_run = session.get(DagRun, dag_run_id)
+                elif isinstance(callback.data, dict) and "dag_id" in callback.data:
+                    # Fallback: try to find the latest dag_run for the dag_id
+                    dag_id = callback.data["dag_id"]
+                    dag_run = session.scalars(
+                        select(DagRun)
+                        .where(DagRun.dag_id == dag_id)
+                        .order_by(DagRun.execution_date.desc())
+                        .limit(1)
+                    ).first()
+
+                if dag_run is None:
+                    self.log.warning("Could not find DagRun for callback %s", callback.id)
+                    continue
+
+                workload = workloads.ExecuteCallback.make(
+                    callback=callback,
+                    dag_run=dag_run,
+                    generator=executor.jwt_generator,
+                )
+
+                executor.queue_workload(workload, session=session)
+                callback.state = CallbackState.RUNNING

Review Comment:
   Looks better now: the callback goes to QUEUED once we queue it with the executor, instead of directly to RUNNING. This is more correct because there is no guarantee that the executor is able to get the workload running.
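
   For reference, a minimal sketch of the transition being described (hypothetical helper names; the CallbackState values are assumed from the hunk above). The workload is handed to the executor while the callback stays QUEUED, and only a later event flips it to RUNNING:

   ```python
   from enum import Enum


   class CallbackState(str, Enum):
       # Assumed values, mirroring the PR's CallbackState for illustration only.
       QUEUED = "queued"
       RUNNING = "running"


   def enqueue_callback(executor, workload, callback, session):
       # Hand the workload to the executor, but leave the callback in QUEUED:
       # there is no guarantee the executor can start it immediately.
       executor.queue_workload(workload, session=session)
       callback.state = CallbackState.QUEUED


   def on_workload_started(callback):
       # Flip to RUNNING only once the executor reports the workload as started,
       # e.g. from the scheduler loop that processes executor events.
       callback.state = CallbackState.RUNNING
   ```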



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
