Copilot commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2963401490


##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -71,7 +71,17 @@ class ExecuteTask(BaseDagBundleWorkload):
     ti: TaskInstanceDTO
     sentry_integration: str = ""
 
-    type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")
+    type: Literal[WorkloadType.EXECUTE_TASK] = Field(init=False, default=WorkloadType.EXECUTE_TASK)
+
+    @property
+    def queue_key(self) -> TaskInstanceKey:
+        """Return the task instance key as the queue key."""
+        return self.ti.key
+
+    @property
+    def sort_key(self) -> int:
+        """Return the task priority weight for sorting (lower = higher 
priority)."""
+        return self.ti.priority_weight

Review Comment:
   ExecuteTask.sort_key returns ti.priority_weight but the docstring says 
“lower = higher priority”. In Airflow core, higher priority_weight is treated 
as higher priority (e.g. scheduler orders by -priority_weight), so this either 
inverts priority or is misdocumented. To avoid unintended scheduling order 
changes, either invert the value returned here or adjust the ordering 
logic/docs to match Airflow’s priority semantics.
   ```suggestion
           return -self.ti.priority_weight
   ```
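To see why the negation matters, here is a minimal sketch (using hypothetical `FakeTI`/`FakeExecuteTask` stand-ins, not the real Airflow classes) of how an ascending sort over `sort_key` only puts high `priority_weight` tasks first when the weight is negated:

```python
from dataclasses import dataclass

# Hypothetical stand-ins for TaskInstanceDTO / ExecuteTask.
@dataclass
class FakeTI:
    priority_weight: int

@dataclass
class FakeExecuteTask:
    ti: FakeTI

    @property
    def sort_key(self) -> int:
        # Negate so that an ascending sort puts higher priority_weight first,
        # matching Airflow's "higher weight = higher priority" semantics.
        return -self.ti.priority_weight

tasks = [FakeExecuteTask(FakeTI(w)) for w in (1, 10, 5)]
tasks.sort(key=lambda t: t.sort_key)
print([t.ti.priority_weight for t in tasks])  # [10, 5, 1]
```

Without the negation, the same sort would schedule the weight-1 task first.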



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -98,7 +98,11 @@ def _task_event_logs(self, value):
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
         """Return queued tasks from local and kubernetes executor."""
-        return self.local_executor.queued_tasks | self.kubernetes_executor.queued_tasks
+        queued_tasks = self.local_executor.queued_tasks.copy()
+        # TODO: fix this, there is misalignment between the types of queued_tasks so it is likely wrong
+        queued_tasks.update(self.kubernetes_executor.queued_tasks)  # type: ignore[arg-type]
+

Review Comment:
   This TODO indicates the merged queued_tasks view is probably incorrect and 
is currently masked with a type-ignore. If the value types differ between the 
wrapped executors, it’d be better to normalize the return type explicitly (e.g. 
cast both to dict[TaskInstanceKey, Any]) and/or add a small unit test asserting 
the merged mapping contains the expected keys/values for both executors, rather 
than leaving a “likely wrong” TODO in production code.
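One way to normalize explicitly, sketched with hypothetical stub executors (the real classes are LocalExecutor and KubernetesExecutor), is a small helper that merges both mappings into a single `dict[..., Any]` view instead of hiding the mismatch behind a type-ignore:

```python
from typing import Any

# Hypothetical stub; the real executors expose queued_tasks mappings whose
# value types differ between the local and kubernetes sides.
class _StubExecutor:
    def __init__(self, queued: dict) -> None:
        self.queued_tasks = queued

def merged_queued_tasks(local_exec: _StubExecutor, kube_exec: _StubExecutor) -> dict[Any, Any]:
    # Normalize both mappings into one explicitly-typed dict so no
    # type-ignore is needed at the merge site.
    merged: dict[Any, Any] = {}
    merged.update(local_exec.queued_tasks)
    merged.update(kube_exec.queued_tasks)
    return merged

# Tuple keys stand in for TaskInstanceKey here.
local = _StubExecutor({("dag_a", "t1", "run1", 1): "local-workload"})
kube = _StubExecutor({("dag_b", "t2", "run1", 1): "kube-workload"})
merged = merged_queued_tasks(local, kube)
```

A unit test asserting that `merged` contains the keys from both executors would then lock in the behaviour the TODO currently leaves in doubt.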



##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -166,9 +171,12 @@ def test_exception_propagation(self, caplog):
         assert FAKE_EXCEPTION_MSG in caplog.text, caplog.record_tuples
 
    @mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.sync")
-    @mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.trigger_tasks")
+    @mock.patch(
+        "airflow.providers.celery.executors.celery_executor.CeleryExecutor."
+        + ("trigger_workloads" if AIRFLOW_V_3_3_PLUS else "trigger_tasks")

Review Comment:
   The patch target is selected via AIRFLOW_V_3_3_PLUS, but Airflow core in 
this repo is 3.2.0 so this will resolve to trigger_tasks, which no longer 
exists after this refactor. This will fail test collection. Patch 
trigger_workloads unconditionally (or detect the attribute), or keep 
BaseExecutor.trigger_tasks as a deprecated alias until the version gate can be 
true.
   ```suggestion
        "airflow.providers.celery.executors.celery_executor.CeleryExecutor.trigger_workloads",
   ```
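The attribute-detection alternative could look like the following sketch, which uses a hypothetical `CeleryExecutorStub` in place of the real CeleryExecutor to show the `hasattr` gate and the patch call:

```python
from unittest import mock

class CeleryExecutorStub:
    """Hypothetical stand-in for CeleryExecutor."""
    def trigger_workloads(self, open_slots: int) -> None:
        pass

# Feature-detect the method name instead of gating on a version flag that
# may lag behind the refactor that renamed trigger_tasks.
method_name = (
    "trigger_workloads"
    if hasattr(CeleryExecutorStub, "trigger_workloads")
    else "trigger_tasks"
)

with mock.patch.object(CeleryExecutorStub, method_name) as mocked:
    CeleryExecutorStub().trigger_workloads(1)

call_count = mocked.call_count  # 1
```

This keeps the test collectible on both sides of the rename without depending on the core version constant.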



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -618,9 +618,12 @@ def test_run_next_pod_reconciliation_error(
 
    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig")
    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.sync")
-    @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
+    @mock.patch(
+        "airflow.executors.base_executor.BaseExecutor."
+        + ("trigger_workloads" if AIRFLOW_V_3_3_PLUS else "trigger_tasks")
+    )

Review Comment:
   The patched method name is gated on AIRFLOW_V_3_3_PLUS, but this repo’s core 
Airflow version is currently 3.2.0 (so AIRFLOW_V_3_3_PLUS is False) while 
BaseExecutor no longer defines trigger_tasks. This will cause the patch 
decorator to fail at import/collection time. Patch trigger_workloads 
unconditionally here (or feature-detect with hasattr), or reintroduce 
trigger_tasks as a deprecated alias in BaseExecutor until the version boundary 
matches.
   ```suggestion
    @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_workloads")
   ```



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -212,57 +214,70 @@ def __repr__(self):
         _repr += ")"
         return _repr
 
+    @property
+    def queued_tasks(self) -> dict:
+        """Backward-compat property: delegates to 
``executor_queues[WorkloadType.EXECUTE_TASK]``."""
+        warnings.warn(
+            "queued_tasks is deprecated. Use 
executor_queues[WorkloadType.EXECUTE_TASK] instead.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return self.executor_queues[WorkloadType.EXECUTE_TASK]
+
+    @property
+    def queued_callbacks(self) -> dict:
+        """Backward-compat property: delegates to 
``executor_queues[WorkloadType.EXECUTE_CALLBACK]``."""
+        warnings.warn(
+            "queued_callbacks is deprecated. Use 
executor_queues[WorkloadType.EXECUTE_CALLBACK] instead.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return self.executor_queues[WorkloadType.EXECUTE_CALLBACK]
+
+    @property
+    def supports_callbacks(self) -> bool:
+        """Backward-compat property: True if EXECUTE_CALLBACK is in 
supported_workload_types."""
+        warnings.warn(
+            "supports_callbacks is deprecated. "
+            "Use WorkloadType.EXECUTE_CALLBACK in supported_workload_types 
instead.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return WorkloadType.EXECUTE_CALLBACK in self.supported_workload_types
+
     def start(self):  # pragma: no cover
         """Executors may need to get things started."""
 
    def log_task_event(self, *, event: str, extra: str, ti_key: TaskInstanceKey):
         """Add an event to the log table."""
        self._task_event_logs.append(Log(event=event, task_instance=ti_key, extra=extra))
 
-    def queue_workload(self, workload: workloads.All, session: Session) -> None:
-        if isinstance(workload, workloads.ExecuteTask):
-            ti = workload.ti
-            self.queued_tasks[ti.key] = workload
-        elif isinstance(workload, workloads.ExecuteCallback):
-            if not self.supports_callbacks:
-                raise NotImplementedError(
-                    f"{type(self).__name__} does not support ExecuteCallback 
workloads. "
-                    f"Set supports_callbacks = True and implement callback 
handling in _process_workloads(). "
-                    f"See LocalExecutor or CeleryExecutor for reference 
implementation."
-                )
-            self.queued_callbacks[workload.callback.id] = workload
-        else:
-            raise ValueError(
-                f"Un-handled workload type {type(workload).__name__!r} in 
{type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+    def queue_workload(self, workload: QueueableWorkload, session: Session) -> None:
+        if workload.type not in self.supported_workload_types:
+            raise NotImplementedError(
+                f"{type(self).__name__} does not support {workload.type!r} 
workloads. "
+                f"Add {workload.type!r} to supported_workload_types and 
implement handling "
+                f"in _process_workloads()."
             )
+        self.executor_queues[workload.type][workload.queue_key] = workload
 
-    def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, workloads.All]]:
+    def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, QueueableWorkload]]:
         """
        Select and return the next batch of workloads to schedule, respecting priority policy.
 
-        Priority Policy: Callbacks are scheduled before tasks (callbacks complete existing work).
-        Callbacks are processed in FIFO order. Tasks are sorted by priority_weight (higher priority first).
+        Workloads are sorted by ``WORKLOAD_TYPE_TIER`` (tier assigned by workload type) first,
+        then by ``sort_key`` within the same tier.  Lower tier values are scheduled first;
+        within the same tier, lower ``sort_key`` values come first (``sort_key=0`` gives FIFO).
 
         :param open_slots: Number of available execution slots
         """
-        workloads_to_schedule: list[tuple[WorkloadKey, workloads.All]] = []
-
-        if self.queued_callbacks:
-            for key, workload in self.queued_callbacks.items():
-                if len(workloads_to_schedule) >= open_slots:
-                    break
-                workloads_to_schedule.append((key, workload))
-
-        if open_slots > len(workloads_to_schedule) and self.queued_tasks:
-            for task_key, task_workload in self.order_queued_tasks_by_priority():
-                if len(workloads_to_schedule) >= open_slots:
-                    break
-                workloads_to_schedule.append((task_key, task_workload))
+        all_workloads: list[tuple[WorkloadKey, QueueableWorkload]] = [
+            (key, workload) for queue in self.executor_queues.values() for key, workload in queue.items()
+        ]
+        all_workloads.sort(key=lambda item: (workloads.WORKLOAD_TYPE_TIER[item[1].type], item[1].sort_key))
+        return all_workloads[:open_slots]

Review Comment:
   _get_workloads_to_schedule indexes WORKLOAD_TYPE_TIER by workload.type, 
which will raise KeyError if a new supported workload type is queued but not 
registered in WORKLOAD_TYPE_TIER. Since this refactor aims to make adding 
workload types easier, validate workload.type is present in WORKLOAD_TYPE_TIER 
in queue_workload (and raise a clear error) or provide a safe default tier to 
avoid a later runtime failure during scheduling.
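Both options from the comment can be sketched as below; the registry contents, `DEFAULT_TIER`, and helper names are illustrative assumptions, not Airflow's actual API:

```python
# Illustrative tier registry mirroring the callbacks-before-tasks policy.
WORKLOAD_TYPE_TIER = {"ExecuteCallback": 0, "ExecuteTask": 1}
DEFAULT_TIER = max(WORKLOAD_TYPE_TIER.values()) + 1  # safe lowest-priority fallback

def tier_for(workload_type: str) -> int:
    """Fail fast at enqueue time with a clear message instead of a KeyError mid-scheduling."""
    if workload_type not in WORKLOAD_TYPE_TIER:
        raise ValueError(
            f"Workload type {workload_type!r} is not registered in "
            "WORKLOAD_TYPE_TIER; register it before queueing."
        )
    return WORKLOAD_TYPE_TIER[workload_type]

def tier_or_default(workload_type: str) -> int:
    """Alternative: degrade gracefully to the lowest-priority tier."""
    return WORKLOAD_TYPE_TIER.get(workload_type, DEFAULT_TIER)
```

Validating in `queue_workload` surfaces the misconfiguration where it is introduced; the `.get` fallback trades that early signal for resilience during scheduling.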



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -76,6 +94,23 @@ class BaseWorkloadSchema(BaseModel):
    def generate_token(sub_id: str, generator: JWTGenerator | None = None) -> str:
         return generator.generate({"sub": sub_id}) if generator else ""
 
+    @property
+    def queue_key(self) -> WorkloadKey:
+        """Return a unique key used to store/lookup this workload in the 
executor queue."""
+        raise NotImplementedError
+
+    @property
+    def sort_key(self) -> int:
+        """
+        Return the sort key for ordering workloads within the same tier.
+
+        The default of ``0`` gives FIFO behaviour (Python's stable sort preserves
+        insertion order among equal keys).  Override in subclasses that need
+        priority ordering within their tier — for example, ``ExecuteTask`` returns
+        ``self.ti.priority_weight`` so that higher-priority tasks are scheduled first.

Review Comment:
   The docstring example says ExecuteTask returns ti.priority_weight “so that 
higher-priority tasks are scheduled first”, but BaseExecutor sorts by lower 
sort_key values first. In core, higher priority_weight generally means higher 
priority, so either invert the value used for sorting (e.g. -priority_weight) 
or adjust the example/docs to match the actual ordering.
   ```suggestion
        Workloads are scheduled in ascending ``sort_key`` order, and Python's
        stable sort preserves insertion order among equal keys. The default of
        ``0`` therefore gives FIFO behaviour. Override in subclasses that need
        priority ordering within their tier — for example, an ``ExecuteTask``
        implementation can return ``-self.ti.priority_weight`` so that tasks
        with higher ``priority_weight`` are scheduled first.
   ```



##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -92,18 +92,13 @@ class CeleryExecutor(BaseExecutor):
     """
 
     supports_ad_hoc_ti_run: bool = True
-    supports_callbacks: bool = True
+    supported_workload_types: frozenset[str] = frozenset({"ExecuteTask", "ExecuteCallback"})
     sentry_integration: str = "sentry_sdk.integrations.celery.CeleryIntegration"

Review Comment:
   supported_workload_types is being populated with raw strings here, while 
other executors use WorkloadType enum members. Since WorkloadType is the 
central registry introduced by this PR, using it consistently avoids subtle 
membership/hash issues and improves typing/readability (e.g. 
frozenset({WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK})).
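The subtlety about membership is worth spelling out. Assuming `WorkloadType` mixes in `str` (an assumption; the sketch below uses an illustrative enum, not Airflow's actual definition), enum members and their raw string values hash and compare equal, so a frozenset of enum members accepts both forms:

```python
from enum import Enum

# Illustrative enum; Airflow's actual WorkloadType may be shaped differently.
class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"

supported_workload_types: frozenset[WorkloadType] = frozenset(
    {WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK}
)

# With a str mixin, members hash/compare equal to their values, so both
# the enum member and the raw string pass the membership check.
```

If `WorkloadType` does not subclass `str`, a set of raw strings and a set of enum members would silently disagree on membership, which is exactly the hash issue the comment warns about.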



##########
providers/edge3/tests/unit/edge3/executors/test_edge_executor.py:
##########
@@ -53,7 +53,7 @@ def get_test_executor(self, pool_slots=1):
         ti.dag_run.run_id = key.run_id
         ti.dag_run.start_date = datetime(2021, 1, 1)
         executor = EdgeExecutor()
-        executor.queued_tasks = {key: [None, None, None, ti]}
+        executor.queued_tasks[key] = [None, None, None, ti]

Review Comment:
   This test now mutates executor.queued_tasks, which is a deprecated 
backward-compat property on BaseExecutor that emits RemovedInAirflow4Warning. 
To keep provider tests warning-free and aligned with the refactor, prefer 
writing directly to executor.executor_queues[WorkloadType.EXECUTE_TASK] (or 
using queue_workload with an ExecuteTask workload) instead of going through the 
deprecated property.
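The direct-write form could look like the following sketch, with hypothetical stand-ins for `WorkloadType`, `EdgeExecutor`, and `TaskInstanceKey`:

```python
from enum import Enum

class WorkloadType(str, Enum):  # stand-in for airflow's WorkloadType
    EXECUTE_TASK = "ExecuteTask"

class EdgeExecutorStub:  # stand-in for EdgeExecutor
    def __init__(self) -> None:
        self.executor_queues: dict[WorkloadType, dict] = {WorkloadType.EXECUTE_TASK: {}}

executor = EdgeExecutorStub()
key = ("dag", "task", "run", 1)  # stand-in for TaskInstanceKey
# Write to the new queue structure directly; no deprecation warning is emitted,
# unlike going through the queued_tasks backward-compat property.
executor.executor_queues[WorkloadType.EXECUTE_TASK][key] = [None, None, None, "ti"]
```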



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -354,27 +370,11 @@ def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks):
             tags={"status": "running", "name": name},
         )
 
-    def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workloads.ExecuteTask]]:
-        """
-        Orders the queued tasks by priority.
-
-        :return: List of workloads from the queued_tasks according to the priority.
-        """
-        if not self.queued_tasks:
-            return []
-
-        # V3 + new executor that supports workloads
-        return sorted(
-            self.queued_tasks.items(),
-            key=lambda x: x[1].ti.priority_weight,
-            reverse=False,
-        )
-
-    def trigger_tasks(self, open_slots: int) -> None:
+    def trigger_workloads(self, open_slots: int) -> None:
         """
-        Initiate async execution of queued workloads (tasks and callbacks), up to the number of available slots.
+        Initiate async execution of queued workloads, up to the number of available slots.
 
-        Callbacks are prioritized over tasks to complete existing work before starting new work.
+        Workloads are scheduled according to their ``WORKLOAD_TYPE_TIER`` and ``sort_key``.

Review Comment:
   Renaming trigger_tasks to trigger_workloads removes the old method entirely, 
which breaks downstream/provider code (and your provider tests still reference 
trigger_tasks behind version gates). Consider keeping trigger_tasks as a 
deprecated alias that forwards to trigger_workloads (with 
RemovedInAirflow4Warning) for at least one release cycle, and only remove it 
once providers are updated.
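A minimal sketch of such a deprecated alias, using a hypothetical `BaseExecutorSketch` and a stand-in warning class (Airflow defines `RemovedInAirflow4Warning` in `airflow.exceptions`):

```python
import warnings

class RemovedInAirflow4Warning(DeprecationWarning):
    """Stand-in; the real class lives in airflow.exceptions."""

class BaseExecutorSketch:
    def trigger_workloads(self, open_slots: int) -> str:
        # New implementation; return value here is only for illustration.
        return f"triggered up to {open_slots} workloads"

    def trigger_tasks(self, open_slots: int) -> str:
        """Deprecated alias kept for one release cycle; forwards to trigger_workloads."""
        warnings.warn(
            "trigger_tasks is deprecated; use trigger_workloads instead.",
            RemovedInAirflow4Warning,
            stacklevel=2,
        )
        return self.trigger_workloads(open_slots)
```

With the alias in place, provider code and the version-gated tests above keep working until the gate becomes true, and the warning points callers at the rename.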



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
