ferruzzi commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2897389432
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -573,13 +592,24 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
     @property
     def slots_available(self):
-        """Number of new workloads (tasks and callbacks) this executor instance can accept."""
-        return self.parallelism - len(self.running) - len(self.queued_tasks) - len(self.queued_callbacks)
+        """Number of new workloads (tasks, callbacks, and connection tests) this executor instance can accept."""
+        return (
+            self.parallelism
+            - len(self.running)
+            - len(self.queued_tasks)
+            - len(self.queued_callbacks)
+            - len(self.queued_connection_tests)
+        )
Review Comment:
Maybe the queue_type should also include something to indicate queue priority and whether that queue is intended to be used FIFO or prioritized? Something vaguely like this: if we add a NamedTuple called `WorkloadQueueDef(scheduling_tier: int, sort_key: int)` to the base workload, the individual workloads could define it like so:
```python
from typing import NamedTuple


class WorkloadQueueDef(NamedTuple):
    scheduling_tier: int
    sort_key: int


class TestConnection:
    @property
    def queue_def(self) -> WorkloadQueueDef:
        return WorkloadQueueDef(
            # connection tests are super fast, so let's always get them out of the way first
            scheduling_tier=-1,
            sort_key=0,  # FIFO order
        )


class ExecuteCallback:
    @property
    def queue_def(self) -> WorkloadQueueDef:
        return WorkloadQueueDef(
            # always finish a callback before starting a new task, but don't block
            # the much faster connection tests for a callback
            scheduling_tier=0,
            sort_key=0,  # FIFO order
        )


class ExecuteTask:
    @property
    def queue_def(self) -> WorkloadQueueDef:
        return WorkloadQueueDef(
            # after all connection tests and callbacks are running, take the
            # task with the highest priority
            scheduling_tier=1,
            # negated so that higher-priority tasks sort first under an ascending sort
            sort_key=-self.ti.priority_weight,
        )
```
((In fact, we can have them both default to 0 and that gets even more simplified... but you get the idea.))
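To make that defaulted version concrete, here is a minimal sketch (field names follow the NamedTuple suggested above; the instances at the bottom are just illustrations):

```python
from typing import NamedTuple


class WorkloadQueueDef(NamedTuple):
    """Scheduling metadata attached to each workload type."""

    scheduling_tier: int = 0  # lower tiers are scheduled first
    sort_key: int = 0  # tie-breaker within a tier; 0 effectively means FIFO


# Workload types happy with the defaults don't need to override anything:
default_def = WorkloadQueueDef()
# A "fast lane" type only has to specify what it changes:
fast_lane = WorkloadQueueDef(scheduling_tier=-1)
```

Since NamedTuples compare field by field, `fast_lane` naturally sorts ahead of `default_def` with no extra comparator code.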
and all of the `supports_callbacks`, `supports_connection_test`, etc. flags in each executor could be replaced by a single `supported_workload_types: set`. In LocalExecutor it might look like `supported_workload_types = {"ExecuteTask", "ExecuteCallback", "TestConnection"}`.
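As a rough illustration of that check (the executor class, method name, and workload stand-ins here are hypothetical, not the actual base executor API):

```python
class HypotheticalExecutor:
    # one declaration replaces the per-type supports_* flags
    supported_workload_types: set[str] = {"ExecuteTask", "ExecuteCallback", "TestConnection"}

    def can_accept(self, workload: object) -> bool:
        """Return True if this executor knows how to run the given workload type."""
        return type(workload).__name__ in self.supported_workload_types


class ExecuteTask:  # stand-in workload; only the class name matters for the lookup
    pass


class UnknownWorkload:  # a type this executor does not declare support for
    pass


executor = HypotheticalExecutor()
```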
then the entire chunk of code in `_get_workloads_to_schedule` that sorts out which jobs to pick up would be something real simple like
```python
def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, ExecutorWorkload]]:
    # Create a meta-queue which consists of each sub-queue, sorted by that
    # sub-queue's defined sort_key in the order of the scheduling tiers.
    # Then take the top `open_slots` number of jobs off the list.
    all_workloads: list[ExecutorWorkload] = [
        workload
        for queue in self.executor_queues.values()
        for workload in queue.values()
    ]
    # Sort by scheduling tier first, then by sort_key within each tier
    # (WorkloadQueueDef is a tuple, so it compares field by field).
    all_workloads.sort(key=lambda workload: workload.queue_def)
    return [(workload.key, workload) for workload in all_workloads[:open_slots]]
```
As a bonus, this also ensures that future workload types honor slots by default; if some future workload type wanted to break that, it would have to be done intentionally. It also means `slots_occupied` turns into a simple comprehension along the lines of `sum(len(q) for q in self.executor_queues.values())` (and `slots_available` into the parallelism minus that) which doesn't have to be updated with every new workload. The code overall gets much cleaner.
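A minimal sketch of what those two properties could collapse to (the `executor_queues` attribute and class shape are assumptions following the sketch above, not the current base executor):

```python
class SketchExecutor:
    def __init__(self, parallelism: int) -> None:
        self.parallelism = parallelism
        # one sub-queue per workload type, keyed by workload key
        self.executor_queues: dict[str, dict[str, object]] = {
            "ExecuteTask": {},
            "ExecuteCallback": {},
            "TestConnection": {},
        }
        self.running: set[str] = set()

    @property
    def slots_occupied(self) -> int:
        # queued plus running workloads, with no per-type bookkeeping to maintain
        return len(self.running) + sum(len(q) for q in self.executor_queues.values())

    @property
    def slots_available(self) -> int:
        return self.parallelism - self.slots_occupied


executor = SketchExecutor(parallelism=8)
executor.executor_queues["ExecuteTask"]["ti-1"] = object()
executor.running.add("ti-0")
```

Adding a fourth workload type would then only add an entry to `executor_queues`; neither property needs to change.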
I'm not sure; I haven't thought this through fully. But as we add more workload types we may need to get that sorted out before long. I have some rough ideas, but they were all overkill when we only had two types at the time, so I figured this was a good place to get those thoughts "on paper". I'll resolve this comment so nobody sees it as blocking, but feel free to message me on Slack and/or tag me in the PR if you get to it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]