Hi Niko,
thanks for the feedback.
(1) I think the use case is not really narrow. @tirkarthi also pointed
to issue https://github.com/apache/airflow/issues/57210, which this
change would close as well.
(2) I aimed to include support for Multi-Team (that even added some
complexity compared to the local patch in 3.1.7). Yes, the config
property is global at the moment, so if you enable it for
CeleryExecutor it applies to all Executors in all teams. But if you
wish and see a reason, we can also make the config team-specific
(e.g. only team_a uses CeleryExecutor and team_b does not optimize,
though I am not sure there is a need to separate). Routing and queueing
in the PR do respect the correct Executor/queue instance to route to.
See
https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R333
and
https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R347
Jens
On 18.03.26 23:34, Oliveira, Niko wrote:
Thanks for the write-up Jens, it helps to have the full context of your
thought process.
The code changes themselves are small and fairly elegant. But this
breaks the invariant that there is only ever one instance of each executor
(per team) and that they live in the scheduler process and the scheduler is
the only thing that interacts with them in a scheduling capacity. To me
it's quite a large logical change in Airflow behaviour/operation. When
reasoning about execution and scheduling there is now another source that
will always need to be considered, ensuring it works and is tested when
executor related changes are made, etc. This has been fraught for us in the
past in ways that were hard to predict beforehand.
The usecase seems quite narrow and focused on how you folks use Airflow.
Are you hearing from any other users who are asking for something like
this? I'm just not sure I see enough evidence that it truly belongs in
apache/airflow main.
Cheers,
Niko
________________________________
From: Jens Scheffler<[email protected]>
Sent: Wednesday, March 18, 2026 2:56 PM
To:[email protected] <[email protected]>
Subject: [EXT] [DISCUSS] PR: Option to directly submit to worker queue
from Triggerer
Dear Airflow Devs!
TLDR: Because of operational problems in processing workload we propose
an extension that allows re-queueing tasks directly from the triggerer.
Reviewers of the PR asked for a discussion here to make sure the change
gets enough awareness.
Pull Request: Allow direct queueing from triggerer #63489
https://github.com/apache/airflow/pull/63489
The Use Case/Problem Statement:
We use Airflow for many workflows with long-running, large Dags at
scale, where about 80% of the workload is KPO (KubernetesPodOperator).
To ensure KPO can run scaled and long w/o operational interruptions
(worker restarts due to re-deployment, Pods with workload sometimes
running 4-10h) and to be able to scale to thousands of running KPO Pods,
we need to use deferred mode extensively.
In KPO with deferred mode a task is first scheduled to a (Celery in our
case) worker which prepares the Pod manifest and starts the Pod. From
there it hands over to the triggerer, which monitors the running Pod and
tails the log so that a user can watch progress. Once the Pod is
completed, the task returns to a (Celery) worker that finishes up the
work, extracts XCom, handles errors and cleans up the Pod from K8s. This
also means that the Pod is only finished when the XCom is pulled from
the side-car: the "base" container might be completed, but the Pod is
only done and deleted once the XCom is collected. Until KPO collects
XCom the Pod keeps running.
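To make the three phases above concrete, here is a toy Python model. It
is not Airflow or KPO code: the Pod class and the three functions are
hypothetical names invented for illustration. It only shows that the Pod
keeps holding cluster resources between "base container done" and the
final worker step.

```python
from dataclasses import dataclass


@dataclass
class Pod:
    """Toy stand-in for a K8s Pod with a "base" container and an XCom side-car."""

    base_done: bool = False
    xcom_collected: bool = False
    deleted: bool = False

    @property
    def holds_cluster_resources(self) -> bool:
        # The Pod keeps its CPU / ephemeral storage allocation until deleted.
        return not self.deleted


def worker_start(pod: Pod) -> str:
    """Phase 1 (worker): prepare manifest, start Pod, hand over to triggerer."""
    return "deferred"


def triggerer_monitor(pod: Pod) -> str:
    """Phase 2 (triggerer): watch the Pod until the base container is done."""
    pod.base_done = True
    # Today the triggerer hands the task back via state "scheduled".
    return "scheduled"


def worker_finish(pod: Pod) -> str:
    """Phase 3 (worker): collect XCom from the side-car, then delete the Pod."""
    pod.xcom_collected = True
    pod.deleted = True
    return "success"


pod = Pod()
worker_start(pod)
triggerer_monitor(pod)
print(pod.base_done, pod.holds_cluster_resources)  # True True
worker_finish(pod)
print(pod.holds_cluster_resources)  # False
```

The gap between phase 2 and phase 3 is where the latency discussed in
this mail accumulates.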
The current method of scheduling in Airflow is that the Scheduler checks
all rules of concurrency (max_active_tasks, max_tis_per_dagrun,
pools...) in state "scheduled" before a task is queued to be started. On
the worker, when started, it is directly set to "deferred" and then a
triggerer picks it up (no re-scheduling or active distribution to a
triggerer). On the way back, today the triggerer marks the task
"scheduled", which means the scheduler logic needs to pick up the task
again, in competition with all other workload, and re-schedule it with
all concurrency and priority checks like initially to get it to "queued"
and re-assigned to a (Celery) worker. This implicitly means that when
moving from "deferred" to "scheduled" the task loses its allocated pool
slot and needs to re-allocate it.
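The pool slot effect can be sketched with a small toy model. This is not
Airflow code; the function names are hypothetical, and it assumes that
deferred tasks are counted into the pool while "scheduled" tasks hold no
slot. It compares today's return path with a direct move to "queued".

```python
# States that hold a pool slot in this toy model. Assumption: deferred
# tasks are counted into the pool; "scheduled" tasks are not.
SLOT_HOLDING_STATES = {"queued", "running", "deferred"}


def occupied_slots(states: dict) -> int:
    """Count tasks that currently hold a pool slot."""
    return sum(1 for state in states.values() if state in SLOT_HOLDING_STATES)


def trigger_fired(states: dict, task_id: str, direct_queue: bool) -> None:
    """Move a deferred task on after its trigger fired.

    direct_queue=False models today's path (back to "scheduled");
    direct_queue=True models a direct move to "queued".
    """
    assert states[task_id] == "deferred"
    states[task_id] = "queued" if direct_queue else "scheduled"


today = {"cleanup_task": "deferred", "other_task": "running"}
trigger_fired(today, "cleanup_task", direct_queue=False)
print(occupied_slots(today))  # 1: the slot of cleanup_task was released

direct = {"cleanup_task": "deferred", "other_task": "running"}
trigger_fired(direct, "cleanup_task", direct_queue=True)
print(occupied_slots(direct))  # 2: the slot stays allocated
```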
In most regular situations this is okay. In our scenario it is a
problem: We have many Dags competing for the K8s cluster resources and
the concurrency features of Airflow joined with priority controls should
ensure that important workload runs first. Once there is residual
capacity less important batches can consume cluster resources. "Consume
resources" here also refers to Pods sitting on the cluster: they
free up the cluster space only once XCom is collected and the Pod is
removed. Before that they still consume CPU and ephemeral storage
allocations. We limit the amount of workload being able to be sent to
K8s by Airflow pools which are the ultimate limit for concurrency on
different node pools (e.g. nodes with GPU and nodes w/o GPU). Other
workload often runs on Edge workers or directly as Python in Celery.
With multiple Dags and different priorities we had these two effects:
(1) A lower priority batch is running ~N*100 Pods in deferred. A higher
priority large batch is started. Pods finishing from the lower priority
tasks are expected to drain the cluster, but when they end the task
instances are set to "scheduled" and... then stay there until all tasks
of the higher priority batch are worked off (assuming the higher
priority tasks are not limited in a way that leaves room for the lower
priority tasks). So the base containers of the Pods are completed, but
the XCom side-car waits long - we have seen even 24h - for XCom to be
collected and the Pod to be cleaned up.
(a) Additional side effect: if pending long, the AutoScaler might
pick such a node as scaling victim because it is really idle, and after
the grace period kills the Pod. Later, when the workload returns to the
worker, the Pod shows an HTTP 404 as being gone and XCom is lost... In
most cases a retry is needed, and in any case it means a delay and
additional hours of re-execution. Without a retry, failures are just
raised to users.
(b) We had the side effect that newer high priority workload was
not scheduled by K8s to the (almost idle) Nodes because the previous
pending Pods still allocated ephemeral storage and there was not enough
space on the K8s nodes for new workload... so the old Pods blocked the
new ones and the higher priority task instances blocked the cleanup of
the lower prio instances. A lot of tasks were in a kind of dead-lock.
(c) As the reset to state "scheduled" from the triggerer also sets the
"scheduled" date of the task instance, other pending scheduled task
instances from the same "low priority" Dag are often started earlier. So
workers pick up new tasks to start new Pods while a lot of old Pods are
sitting there idle, waiting for XCom to be collected and to be cleaned
up.
(2) Also, sometimes because of operational urgency we use the
"enable/disable" scheduling flag on Dags in the UI to administratively
turn off Dag scheduling to leave space for other Dags... or to drain the
cluster for some operational procedures, e.g. to have a safe ground
before maintenance. But the Dag needs to be actively scheduled to
process the return from the triggerer. If you turn off scheduling, the
workload in flight never finishes and gets stuck as described before.
Pods are stale on the cluster, nobody picks up the XCom. The problematic
part is also that there is no way to "clean up" such tasks to finish
these Dags w/o turning on scheduling... but then new tasks are queued as
well and you are just not able to drain the cluster. I know we discussed
multiple times that we might need a "drain" mode to let existing Dags
finish without scheduling new Dag runs... but such a feature is also
missing. In short: scheduling new tasks is tightly coupled with the
scheduling of cleanup, and the two cannot be separated. So the problems
from 1 (a-c) show up in scenario (2) as well.
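The starvation in effect (1) can be illustrated with a toy ordering
function. This is a deliberate simplification and not the scheduler's
real logic (Airflow considers more than priority_weight, e.g. pools and
concurrency limits); the function name and task names are hypothetical.

```python
import heapq


def scheduling_order(scheduled):
    """Order (task_id, priority_weight) pairs as a purely priority-driven
    scheduler would queue them: highest weight first, ties in arrival order."""
    heap = [(-weight, seq, task_id) for seq, (task_id, weight) in enumerate(scheduled)]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


# Two cheap cleanup steps returning from the triggerer compete with
# newly scheduled high priority tasks.
in_scheduled_state = [
    ("low_cleanup_1", 1),
    ("high_new_1", 10),
    ("high_new_2", 10),
    ("low_cleanup_2", 1),
]
print(scheduling_order(in_scheduled_state))
# ['high_new_1', 'high_new_2', 'low_cleanup_1', 'low_cleanup_2']
```

As long as new high priority tasks keep arriving, the few-second
cleanup steps stay at the end of the line while their Pods keep
occupying the cluster.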
We thought a while about which options we have to contribute a general
improvement. The assumption and condition is that the initial start
on the (Celery) Worker is fast, most time is spent (once) on the
triggerer and the return to the worker takes only a few seconds for
clean-up. And of course we want to minimize latency to (I) free the
allocated resources and (II) not add any artificial delay for the user,
which somewhat conflicts with the effort of flipping from worker to
triggerer and back again.
Options we considered:
* Proposing a new "state" for a task instance, e.g. "re-schedule" that
is handled with priority by scheduler.
But the scheduler is already a big beast of complexity. Adding
another loop to handle re-scheduled tasks on top of all existing
complexity would add a lot more, and another state in
the state model also adds a lot of overhead from documentation to
UI...
* Finish-up the work on triggerer w/o return to Worker. It is only
about cleaning the KPO and...
Unfortunately more than just monitoring is very complex to implement
and especially XCom DB access is not a desired concept and triggerer
does not have support. We also have some specific triaging and error
handling automation extended on top of KPO which all in async with
the limited capabilities of triggerer would be hard to implement.
Main blocker in this view is XCom access.
* Dynamically increase priority of a task returning from triggerer.
We considered "patching" the priority_weight value of the task
instance on the triggerer before return to ensure that tasks
returning are just elevated in priority. First we made this from the
side via SQL (UPDATE task_instance SET priority_weight=1000 WHERE
state='scheduled' AND next_method='trigger_reentry') but actually if
the task failed and restarted then it is hard to find and reset the
priority back... still a retry would need to be reset down... all
feels like a workaround.
* Implementing a special mode in Scheduler to select tasks with
"next_method" being set as signal they are returning from triggerer
in a special way... assuming they have a Pool slot and exclude some
of the concurrency checks (As in "scheduled" state the pool slot is
actually "lost")
But this is hard to really propose... as it might be even harder than
the first option, and the already complex scheduler code would get even
harder with exceptions in the concurrency handling... with
the risk that such special cases exceed the planned concurrency
limits if the pools are already exhausted.
* Adding a REST API that the triggerer can call on scheduler to
cross-post workload.
That would need to add a new connection and component bundling, a
REST API endpoint would need to be added for schedulers to receive
these push calls. Probably an alternative, but it also adds
architectural dependencies.
* The PR we propose to discuss here: If the task skips "scheduled"
state and moves to queue directly the pool slot stays allocated
(assuming that Deferred is actively counted into pool and
concurrency limits). The code looked not too complex, as just the
enqueuing logic from the Scheduler needed to be integrated.
This takes into account that such direct queuing is only possible
if the executor supports queues (not working for LocalExecutor!). So
the proposed PR makes it explicitly opt-in.
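The decision logic of the last option can be sketched roughly as below.
This is a simplified illustration and not the code from the PR:
ToyExecutor, supports_queues, direct_queue_enabled and
resume_from_trigger are hypothetical names, and the real change works on
Airflow's executor and task instance objects.

```python
from dataclasses import dataclass, field


@dataclass
class ToyExecutor:
    """Hypothetical executor stand-in; not an Airflow class."""

    supports_queues: bool
    broker_available: bool = True
    queue: list = field(default_factory=list)

    def enqueue(self, task_id: str) -> None:
        if not self.broker_available:
            raise RuntimeError("broker unavailable")
        self.queue.append(task_id)


def resume_from_trigger(task_id: str, executor: ToyExecutor, direct_queue_enabled: bool) -> str:
    """Return the task state after its trigger fired.

    Direct queueing is tried only if it is opted in and the executor
    supports queues; anything else falls back to "scheduled".
    """
    if direct_queue_enabled and executor.supports_queues:
        try:
            executor.enqueue(task_id)
            return "queued"
        except Exception:
            pass  # fall through to the existing, safe path
    return "scheduled"


celery_like = ToyExecutor(supports_queues=True)
local_like = ToyExecutor(supports_queues=False)
print(resume_from_trigger("t1", celery_like, direct_queue_enabled=True))   # queued
print(resume_from_trigger("t2", local_like, direct_queue_enabled=True))    # scheduled
print(resume_from_trigger("t3", celery_like, direct_queue_enabled=False))  # scheduled
```

The point of the sketch is the fallback: whenever the direct path is
not available, the task takes today's route via "scheduled".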
Reasons (and pro-arguments) why we propose to have the PR on main:
* Because of a lot of operational problems recently we tested this and
patched it locally into our 3.1.7 triggerer. It has been working
smoothly in production for ~1 week
* If something goes wrong or Executor is not supported then the
existing path setting to "scheduled" is always used as safe fallback
* It is selective and is an opt-in feature
* We dramatically reduced latency from Pod completion to cleanup from
sometimes 6-24h to a few seconds
* We assume the cleanup on return to the Worker is a small effort only,
so no harm even if temporarily over-loading some limits
* ...But frankly speaking the concurrency limits and Pools were
checked initially at time of start. Limiting cleanup later on
concurrency limits is not adding any benefits but just delays and
problems. We just want to finish-up work.
* But finally actually over-loading is not possible as still the Redis
queue is in between - so any free Celery Worker will pick the task.
Even in over-load it will just sit in Celery queue for a moment.
* It is a relatively small change
* Off-loads scheduler by 50% for all deferred tasks (need to pass
scheduler only once)
* Due to reduced latency on cleanup more "net workload" schedulable on
the cluster, higher cloud utilization / less idle time.
Hearing the feedback on the PR and playing devil's advocate, I can
understand the counter arguments:
* In Airflow 2 there was a mini-scheduler, there was a hard fight in
Airflow 3 to get rid of this!
Understood. But we do not want to add a "mini scheduler", we just
want to use parts of the Executor code to push the task instance to
the queue and skip scheduling. It is NOT the goal to do anything more
or to schedule anything else.
* This would skip all concurrency checks and potentially over-load the
workers!
No. Concurrency rules are checked when the workload is initially
started. I know there are parallel bugs we are fighting with to
ensure deferred status is counted on all levels into concurrency to
correctly keep limits. Assuming that you enable counting deferred
into pools, a direct re-queue to worker is just keeping the level of
concurrency not adding more workload... just transferring back. And
Celery for example has a queue so not really over-loading. It is
mainly intended to clean-up workload which is a low effort task.
* We plan to cut-off components and untangle package dependencies.
After worker the Dag parser and triggerer are next. Linking to
Executor defeats these plans!
Yes, understood. But also today setting the task_instance state is
done via direct DB access... and in such a surgery this would need to
be cut so that the DB access moves to the execution API back-end. So
the cut for re-queueing would also move to the execution API in future,
not stay in the triggerer. I think it would be valid to think about how
this might evolve once such a split is made.
* This option is risky and we have concerns people have more errors.
The feature is opt-in and needs to be configured. Per default as
proposed in the PR it is not active. It would also be acceptable to mark
this experimental for a while.
Sorry, a bit longer text. Happy to get feedback.
Jens