This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1685c9f1e71 Fix misaligned `queued_tasks` types in hybrid executors
(#63744)
1685c9f1e71 is described below
commit 1685c9f1e713cc62fc137995399b192fbabf9c51
Author: ANKIT KUMAR <[email protected]>
AuthorDate: Wed Mar 18 02:36:35 2026 +0530
Fix misaligned `queued_tasks` types in hybrid executors (#63744)
The `queued_tasks` property in `LocalKubernetesExecutor` and
`CeleryKubernetesExecutor` incorrectly merged base executor tasks.
`dict.update()` modifies the dictionary in place which could lead to race
conditions during rapid dict updates. This commit replaces `dict.update()` with
the python dictionary union operator `|` for a safer and immutable map
combination.
Signed-off-by: Ankit Kumar <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../providers/celery/executors/celery_kubernetes_executor.py | 5 +----
.../cncf/kubernetes/executors/local_kubernetes_executor.py | 6 +-----
2 files changed, 2 insertions(+), 9 deletions(-)
diff --git
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
index 34cfb27e86a..9d55cda5376 100644
---
a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++
b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -103,10 +103,7 @@ class CeleryKubernetesExecutor(BaseExecutor):
@property
def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
"""Return queued tasks from celery and kubernetes executor."""
- queued_tasks = self.celery_executor.queued_tasks.copy()
- queued_tasks.update(self.kubernetes_executor.queued_tasks) # type:
ignore[arg-type]
-
- return queued_tasks # type: ignore[return-value]
+ return self.celery_executor.queued_tasks |
self.kubernetes_executor.queued_tasks
@queued_tasks.setter
def queued_tasks(self, value) -> None:
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
index f2eb64e46c5..7b8d59bcfac 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
@@ -98,11 +98,7 @@ class LocalKubernetesExecutor(BaseExecutor):
@property
def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
"""Return queued tasks from local and kubernetes executor."""
- queued_tasks = self.local_executor.queued_tasks.copy()
- # TODO: fix this, there is misalignment between the types of
queued_tasks so it is likely wrong
- queued_tasks.update(self.kubernetes_executor.queued_tasks) # type:
ignore[arg-type]
-
- return queued_tasks
+ return self.local_executor.queued_tasks |
self.kubernetes_executor.queued_tasks
@queued_tasks.setter
def queued_tasks(self, value) -> None: