This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 476dabe4d71 Add no-op _process_workloads to EdgeExecutor to improve
readability (#64236)
476dabe4d71 is described below
commit 476dabe4d71bc089993e092c3cb1b01e75681377
Author: Jeongwoo Do <[email protected]>
AuthorDate: Tue Mar 31 06:06:05 2026 +0900
Add no-op _process_workloads to EdgeExecutor to improve readability (#64236)
* Add no-op _process_workloads to EdgeExecutor to improve EdgeExecutor
readability
* add docs
---
airflow-core/docs/core-concepts/executor/index.rst | 1 +
.../edge3/src/airflow/providers/edge3/executors/edge_executor.py | 9 +++++++++
2 files changed, 10 insertions(+)
diff --git a/airflow-core/docs/core-concepts/executor/index.rst
b/airflow-core/docs/core-concepts/executor/index.rst
index 0600c6dd5a7..9420c55d84e 100644
--- a/airflow-core/docs/core-concepts/executor/index.rst
+++ b/airflow-core/docs/core-concepts/executor/index.rst
@@ -312,6 +312,7 @@ The following methods must be overridden at minimum to have
your executor suppor
* ``sync``: Sync will get called periodically during executor heartbeats.
Implement this method to update the state of the tasks which the executor knows
about. Optionally, attempting to execute queued tasks that have been received
from the scheduler.
* ``execute_async``: Executes a *workload* asynchronously. This method is
called (after a few layers) during executor heartbeat which is run periodically
by the scheduler. In practice, this method often just enqueues tasks into an
internal or external queue of tasks to be run (e.g. ``KubernetesExecutor``).
But can also execute the tasks directly as well (e.g. ``LocalExecutor``). This
will depend on the executor.
+* ``_process_workloads``: Processes a list of workloads that have been queued
via ``queue_workload``. This method is called during executor heartbeat and
defines how the executor handles the execution of workloads (e.g., queuing them
to workers, submitting to external systems, etc.).
Optional Interface Methods to Implement
diff --git
a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index 22c93528fb2..9a88e6f145b 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -140,6 +140,15 @@ class EdgeExecutor(BaseExecutor):
)
)
+ def _process_workloads(self, workloads: Sequence[workloads.All]) -> None:
+ """
+ No-op: EdgeExecutor does not use the BaseExecutor workload pipeline.
+
+ EdgeExecutor handles task queuing directly in queue_workload() by
writing
+ to the EdgeJobModel database table, bypassing BaseExecutor's
queued_tasks.
+ Therefore, trigger_tasks() never accumulates workloads to pass here.
+ """
+
def _check_worker_liveness(self, session: Session) -> bool:
"""Reset worker state if heartbeat timed out."""
changed = False