Copilot commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2963401490
##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -71,7 +71,17 @@ class ExecuteTask(BaseDagBundleWorkload):
ti: TaskInstanceDTO
sentry_integration: str = ""
- type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
+ type: Literal[WorkloadType.EXECUTE_TASK] = Field(init=False,
default=WorkloadType.EXECUTE_TASK)
+
+ @property
+ def queue_key(self) -> TaskInstanceKey:
+ """Return the task instance key as the queue key."""
+ return self.ti.key
+
+ @property
+ def sort_key(self) -> int:
+ """Return the task priority weight for sorting (lower = higher
priority)."""
+ return self.ti.priority_weight
Review Comment:
ExecuteTask.sort_key returns ti.priority_weight but the docstring says
“lower = higher priority”. In Airflow core, higher priority_weight is treated
as higher priority (e.g. scheduler orders by -priority_weight), so this either
inverts priority or is misdocumented. To avoid unintended scheduling order
changes, either invert the value returned here or adjust the ordering
logic/docs to match Airflow’s priority semantics.
```suggestion
return -self.ti.priority_weight
```
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -98,7 +98,11 @@ def _task_event_logs(self, value):
@property
def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
"""Return queued tasks from local and kubernetes executor."""
- return self.local_executor.queued_tasks |
self.kubernetes_executor.queued_tasks
+ queued_tasks = self.local_executor.queued_tasks.copy()
+ # TODO: fix this, there is misalignment between the types of
queued_tasks so it is likely wrong
+ queued_tasks.update(self.kubernetes_executor.queued_tasks) # type:
ignore[arg-type]
+
Review Comment:
This TODO indicates that the merged queued_tasks view is probably incorrect and
is currently masked with a type-ignore. If the value types differ between the
wrapped executors, it would be better to normalize the return type explicitly
(e.g. cast both to dict[TaskInstanceKey, Any]) and/or add a small unit test
asserting that the merged mapping contains the expected keys/values for both
executors, rather than leaving a “likely wrong” TODO in production code.
##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -166,9 +171,12 @@ def test_exception_propagation(self, caplog):
assert FAKE_EXCEPTION_MSG in caplog.text, caplog.record_tuples
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.sync")
-
@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.trigger_tasks")
+ @mock.patch(
+ "airflow.providers.celery.executors.celery_executor.CeleryExecutor."
+ + ("trigger_workloads" if AIRFLOW_V_3_3_PLUS else "trigger_tasks")
Review Comment:
The patch target is selected via AIRFLOW_V_3_3_PLUS, but Airflow core in
this repo is 3.2.0, so this will resolve to trigger_tasks, which no longer
exists after this refactor and will therefore fail test collection. Patch
trigger_workloads unconditionally (or feature-detect the attribute with
hasattr), or keep BaseExecutor.trigger_tasks as a deprecated alias until the
version gate actually evaluates to true.
```suggestion
"airflow.providers.celery.executors.celery_executor.CeleryExecutor.trigger_workloads",
```
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -618,9 +618,12 @@ def test_run_next_pod_reconciliation_error(
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.sync")
- @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
+ @mock.patch(
+ "airflow.executors.base_executor.BaseExecutor."
+ + ("trigger_workloads" if AIRFLOW_V_3_3_PLUS else "trigger_tasks")
+ )
Review Comment:
The patched method name is gated on AIRFLOW_V_3_3_PLUS, but this repo’s core
Airflow version is currently 3.2.0 (so AIRFLOW_V_3_3_PLUS is False), while
BaseExecutor no longer defines trigger_tasks. This will cause the patch
decorator to fail at import/collection time. Patch trigger_workloads
unconditionally here (or feature-detect it with hasattr), or reintroduce
trigger_tasks as a deprecated alias in BaseExecutor until the version gate
matches the installed core version.
```suggestion
@mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_workloads")
```
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -212,57 +214,70 @@ def __repr__(self):
_repr += ")"
return _repr
+ @property
+ def queued_tasks(self) -> dict:
+ """Backward-compat property: delegates to
``executor_queues[WorkloadType.EXECUTE_TASK]``."""
+ warnings.warn(
+ "queued_tasks is deprecated. Use
executor_queues[WorkloadType.EXECUTE_TASK] instead.",
+ RemovedInAirflow4Warning,
+ stacklevel=2,
+ )
+ return self.executor_queues[WorkloadType.EXECUTE_TASK]
+
+ @property
+ def queued_callbacks(self) -> dict:
+ """Backward-compat property: delegates to
``executor_queues[WorkloadType.EXECUTE_CALLBACK]``."""
+ warnings.warn(
+ "queued_callbacks is deprecated. Use
executor_queues[WorkloadType.EXECUTE_CALLBACK] instead.",
+ RemovedInAirflow4Warning,
+ stacklevel=2,
+ )
+ return self.executor_queues[WorkloadType.EXECUTE_CALLBACK]
+
+ @property
+ def supports_callbacks(self) -> bool:
+ """Backward-compat property: True if EXECUTE_CALLBACK is in
supported_workload_types."""
+ warnings.warn(
+ "supports_callbacks is deprecated. "
+ "Use WorkloadType.EXECUTE_CALLBACK in supported_workload_types
instead.",
+ RemovedInAirflow4Warning,
+ stacklevel=2,
+ )
+ return WorkloadType.EXECUTE_CALLBACK in self.supported_workload_types
+
def start(self): # pragma: no cover
"""Executors may need to get things started."""
def log_task_event(self, *, event: str, extra: str, ti_key:
TaskInstanceKey):
"""Add an event to the log table."""
self._task_event_logs.append(Log(event=event, task_instance=ti_key,
extra=extra))
- def queue_workload(self, workload: workloads.All, session: Session) ->
None:
- if isinstance(workload, workloads.ExecuteTask):
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
- elif isinstance(workload, workloads.ExecuteCallback):
- if not self.supports_callbacks:
- raise NotImplementedError(
- f"{type(self).__name__} does not support ExecuteCallback
workloads. "
- f"Set supports_callbacks = True and implement callback
handling in _process_workloads(). "
- f"See LocalExecutor or CeleryExecutor for reference
implementation."
- )
- self.queued_callbacks[workload.callback.id] = workload
- else:
- raise ValueError(
- f"Un-handled workload type {type(workload).__name__!r} in
{type(self).__name__}. "
- f"Workload must be one of: ExecuteTask, ExecuteCallback."
+ def queue_workload(self, workload: QueueableWorkload, session: Session) ->
None:
+ if workload.type not in self.supported_workload_types:
+ raise NotImplementedError(
+ f"{type(self).__name__} does not support {workload.type!r}
workloads. "
+ f"Add {workload.type!r} to supported_workload_types and
implement handling "
+ f"in _process_workloads()."
)
+ self.executor_queues[workload.type][workload.queue_key] = workload
- def _get_workloads_to_schedule(self, open_slots: int) ->
list[tuple[WorkloadKey, workloads.All]]:
+ def _get_workloads_to_schedule(self, open_slots: int) ->
list[tuple[WorkloadKey, QueueableWorkload]]:
"""
Select and return the next batch of workloads to schedule, respecting
priority policy.
- Priority Policy: Callbacks are scheduled before tasks (callbacks
complete existing work).
- Callbacks are processed in FIFO order. Tasks are sorted by
priority_weight (higher priority first).
+ Workloads are sorted by ``WORKLOAD_TYPE_TIER`` (tier assigned by
workload type) first,
+ then by ``sort_key`` within the same tier. Lower tier values are
scheduled first;
+ within the same tier, lower ``sort_key`` values come first
(``sort_key=0`` gives FIFO).
:param open_slots: Number of available execution slots
"""
- workloads_to_schedule: list[tuple[WorkloadKey, workloads.All]] = []
-
- if self.queued_callbacks:
- for key, workload in self.queued_callbacks.items():
- if len(workloads_to_schedule) >= open_slots:
- break
- workloads_to_schedule.append((key, workload))
-
- if open_slots > len(workloads_to_schedule) and self.queued_tasks:
- for task_key, task_workload in
self.order_queued_tasks_by_priority():
- if len(workloads_to_schedule) >= open_slots:
- break
- workloads_to_schedule.append((task_key, task_workload))
+ all_workloads: list[tuple[WorkloadKey, QueueableWorkload]] = [
+ (key, workload) for queue in self.executor_queues.values() for
key, workload in queue.items()
+ ]
+ all_workloads.sort(key=lambda item:
(workloads.WORKLOAD_TYPE_TIER[item[1].type], item[1].sort_key))
+ return all_workloads[:open_slots]
Review Comment:
_get_workloads_to_schedule indexes WORKLOAD_TYPE_TIER by workload.type,
which will raise a KeyError if a new supported workload type is queued but not
registered in WORKLOAD_TYPE_TIER. Since this refactor aims to make adding
workload types easier, validate that workload.type is present in
WORKLOAD_TYPE_TIER in queue_workload (raising a clear error there), or provide
a safe default tier, to avoid a runtime failure later during scheduling.
##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -76,6 +94,23 @@ class BaseWorkloadSchema(BaseModel):
def generate_token(sub_id: str, generator: JWTGenerator | None = None) ->
str:
return generator.generate({"sub": sub_id}) if generator else ""
+ @property
+ def queue_key(self) -> WorkloadKey:
+ """Return a unique key used to store/lookup this workload in the
executor queue."""
+ raise NotImplementedError
+
+ @property
+ def sort_key(self) -> int:
+ """
+ Return the sort key for ordering workloads within the same tier.
+
+ The default of ``0`` gives FIFO behaviour (Python's stable sort
preserves
+ insertion order among equal keys). Override in subclasses that need
+ priority ordering within their tier — for example, ``ExecuteTask``
returns
+ ``self.ti.priority_weight`` so that higher-priority tasks are
scheduled first.
Review Comment:
The docstring example says ExecuteTask returns ti.priority_weight “so that
higher-priority tasks are scheduled first”, but BaseExecutor sorts by lower
sort_key values first. In core, higher priority_weight generally means higher
priority, so either invert the value used for sorting (e.g. -priority_weight)
or adjust the example/docs to match the actual ordering.
```suggestion
Workloads are scheduled in ascending ``sort_key`` order, and Python's
stable sort preserves insertion order among equal keys. The default
of
``0`` therefore gives FIFO behaviour. Override in subclasses that
need
priority ordering within their tier — for example, an ``ExecuteTask``
implementation can return ``-self.ti.priority_weight`` so that tasks
with higher ``priority_weight`` are scheduled first.
```
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -92,18 +92,13 @@ class CeleryExecutor(BaseExecutor):
"""
supports_ad_hoc_ti_run: bool = True
- supports_callbacks: bool = True
+ supported_workload_types: frozenset[str] = frozenset({"ExecuteTask",
"ExecuteCallback"})
sentry_integration: str =
"sentry_sdk.integrations.celery.CeleryIntegration"
Review Comment:
supported_workload_types is being populated with raw strings here, while
other executors use WorkloadType enum members. Since WorkloadType is the
central registry introduced by this PR, using it consistently avoids subtle
membership/hash issues and improves typing/readability (e.g.
frozenset({WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK})).
##########
providers/edge3/tests/unit/edge3/executors/test_edge_executor.py:
##########
@@ -53,7 +53,7 @@ def get_test_executor(self, pool_slots=1):
ti.dag_run.run_id = key.run_id
ti.dag_run.start_date = datetime(2021, 1, 1)
executor = EdgeExecutor()
- executor.queued_tasks = {key: [None, None, None, ti]}
+ executor.queued_tasks[key] = [None, None, None, ti]
Review Comment:
This test now mutates executor.queued_tasks, which is a deprecated
backward-compat property on BaseExecutor that emits RemovedInAirflow4Warning.
To keep provider tests warning-free and aligned with the refactor, prefer
writing directly to executor.executor_queues[WorkloadType.EXECUTE_TASK] (or
using queue_workload with an ExecuteTask workload) instead of going through the
deprecated property.
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -354,27 +370,11 @@ def _emit_metrics(self, open_slots, num_running_tasks,
num_queued_tasks):
tags={"status": "running", "name": name},
)
- def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey,
workloads.ExecuteTask]]:
- """
- Orders the queued tasks by priority.
-
- :return: List of workloads from the queued_tasks according to the
priority.
- """
- if not self.queued_tasks:
- return []
-
- # V3 + new executor that supports workloads
- return sorted(
- self.queued_tasks.items(),
- key=lambda x: x[1].ti.priority_weight,
- reverse=False,
- )
-
- def trigger_tasks(self, open_slots: int) -> None:
+ def trigger_workloads(self, open_slots: int) -> None:
"""
- Initiate async execution of queued workloads (tasks and callbacks), up
to the number of available slots.
+ Initiate async execution of queued workloads, up to the number of
available slots.
- Callbacks are prioritized over tasks to complete existing work before
starting new work.
+ Workloads are scheduled according to their ``WORKLOAD_TYPE_TIER`` and
``sort_key``.
Review Comment:
Renaming trigger_tasks to trigger_workloads removes the old method entirely,
which breaks downstream/provider code (and your provider tests still reference
trigger_tasks behind version gates). Consider keeping trigger_tasks as a
deprecated alias that forwards to trigger_workloads (with
RemovedInAirflow4Warning) for at least one release cycle, and only remove it
once providers are updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]