o-nikolas commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2734269009
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -53,6 +53,16 @@
TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState,
Exception | None]
+def _get_executor_process_title_prefix(team_name: str | None) -> str:
Review Comment:
These multi-team-related changes probably shouldn't be showing up in this
diff, right?
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -227,10 +229,46 @@ def log_task_event(self, *, event: str, extra: str,
ti_key: TaskInstanceKey):
self._task_event_logs.append(Log(event=event, task_instance=ti_key,
extra=extra))
def queue_workload(self, workload: workloads.All, session: Session) ->
None:
- if not isinstance(workload, workloads.ExecuteTask):
+ if isinstance(workload, workloads.ExecuteTask):
+ ti = workload.ti
+ self.queued_tasks[ti.key] = workload
+ elif isinstance(workload, workloads.ExecuteCallback):
+ self.queued_callbacks[workload.callback.id] = workload
+ else:
raise ValueError(f"Un-handled workload kind
{type(workload).__name__!r} in {type(self).__name__}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
+
+ def _get_workloads_to_schedule(
+ self, open_slots: int
+ ) -> list[tuple[TaskInstanceKey | str, workloads.All]]:
+ """
+ Select and return the next batch of workloads to schedule, respecting
priority policy.
+
+ Priority Policy: Callbacks are scheduled before tasks (callbacks
complete existing work).
+ Callbacks are processed in FIFO order. Tasks are sorted by
priority_weight (higher priority first).
+
+ :param open_slots: Number of available execution slots
+ """
+ workloads_to_schedule: list[tuple[TaskInstanceKey | str,
workloads.All]] = []
+
+ if self.queued_callbacks:
+ for key, workload in self.queued_callbacks.items():
+ if len(workloads_to_schedule) >= open_slots:
+ break
+ workloads_to_schedule.append((key, workload))
+
+ remaining_slots = open_slots - len(workloads_to_schedule)
+ if remaining_slots and self.queued_tasks:
Review Comment:
nit: remaining_slots is only used once. You could just put something like
`open_slots > len(workloads_to_schedule)` in the if expression
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ # Query for QUEUED ExecutorCallback instances
+ from airflow.models.callback import CallbackType
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(conf.getint("scheduler", "max_callback_workloads_per_loop",
fallback=100))
Review Comment:
Here and down below in the final loop over executors/workloads we're just
queueing a static amount each time. But it is the scheduler's responsibility now
(in the world of multiple executors and now multi-team) to ensure we don't ever
schedule more tasks (now, workloads) than we have executor slots for. You can
see how we do this math for tasks currently here:
https://github.com/apache/airflow/blob/056e24e023a32dbcd5d0be9da45dc4eede770916/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L854-L871
We need to ensure that math now includes callbacks because they also take up
worker slots.
I think this will work for now, as long as this method is always called
before the critical section, since callbacks will increase occupied slots in
the executors, which should be taken into account in the critical section. BUT
this code here needs to ensure it doesn't oversubscribe the executors, so some
logic similar to the critical section needs to be done here. E.g. we're taking
a flat 100 here (by default anyway) but there may only be 20 free executor
slots.
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -227,10 +229,46 @@ def log_task_event(self, *, event: str, extra: str,
ti_key: TaskInstanceKey):
self._task_event_logs.append(Log(event=event, task_instance=ti_key,
extra=extra))
def queue_workload(self, workload: workloads.All, session: Session) ->
None:
- if not isinstance(workload, workloads.ExecuteTask):
+ if isinstance(workload, workloads.ExecuteTask):
+ ti = workload.ti
+ self.queued_tasks[ti.key] = workload
+ elif isinstance(workload, workloads.ExecuteCallback):
+ self.queued_callbacks[workload.callback.id] = workload
+ else:
raise ValueError(f"Un-handled workload kind
{type(workload).__name__!r} in {type(self).__name__}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
+
+ def _get_workloads_to_schedule(
+ self, open_slots: int
+ ) -> list[tuple[TaskInstanceKey | str, workloads.All]]:
+ """
+ Select and return the next batch of workloads to schedule, respecting
priority policy.
+
+ Priority Policy: Callbacks are scheduled before tasks (callbacks
complete existing work).
+ Callbacks are processed in FIFO order. Tasks are sorted by
priority_weight (higher priority first).
+
+ :param open_slots: Number of available execution slots
+ """
+ workloads_to_schedule: list[tuple[TaskInstanceKey | str,
workloads.All]] = []
+
+ if self.queued_callbacks:
+ for key, workload in self.queued_callbacks.items():
+ if len(workloads_to_schedule) >= open_slots:
+ break
+ workloads_to_schedule.append((key, workload))
+
+ remaining_slots = open_slots - len(workloads_to_schedule)
+ if remaining_slots and self.queued_tasks:
+ sorted_tasks = sorted(
Review Comment:
Why didn't you use the existing `order_queued_tasks_by_priority()` method?
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -402,9 +440,20 @@ def trigger_tasks(self, open_slots: int) -> None:
carrier = Trace.inject()
ti.context_carrier = carrier
- workload_list.append(item)
+ workload_list.append(workload)
+
if workload_list:
- self._process_workloads(workload_list)
+ try:
+ self._process_workloads(workload_list)
+ except AttributeError as e:
+ if any(isinstance(workload, workloads.ExecuteCallback) for
workload in workload_list):
Review Comment:
If we know exactly how to check for the unsupported use case, why don't we
just check before trying to call `_process_workloads()`? Also, we can check
much earlier, in the queueing of workloads, because we can check the
`supports_callback` attr.
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ # Query for QUEUED ExecutorCallback instances
+ from airflow.models.callback import CallbackType
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(conf.getint("scheduler", "max_callback_workloads_per_loop",
fallback=100))
+ ).all()
+
+ if not queued_callbacks:
+ return
+
+ # Group callbacks by executor (based on callback executor attribute or
default executor)
+ executor_to_callbacks: dict[BaseExecutor, list[ExecutorCallback]] =
defaultdict(list)
+
+ for callback in queued_callbacks:
+ # Get the executor name from callback data if specified
+ executor_name = None
+ if isinstance(callback.data, dict):
+ executor_name = callback.data.get("executor")
+
+ # Find the appropriate executor
+ executor = None
+ if executor_name:
+ # Find executor by name - try multiple matching strategies
+ for exec in self.job.executors:
+ # Match by class name (e.g., "CeleryExecutor")
+ if exec.__class__.__name__ == executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "name") and exec.name and str(exec.name)
== executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "executor_name") and exec.executor_name
== executor_name:
+ executor = exec
+ break
+
+ # Default to first executor if no specific executor found
+ if executor is None:
+ executor = self.job.executors[0] if self.job.executors else
None
+
+ if executor is None:
+ self.log.warning("No executor available for callback %s",
callback.id)
+ continue
Review Comment:
This is also missing multi-team logic which we need to stay up to date with
at this point. It also is duplicating a lot of the work in
`_try_to_load_executor` which is made to do exactly this kind of lookup. I
think it's going to save you a bunch of effort and future maintenance to update
`_try_to_load_executor` to support workloads generally instead of just ti
(basically exactly the type of coding you did in the base executor and local
executor changes).
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -883,6 +883,103 @@ def _critical_section_enqueue_task_instances(self,
session: Session) -> int:
return len(queued_tis)
+ def _enqueue_executor_callbacks(self, session: Session) -> None:
+ """
+ Enqueue ExecutorCallback workloads to executors.
+
+ Similar to _enqueue_task_instances, but for callbacks that need to run
on executors.
+ Queries for QUEUED ExecutorCallback instances and routes them to the
appropriate executor.
+
+ :param session: The database session
+ """
+ # Query for QUEUED ExecutorCallback instances
+ from airflow.models.callback import CallbackType
+
+ queued_callbacks = session.scalars(
+ select(ExecutorCallback)
+ .where(ExecutorCallback.type == CallbackType.EXECUTOR)
+ .where(ExecutorCallback.state == CallbackState.QUEUED)
+ .order_by(ExecutorCallback.priority_weight.desc())
+ .limit(conf.getint("scheduler", "max_callback_workloads_per_loop",
fallback=100))
+ ).all()
+
+ if not queued_callbacks:
+ return
+
+ # Group callbacks by executor (based on callback executor attribute or
default executor)
+ executor_to_callbacks: dict[BaseExecutor, list[ExecutorCallback]] =
defaultdict(list)
+
+ for callback in queued_callbacks:
+ # Get the executor name from callback data if specified
+ executor_name = None
+ if isinstance(callback.data, dict):
+ executor_name = callback.data.get("executor")
+
+ # Find the appropriate executor
+ executor = None
+ if executor_name:
+ # Find executor by name - try multiple matching strategies
+ for exec in self.job.executors:
+ # Match by class name (e.g., "CeleryExecutor")
+ if exec.__class__.__name__ == executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "name") and exec.name and str(exec.name)
== executor_name:
+ executor = exec
+ break
+ # Match by executor name attribute if available
+ if hasattr(exec, "executor_name") and exec.executor_name
== executor_name:
+ executor = exec
+ break
+
+ # Default to first executor if no specific executor found
+ if executor is None:
+ executor = self.job.executors[0] if self.job.executors else
None
+
+ if executor is None:
+ self.log.warning("No executor available for callback %s",
callback.id)
+ continue
+
+ executor_to_callbacks[executor].append(callback)
Review Comment:
Similar to the above, there is already a `_executor_to_tis` which is doing
exactly this but for TIs; it could be generalized.
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -87,25 +94,31 @@ def _run_worker(
# Received poison pill, no more tasks to run
return
- if not isinstance(workload, workloads.ExecuteTask):
- raise ValueError(f"LocalExecutor does not know how to handle
{type(workload)}")
-
# Decrement this as soon as we pick up a message off the queue
with unread_messages:
unread_messages.value -= 1
- key = None
- if ti := getattr(workload, "ti", None):
- key = ti.key
- else:
- raise TypeError(f"Don't know how to get ti key from
{type(workload).__name__}")
- try:
- _execute_work(log, workload, team_conf)
+ # Handle different workload types
+ if isinstance(workload, workloads.ExecuteTask):
+ key = workload.ti.key
+ try:
+ _execute_work(log, workload, team_conf)
+ output.put((key, TaskInstanceState.SUCCESS, None))
+ except Exception as e:
+ log.exception("Task execution failed.")
+ output.put((key, TaskInstanceState.FAILED, e))
+
+ elif isinstance(workload, workloads.ExecuteCallback):
+ key = workload.callback.id
+ try:
+ _execute_callback(log, workload, team_conf)
+ output.put((key, TaskInstanceState.SUCCESS, None))
Review Comment:
Mostly just curious: We still use TaskInstanceState here even though these
are callbacks?
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -945,21 +1042,44 @@ def process_executor_events(
ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int]
= {}
event_buffer = executor.get_event_buffer()
tis_with_right_state: list[TaskInstanceKey] = []
-
- # Report execution
- for ti_key, (state, _) in event_buffer.items():
- # We create map (dag_id, task_id, logical_date) -> in-memory
try_number
- ti_primary_key_to_try_number_map[ti_key.primary] =
ti_key.try_number
-
- cls.logger().info("Received executor event with state %s for task
instance %s", state, ti_key)
- if state in (
- TaskInstanceState.FAILED,
- TaskInstanceState.SUCCESS,
- TaskInstanceState.QUEUED,
- TaskInstanceState.RUNNING,
- TaskInstanceState.RESTARTING,
- ):
- tis_with_right_state.append(ti_key)
+ callback_keys_with_events: list[str] = []
+
+ # Report execution - handle both task and callback events
+ for key, (state, _) in event_buffer.items():
+ if isinstance(key, TaskInstanceKey):
+ ti_primary_key_to_try_number_map[key.primary] = key.try_number
+ cls.logger().info("Received executor event with state %s for
task instance %s", state, key)
+ if state in (
+ TaskInstanceState.FAILED,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.RESTARTING,
+ ):
+ tis_with_right_state.append(key)
+ else:
+ # Callback event (key is string UUID)
+ cls.logger().info("Received executor event with state %s for
callback %s", state, key)
+ if state in (TaskInstanceState.FAILED,
TaskInstanceState.SUCCESS):
+ callback_keys_with_events.append(key)
+
+ # Handle callback completion events
+ for callback_id in callback_keys_with_events:
+ state, info = event_buffer.pop(callback_id)
+ callback = session.get(Callback, callback_id)
+ if callback:
+ # Note: We receive TaskInstanceState from executor
(SUCCESS/FAILED) but convert to CallbackState here.
+ # This is intentional - executor layer uses generic completion
states, scheduler converts to proper types.
+ if state == TaskInstanceState.SUCCESS:
Review Comment:
I think this is fine for now, but it would be cool if Callbacks were fully
first-class citizens in executors, including executors reporting the right
state back.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]