This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd4fd67c0f2 Improve performance of task dequeuing (#61376)
fd4fd67c0f2 is described below

commit fd4fd67c0f24471707a95a589f14414b55083dba
Author: Steve Ahn <[email protected]>
AuthorDate: Sun Feb 15 01:44:45 2026 -0800

    Improve performance of task dequeuing (#61376)
    
    * pop(0) to popleft()
    
    * revert with reverse list
---
 airflow-core/src/airflow/executors/base_executor.py             | 4 ++--
 airflow-core/src/airflow/serialization/definitions/taskgroup.py | 5 +++--
 task-sdk/src/airflow/sdk/definitions/taskgroup.py               | 5 +++--
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py
index a54e1464c24..927e4c07d64 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -349,7 +349,7 @@ class BaseExecutor(LoggingMixin):
         return sorted(
             self.queued_tasks.items(),
             key=lambda x: x[1].ti.priority_weight,
-            reverse=True,
+            reverse=False,
         )
 
     @add_debug_span
@@ -363,7 +363,7 @@ class BaseExecutor(LoggingMixin):
         workload_list = []
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
-            key, item = sorted_queue.pop(0)
+            key, item = sorted_queue.pop()
 
             # If a task makes it here but is still understood by the executor
             # to be running, it generally means that the task has been killed
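
For context, not part of the patch above: the executor change keeps the same dequeue order but moves the work to the cheap end of the list. Sorting ascending and popping from the tail is equivalent to sorting descending and popping index 0, without shifting the remaining elements on every pop. A minimal sketch with illustrative names only (drain_*, weights are not from the Airflow code base):

    # Illustrative sketch only; these names are not from the Airflow code base.
    # list.pop(0) shifts every remaining element (O(n) per pop); list.pop()
    # removes from the end in O(1). Sorting ascending and taking items from
    # the tail yields the same order with less work.

    def drain_descending_pop_front(priorities):
        queue = sorted(priorities, reverse=True)   # old approach: highest first
        while queue:
            yield queue.pop(0)                     # O(n): shifts the whole list

    def drain_ascending_pop_back(priorities):
        queue = sorted(priorities, reverse=False)  # new approach: lowest first
        while queue:
            yield queue.pop()                      # O(1): no shifting

    weights = [5, 1, 9, 3, 7]                      # hypothetical priority weights
    assert list(drain_descending_pop_front(weights)) == list(drain_ascending_pop_back(weights))
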
diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
index 4a4ee0588cf..d971c303c7c 100644
--- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py
+++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
@@ -22,6 +22,7 @@ import copy
 import functools
 import operator
 import weakref
+from collections import deque
 from typing import TYPE_CHECKING
 
 import attrs
@@ -188,9 +189,9 @@ class SerializedTaskGroup(DAGNode):
         from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
         from airflow.serialization.definitions.mappedoperator import SerializedMappedOperator
 
-        groups_to_visit = [self]
+        groups_to_visit = deque([self])
         while groups_to_visit:
-            for child in groups_to_visit.pop(0).children.values():
+            for child in groups_to_visit.popleft().children.values():
                 if isinstance(child, (SerializedMappedOperator, SerializedBaseOperator)):
                     yield child
                 elif isinstance(child, SerializedTaskGroup):
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index c7c5da21140..c47b1f360ae 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 import copy
 import re
 import weakref
+from collections import deque
 from collections.abc import Generator, Iterator, Sequence
 from typing import TYPE_CHECKING, Any
 
@@ -586,10 +587,10 @@ class TaskGroup(DAGNode):
         """Return an iterator of the child tasks."""
         from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
 
-        groups_to_visit = [self]
+        groups_to_visit = deque([self])
 
         while groups_to_visit:
-            visiting = groups_to_visit.pop(0)
+            visiting = groups_to_visit.popleft()
 
             for child in visiting.children.values():
                 if isinstance(child, AbstractOperator):
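
For reference, also not part of the patch: the two task-group changes apply the same idea to a breadth-first walk by switching the visit queue from a list to collections.deque, whose popleft() is O(1) versus list.pop(0)'s O(n). A minimal sketch; Node and the sample tree below are hypothetical stand-ins for task groups and their children:

    # Illustrative sketch only; Node is a hypothetical stand-in for a task group.
    from collections import deque

    class Node:
        def __init__(self, name, children=()):
            self.name = name
            self.children = list(children)

    def iter_leaf_names(root):
        # Breadth-first traversal mirroring the pattern in the patch:
        # deque.popleft() removes from the front in O(1), unlike list.pop(0).
        groups_to_visit = deque([root])
        while groups_to_visit:
            visiting = groups_to_visit.popleft()
            for child in visiting.children:
                if child.children:
                    groups_to_visit.append(child)  # nested "group": visit later
                else:
                    yield child.name               # leaf "task"

    tree = Node("root", [Node("g1", [Node("t1"), Node("t2")]), Node("t3")])
    assert list(iter_leaf_names(tree)) == ["t3", "t1", "t2"]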
