o-nikolas commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2734269009


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -53,6 +53,16 @@
     TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Exception | None]
 
 
+def _get_executor_process_title_prefix(team_name: str | None) -> str:

Review Comment:
   These multi-team-related changes probably shouldn't be showing up in this diff, right?



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -227,10 +229,46 @@ def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
         self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))
 
     def queue_workload(self, workload: workloads.All, session: Session) -> None:
-        if not isinstance(workload, workloads.ExecuteTask):
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+        elif isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+        else:
             raise ValueError(f"Un-handled workload kind 
{type(workload).__name__!r} in {type(self).__name__}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+
+    def _get_workloads_to_schedule(
+        self, open_slots: int
+    ) -> list[tuple[TaskInstanceKey | str, workloads.All]]:
+        """
+        Select and return the next batch of workloads to schedule, respecting priority policy.
+
+        Priority Policy: Callbacks are scheduled before tasks (callbacks complete existing work).
+        Callbacks are processed in FIFO order. Tasks are sorted by priority_weight (higher priority first).
+
+        :param open_slots: Number of available execution slots
+        """
+        workloads_to_schedule: list[tuple[TaskInstanceKey | str, workloads.All]] = []
+
+        if self.queued_callbacks:
+            for key, workload in self.queued_callbacks.items():
+                if len(workloads_to_schedule) >= open_slots:
+                    break
+                workloads_to_schedule.append((key, workload))
+
+        remaining_slots = open_slots - len(workloads_to_schedule)
+        if remaining_slots and self.queued_tasks:

Review Comment:
   nit: `remaining_slots` is only used once. You could just put something like `open_slots > len(workloads_to_schedule)` in the if expression.
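   For instance, a sketch of that inlining against the method body above (illustrative only; behavior unchanged):

   ```python
   # Compare slots directly instead of binding a single-use variable
   if open_slots > len(workloads_to_schedule) and self.queued_tasks:
       ...
   ```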



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        # Query for QUEUED ExecutorCallback instances
+        from airflow.models.callback import CallbackType
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100))

Review Comment:
   Here and down below, in the final loop over executors/workloads, we're just queueing a static amount each time. But it is the scheduler's responsibility now (in the world of multiple executors and now multi-team) to ensure we never schedule more tasks (now, workloads) than we have executor slots for. You can see how we do this math for tasks currently here:

   https://github.com/apache/airflow/blob/056e24e023a32dbcd5d0be9da45dc4eede770916/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L854-L871

   We need to ensure that math now includes callbacks, because they also take up worker slots.

   I think this will work for now, as long as this method is always called before the critical section, since callbacks will increase occupied slots in the executors, which should be taken into account in the critical section. BUT this code here needs to ensure it doesn't oversubscribe the executors, so some logic similar to the critical section needs to happen here. E.g. we're taking a flat 100 here (by default anyway), but there may only be 20 free executor slots.
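   For illustration, a rough sketch of capping the query by free slots, using the existing `slots_available` property on BaseExecutor (the rest is illustrative, not the PR's actual code):

   ```python
   # Derive the callback cap from free executor slots rather than a flat
   # config value, so we never fetch more than we could possibly enqueue
   open_slots = sum(executor.slots_available for executor in self.job.executors)
   max_callbacks = min(
       open_slots,
       conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100),
   )
   if max_callbacks <= 0:
       return
   queued_callbacks = session.scalars(
       select(ExecutorCallback)
       .where(ExecutorCallback.type == CallbackType.EXECUTOR)
       .where(ExecutorCallback.state == CallbackState.QUEUED)
       .order_by(ExecutorCallback.priority_weight.desc())
       .limit(max_callbacks)
   ).all()
   ```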



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -227,10 +229,46 @@ def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
         self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))
 
     def queue_workload(self, workload: workloads.All, session: Session) -> None:
-        if not isinstance(workload, workloads.ExecuteTask):
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+        elif isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+        else:
             raise ValueError(f"Un-handled workload kind 
{type(workload).__name__!r} in {type(self).__name__}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+
+    def _get_workloads_to_schedule(
+        self, open_slots: int
+    ) -> list[tuple[TaskInstanceKey | str, workloads.All]]:
+        """
+        Select and return the next batch of workloads to schedule, respecting priority policy.
+
+        Priority Policy: Callbacks are scheduled before tasks (callbacks complete existing work).
+        Callbacks are processed in FIFO order. Tasks are sorted by priority_weight (higher priority first).
+
+        :param open_slots: Number of available execution slots
+        """
+        workloads_to_schedule: list[tuple[TaskInstanceKey | str, workloads.All]] = []
+
+        if self.queued_callbacks:
+            for key, workload in self.queued_callbacks.items():
+                if len(workloads_to_schedule) >= open_slots:
+                    break
+                workloads_to_schedule.append((key, workload))
+
+        remaining_slots = open_slots - len(workloads_to_schedule)
+        if remaining_slots and self.queued_tasks:
+            sorted_tasks = sorted(

Review Comment:
   Why didn't you use the existing `order_queued_tasks_by_priority()` method?
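   For example (sketch; assumes the helper still returns `queued_tasks.items()` sorted by priority_weight, descending):

   ```python
   # Reuse the existing helper instead of re-sorting inline
   for key, workload in self.order_queued_tasks_by_priority():
       if len(workloads_to_schedule) >= open_slots:
           break
       workloads_to_schedule.append((key, workload))
   ```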



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -402,9 +440,20 @@ def trigger_tasks(self, open_slots: int) -> None:
                     carrier = Trace.inject()
                     ti.context_carrier = carrier
 
-                workload_list.append(item)
+            workload_list.append(workload)
+
         if workload_list:
-            self._process_workloads(workload_list)
+            try:
+                self._process_workloads(workload_list)
+            except AttributeError as e:
+                if any(isinstance(workload, workloads.ExecuteCallback) for workload in workload_list):

Review Comment:
   If we know exactly how to check for the unsupported use case, why don't we just check before trying to call `_process_workloads()`? Also, we can check much earlier, during the queueing of workloads, because we can check the `supports_callback` attr there?
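   Something like this sketch, assuming a `supports_callback` attr on executors (illustrative):

   ```python
   def queue_workload(self, workload: workloads.All, session: Session) -> None:
       if isinstance(workload, workloads.ExecuteTask):
           self.queued_tasks[workload.ti.key] = workload
       elif isinstance(workload, workloads.ExecuteCallback):
           # Fail fast at queueing time instead of catching AttributeError
           # later in trigger_tasks
           if not getattr(self, "supports_callback", False):
               raise ValueError(f"{type(self).__name__} does not support ExecuteCallback workloads")
           self.queued_callbacks[workload.callback.id] = workload
       else:
           raise ValueError(f"Un-handled workload kind {type(workload).__name__!r} in {type(self).__name__}")
   ```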



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        # Query for QUEUED ExecutorCallback instances
+        from airflow.models.callback import CallbackType
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100))
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Group callbacks by executor (based on callback executor attribute or default executor)
+        executor_to_callbacks: dict[BaseExecutor, list[ExecutorCallback]] = defaultdict(list)
+
+        for callback in queued_callbacks:
+            # Get the executor name from callback data if specified
+            executor_name = None
+            if isinstance(callback.data, dict):
+                executor_name = callback.data.get("executor")
+
+            # Find the appropriate executor
+            executor = None
+            if executor_name:
+                # Find executor by name - try multiple matching strategies
+                for exec in self.job.executors:
+                    # Match by class name (e.g., "CeleryExecutor")
+                    if exec.__class__.__name__ == executor_name:
+                        executor = exec
+                        break
+                    # Match by executor name attribute if available
+                    if hasattr(exec, "name") and exec.name and str(exec.name) 
== executor_name:
+                        executor = exec
+                        break
+                    # Match by executor name attribute if available
+                    if hasattr(exec, "executor_name") and exec.executor_name 
== executor_name:
+                        executor = exec
+                        break
+
+            # Default to first executor if no specific executor found
+            if executor is None:
+                executor = self.job.executors[0] if self.job.executors else None
+
+            if executor is None:
+                self.log.warning("No executor available for callback %s", 
callback.id)
+                continue

Review Comment:
   This is also missing multi-team logic, which we need to stay up to date with at this point. It also duplicates a lot of the work in `_try_to_load_executor`, which is made to do exactly this kind of lookup. I think it's going to save you a bunch of effort and future maintenance to update `_try_to_load_executor` to support workloads generally instead of just TIs (basically exactly the type of coding you did in the base executor and local executor changes).
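   As a sketch (the exact `_try_to_load_executor` signature may differ; this is illustrative):

   ```python
   for callback in queued_callbacks:
       executor_name = callback.data.get("executor") if isinstance(callback.data, dict) else None
       # Reuse the existing lookup (with its multi-team handling) instead of
       # hand-rolled name matching; assumed to fall back to the default
       # executor when executor_name is None
       executor = self._try_to_load_executor(executor_name)
       if executor is None:
           self.log.warning("No executor available for callback %s", callback.id)
           continue
       executor_to_callbacks[executor].append(callback)
   ```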



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
 
         return len(queued_tis)
 
+    def _enqueue_executor_callbacks(self, session: Session) -> None:
+        """
+        Enqueue ExecutorCallback workloads to executors.
+
+        Similar to _enqueue_task_instances, but for callbacks that need to run on executors.
+        Queries for QUEUED ExecutorCallback instances and routes them to the appropriate executor.
+
+        :param session: The database session
+        """
+        # Query for QUEUED ExecutorCallback instances
+        from airflow.models.callback import CallbackType
+
+        queued_callbacks = session.scalars(
+            select(ExecutorCallback)
+            .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+            .where(ExecutorCallback.state == CallbackState.QUEUED)
+            .order_by(ExecutorCallback.priority_weight.desc())
+            .limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100))
+        ).all()
+
+        if not queued_callbacks:
+            return
+
+        # Group callbacks by executor (based on callback executor attribute or default executor)
+        executor_to_callbacks: dict[BaseExecutor, list[ExecutorCallback]] = defaultdict(list)
+
+        for callback in queued_callbacks:
+            # Get the executor name from callback data if specified
+            executor_name = None
+            if isinstance(callback.data, dict):
+                executor_name = callback.data.get("executor")
+
+            # Find the appropriate executor
+            executor = None
+            if executor_name:
+                # Find executor by name - try multiple matching strategies
+                for exec in self.job.executors:
+                    # Match by class name (e.g., "CeleryExecutor")
+                    if exec.__class__.__name__ == executor_name:
+                        executor = exec
+                        break
+                    # Match by executor name attribute if available
+                    if hasattr(exec, "name") and exec.name and str(exec.name) 
== executor_name:
+                        executor = exec
+                        break
+                    # Match by executor name attribute if available
+                    if hasattr(exec, "executor_name") and exec.executor_name 
== executor_name:
+                        executor = exec
+                        break
+
+            # Default to first executor if no specific executor found
+            if executor is None:
+                executor = self.job.executors[0] if self.job.executors else None
+
+            if executor is None:
+                self.log.warning("No executor available for callback %s", 
callback.id)
+                continue
+
+            executor_to_callbacks[executor].append(callback)

Review Comment:
   Similar to the above, there is already an `_executor_to_tis` which does exactly this, but for TIs; it could be generalized.
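   E.g. a generalized helper might look like this (name and shape assumed, not existing code):

   ```python
   def _executor_to_workloads(self, items: list) -> dict[BaseExecutor, list]:
       # Group any workload-bearing items (TIs or callbacks) by executor,
       # mirroring what _executor_to_tis does for TIs today
       mapping: dict[BaseExecutor, list] = defaultdict(list)
       for item in items:
           executor = self._try_to_load_executor(getattr(item, "executor", None))
           if executor is not None:
               mapping[executor].append(item)
       return mapping
   ```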



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -87,25 +94,31 @@ def _run_worker(
             # Received poison pill, no more tasks to run
             return
 
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise ValueError(f"LocalExecutor does not know how to handle 
{type(workload)}")
-
         # Decrement this as soon as we pick up a message off the queue
         with unread_messages:
             unread_messages.value -= 1
-        key = None
-        if ti := getattr(workload, "ti", None):
-            key = ti.key
-        else:
-            raise TypeError(f"Don't know how to get ti key from 
{type(workload).__name__}")
 
-        try:
-            _execute_work(log, workload, team_conf)
+        # Handle different workload types
+        if isinstance(workload, workloads.ExecuteTask):
+            key = workload.ti.key
+            try:
+                _execute_work(log, workload, team_conf)
+                output.put((key, TaskInstanceState.SUCCESS, None))
+            except Exception as e:
+                log.exception("Task execution failed.")
+                output.put((key, TaskInstanceState.FAILED, e))
+
+        elif isinstance(workload, workloads.ExecuteCallback):
+            key = workload.callback.id
+            try:
+                _execute_callback(log, workload, team_conf)
+                output.put((key, TaskInstanceState.SUCCESS, None))

Review Comment:
   Mostly just curious: we still use TaskInstanceState here even though these are callbacks?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -945,21 +1042,44 @@ def process_executor_events(
         ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {}
         event_buffer = executor.get_event_buffer()
         tis_with_right_state: list[TaskInstanceKey] = []
-
-        # Report execution
-        for ti_key, (state, _) in event_buffer.items():
-            # We create map (dag_id, task_id, logical_date) -> in-memory try_number
-            ti_primary_key_to_try_number_map[ti_key.primary] = ti_key.try_number
-
-            cls.logger().info("Received executor event with state %s for task 
instance %s", state, ti_key)
-            if state in (
-                TaskInstanceState.FAILED,
-                TaskInstanceState.SUCCESS,
-                TaskInstanceState.QUEUED,
-                TaskInstanceState.RUNNING,
-                TaskInstanceState.RESTARTING,
-            ):
-                tis_with_right_state.append(ti_key)
+        callback_keys_with_events: list[str] = []
+
+        # Report execution - handle both task and callback events
+        for key, (state, _) in event_buffer.items():
+            if isinstance(key, TaskInstanceKey):
+                ti_primary_key_to_try_number_map[key.primary] = key.try_number
+                cls.logger().info("Received executor event with state %s for 
task instance %s", state, key)
+                if state in (
+                    TaskInstanceState.FAILED,
+                    TaskInstanceState.SUCCESS,
+                    TaskInstanceState.QUEUED,
+                    TaskInstanceState.RUNNING,
+                    TaskInstanceState.RESTARTING,
+                ):
+                    tis_with_right_state.append(key)
+            else:
+                # Callback event (key is string UUID)
+                cls.logger().info("Received executor event with state %s for 
callback %s", state, key)
+                if state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS):
+                    callback_keys_with_events.append(key)
+
+        # Handle callback completion events
+        for callback_id in callback_keys_with_events:
+            state, info = event_buffer.pop(callback_id)
+            callback = session.get(Callback, callback_id)
+            if callback:
+                # Note: We receive TaskInstanceState from executor (SUCCESS/FAILED) but convert to CallbackState here.
+                # This is intentional - executor layer uses generic completion states, scheduler converts to proper types.
+                if state == TaskInstanceState.SUCCESS:

Review Comment:
   I think this is fine for now, but it would be cool if Callbacks were fully first-class citizens in executors, including executors reporting the right state back.
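   E.g. a hypothetical version of the local executor worker branch above, reporting `CallbackState` directly so the scheduler needs no conversion (import path assumed from this PR's `CallbackType` import):

   ```python
   from airflow.models.callback import CallbackState

   if isinstance(workload, workloads.ExecuteCallback):
       key = workload.callback.id
       try:
           _execute_callback(log, workload, team_conf)
           # Report a callback-native state instead of TaskInstanceState
           output.put((key, CallbackState.SUCCESS, None))
       except Exception as e:
           log.exception("Callback execution failed.")
           output.put((key, CallbackState.FAILED, e))
   ```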
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
