Hi Jarek,

regarding:

And sorry for the rework, but this is pretty expected that such
customisation will need updating now and then. But then you have a nice
case for Bosch: "Look, this is what happens, when we do not upstream our
changes and do not do it in the way that is extendable and "product" wise.
So next time let's spend a bit more time when we customize things to make
upstreamable and do not double-spend on it. You will have a nice case to
show as an example (just saying - and I know it's not that easy - trying to
find the bright side of it).

...sorry, I need to disagree: we were under fire because of a lot of instabilities, and this problem was the tip of the iceberg. We needed to improve radically in a short timeframe. There was no time to have a multi-week discussion. Our house was on fire. And as initially described, we considered multiple options, and for the short term we went the route which was not complex and minimally invasive.

I think discussions are totally fair, and the important thing is the reasons - which are given as good opinions that I accept. It is not about the rework; from the feedback I assume many are suffering from this. I will raise a VOTE, hoping that the needed groundwork as enablement can be added to 3.2.0, so that the hotfix we needed to make does not need to be applied to all other installs suffering from the problem and it can be resolved with 3.2.0.

On 25.03.26 00:39, Jarek Potiuk wrote:
Jens:

I think that looks way more reasonable for now. I would leave detailed
review to those who know that part better - but from the logical
/architectural point of view, this is way more "Airflow-y"

And sorry for the rework, but this is pretty expected that such
customisation will need updating now and then. But then you have a nice
case for Bosch: "Look, this is what happens, when we do not upstream our
changes and do not do it in the way that is extendable and "product" wise.
So next time let's spend a bit more time when we customize things to make
upstreamable and do not double-spend on it. You will have a nice case to
show as an example (just saying - and I know it's not that easy - trying to
find the bright side of it).

Przemysław:

1. Modification of priority weight to have a greater value for tasks which are
running (a task has higher priority by design if it is a deferred task and
was already running in the past)

We had **LOTS** of talks and even an AIP
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-100+Eliminate+Scheduler+Starvation+On+Concurrency+Limits?focusedCommentId=406619934
discussing options - I think after 3.3 one of the things to think about is
to lead that one to completion and finally redesign the priority weights in
general - because the current design has already reached its limits.

2. Doing something like pre-emption from Operating Systems, but on task,
which would mean that if higher priority task is waiting and lower priority
task is running, higher priority task can put lower one to sleep and take
its slot

Yes. IMHO preemption is the only way to actually deal with some of the
scenarios. Logically, you cannot achieve both optimal machine utilization
and optimal latency for task chain execution without preemption. And even
with preemption - you achieve it by taking a (smaller) hit on utilization.
Instead of reserving free slots for potential high-priority workloads, you
decide to kill some in-progress tasks to run high-priority ones - thus
lowering utilization due to all the already partially run but preempted
tasks. This is really nice if you have rare spikes in traffic, because
utilization only takes a hit when there is a high load. Reserving slots in
any way (queues, etc.) causes a utilization hit, except during spikes.

But it's not something we necessarily need to handle ourselves. Perhaps
this is a sign that we could use our "Airflow as a platform" approach to
delegate some of those tasks out long term. Airflow is good at scheduling
and at the conditional logic of workflows of various provenance (this is
what Airflow actually excels at, and it is our bread and butter). But
Airflow has never been about optimizing utilisation. YuniKorn is
designed for it (while it does not handle complex logic) - so the combo is
something I've always thought is a good idea. (And the good news is that
there is a rumour that at the Airflow Summit we will have a really nice talk
about it from one of our PMC members, whose team already does it at scale in
one of the biggest companies in the world - just rumours ...)

J.


On Tue, Mar 24, 2026 at 11:37 PM Przemysław Mirowski <[email protected]>
wrote:

Hi all,

Thank you Jens for bringing up this topic; I would say it is not a
niche issue in higher-load Airflow deployments which use Deferred
Operators.

I haven't checked all PRs created for this topic in detail, but I
thought I would share my 2c.

From my perspective, the issue affects any setup which uses the
Triggerer, as mentioned by André in the past. I'm not sure if there is an ideal
solution to this issue, so probably we could do a "patch" thing until the
proper one is designed, implemented and tested.

One idea which I haven't seen on the devlist yet could be:

   1. Modification of priority weight to have a greater value for tasks which
      are running (a task has higher priority by design if it is a deferred
      task and was already running in the past)
   2. Doing something like pre-emption from Operating Systems, but on tasks,
      which would mean that if a higher priority task is waiting and a lower
      priority task is running, the higher priority task can put the lower
      one to sleep and take its slot

Of course, doing step 2 would require design changes and a smart way of
stopping tasks without breaking their operations (probably a per-operator
implementation, something like the on_kill method), but in the end it could
possibly resolve the issue. That approach could lead to starvation of the
lower priority tasks, but as they are lower priority, it should not be a
huge issue.
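
To make the pre-emption idea in step 2 a bit more concrete, here is a minimal, self-contained Python sketch. It is not Airflow code: `Task`, `SlotPool` and `submit` are hypothetical names invented for illustration, and "putting a task to sleep" is modelled simply as moving it back to a waiting list.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str
    priority: int


@dataclass
class SlotPool:
    """Toy pool with a fixed number of slots and priority-based pre-emption."""

    slots: int
    running: list = field(default_factory=list)
    waiting: list = field(default_factory=list)

    def submit(self, task: Task) -> None:
        # Free slot available: just run the task.
        if len(self.running) < self.slots:
            self.running.append(task)
            return
        # Pool is full: look at the lowest-priority running task.
        victim = min(self.running, key=lambda t: t.priority)
        if victim.priority < task.priority:
            # Pre-empt: the victim goes back to waiting, the new task takes its slot.
            self.running.remove(victim)
            self.waiting.append(victim)
            self.running.append(task)
        else:
            self.waiting.append(task)


pool = SlotPool(slots=2)
for t in (Task("low_a", 1), Task("low_b", 1), Task("high", 10)):
    pool.submit(t)

running_names = sorted(t.name for t in pool.running)
waiting_names = [t.name for t in pool.waiting]
print(running_names, waiting_names)
```

The sketch also shows the starvation risk mentioned above: a pre-empted low priority task only gets back in when nothing with higher priority is waiting.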

Regarding the different options - I would not really be in favour of
adding separate logic on the side that is excluded from any of the
limits. In a high workload environment that could lead to e.g. higher
(and hard to predict) resource consumption on particular components, which
could lead to stability/performance issues.

Przemek

________________________________
From: Jens Scheffler <[email protected]>
Sent: 22 March 2026 18:25
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] PR: Option to directly submit to worker queue
from Triggerer

Hi Airflow Devs,

According to the feedback and a bit of Claude I have made two PRs which
would resolve the issue at least for KPO and only for cases where no
callback is needed:

   * https://github.com/apache/airflow/pull/64068 - Feature in Core to
     support XCom results in Triggerer
   * https://github.com/apache/airflow/pull/64069 - Implement XCom push
     from Triggerer in KPO

I'd be VERY happy if the discussion leads to one or the other
solution getting into 3.2.0 (else we will need to patch locally for
several more months...)

Compared to the initial proposals this does not add any executor
coupling to the Triggerer, but the two PRs apply only to KPO and only
if no callbacks are needed.

For us personally the alternative is a bit harder, as we subclassed KPO
and all additional features then need to be re-implemented in a
triggerer.
All other operators of a similar fashion (KubernetesJobOperator and all
other Deferred operators that return from Triggerer to Worker; I could only
find AWS EKS and Google GKE, but there might be more around) would need
similar extensions/changes. And all users depending on callbacks are also
still stuck with the same problem.

Jens

On 21.03.26 23:22, Jarek Potiuk wrote:
The idea to put all the closing into KPO would mean this is then only a
solution in KPO cases, all other tasks related to Triggerer
re-scheduling would still suffer from the same problem. So it would be a
point solution.

All other cases do not suffer from resources taken by the KPO pod. I think
the case you described is **really** KPO bound and the real problem is not
that the "next" method has lower priority. I believe that executing
higher-priority tasks before lower-priority "next" tasks, after the trigger
completes, is exactly how Airflow is designed. In this case I think the
problem is that we are using the "next" deferrable method in KPOTrigger to
do something that was **never** supposed to run as "next." A side effect is
that the "pod" must be kept for much longer than necessary just to extract
the XCom. That, to be honest, makes very little sense; it seems like using
a screwdriver to put in nails instead of a hammer.

If the "next" method is only supposed to retrieve the XCom and save it to
the database, there is absolutely no reason to have a scheduled task for
it - this is, in my humble opinion, the biggest problem to solve. Do
you think there is a good reason to use the sidecar volume to store XCom
data in this case, rather than Airflow's regular storage for XCom? To me
that looks like a bad idea: we essentially use the Pod's sidecar to store
a value in some kind of intermediate storage, while there is nothing to
prevent us from storing it in the actual "target" storage.

But I think this is also one of the shortcomings that @dabla mentioned
multiple times when he was starting to contribute the Async Iterator - I
see no API endpoint in triggerer code (see e.g.

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py)
where the current API has an option to get a DB, supervisor or XCom
context. Maybe I am blind and not seeing this, but I fear this is just
missing and would be a larger enablement in the core for the Triggerer to
have such functions that regular tasks executed by the SDK have as
context. Also there is no `context` object which provides this... but I am
happy if you can enlighten me where and how code in the Triggerer would be
able to write an XCom, if that is available today.

We can add it if it's missing; that's not an issue. It's a far less
disruptive change than triggering communication with other components. This
is a code change rather than an architectural change, which is always
easier for Airflow users to adopt (you just upgrade the Airflow version -
no other changes in deployment are needed).


In our case it is also a bit more complex - but it would be fair to say it
is then a Bosch-only problem - we have derived a custom operator on
top of KPO to cover specific error triaging, which e.g. marks a task failed
w/o retry if we see there is a deterministic error (like
ImagePullBackOff - we know the image is not there, so we do not want to
retry - or cases where our workload fails with problems we know are
deterministic and a retry would just be a waste of time and resources, like
an assert in the application we are testing). If all that logic is moved
to the triggerer it will indeed be a bit harder, as you know deploying
changes to the triggerer is less flexible than some operator code in the Dag
source tree. But we can also make this work... with more time invested. But
XCom needs to be resolved.

Yes, I understand that - but IMHO this is building on a flaw in the current
implementation (which was never intended to be extended). And in this case,
as I understand it, that would involve adding the same functionality to
KPOTrigger, not KPO itself. Yes, a breaking change for you, but I think
overall a better future for Airflow maintainability. With this change, I
think the optimisation is even better than patching the behavior of the
current KPO. And if need be, we can even add some "extension points" and
the ability to use a custom KPOTrigger rather than a custom KPO. I don't
think this is a major change (even if it is somewhat more complex for some
users - like Bosch). However, with this you can get even better performance
than with the "hack" using an executor, because you will entirely skip
re-entry and the need to start a new pod **just** to pull the XCom and save
it in the database. That sounds like an overall win-win.

That might be a major change (breaking, requiring a significant
info/changelog for KPO) - requiring a new version of Airflow to provide the
API that is possibly missing in 3.1.0 - but IMHO absolutely worth doing. It
could also be nicely conditioned on the version of Airflow that
cncf.kubernetes is installed on.

I think that while slightly more complex for Bosch to pull off, it might be
better for Airflow as a product.

J.



On Sat, Mar 21, 2026 at 10:49 PM Jens Scheffler<[email protected]>
wrote:
Hi Jarek, et al.

You are not missing anything obvious, and I understand that the coupling
is your concern.

So your counter proposal would get back to the point I had in my list of
alternatives with the name "Finish-up the work on triggerer w/o return
to Worker". That had the ending with "Main blocker in this view is XCom
access."

The idea to put all the closing into KPO would mean this is then only a
solution in KPO cases, all other tasks related to Triggerer
re-scheduling would still suffer from the same problem. So it would be a
point solution.

But I think this is also one of the shortcomings that @dabla mentioned
multiple times when he was starting to contribute the Async Iterator - I
see no API endpoint in triggerer code (see e.g.

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py)
where the current API has an option to get a DB, supervisor or XCom
context. Maybe I am blind and not seeing this, but I fear this is just
missing and would be a larger enablement in the core for the Triggerer to
have such functions that regular tasks executed by the SDK have as
context. Also there is no `context` object which provides this... but I am
happy if you can enlighten me where and how code in the Triggerer would be
able to write an XCom, if that is available today.

In our case it is also a bit more complex - but it would be fair to say it
is then a Bosch-only problem - we have derived a custom operator on
top of KPO to cover specific error triaging, which e.g. marks a task failed
w/o retry if we see there is a deterministic error (like
ImagePullBackOff - we know the image is not there, so we do not want to
retry - or cases where our workload fails with problems we know are
deterministic and a retry would just be a waste of time and resources, like
an assert in the application we are testing). If all that logic is moved
to the triggerer it will indeed be a bit harder, as you know deploying
changes to the triggerer is less flexible than some operator code in the Dag
source tree. But we can also make this work... with more time invested. But
XCom needs to be resolved.
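
For readers unfamiliar with this kind of triage, the retry decision can be sketched in plain Python as below. This is an illustration only, not our operator code: `DETERMINISTIC_REASONS`, `DETERMINISTIC_EXIT_CODES` and `should_retry` are hypothetical names, only "ImagePullBackOff" is taken from the description above, and the other reason and the exit code are assumptions. In a real operator the "no retry" branch could, for example, raise Airflow's `AirflowFailException`, which fails a task without retrying.

```python
from typing import Optional

# Container waiting reasons treated as deterministic failures.
# Only "ImagePullBackOff" is named in the thread; "ErrImagePull" is an assumption.
DETERMINISTIC_REASONS = frozenset({"ImagePullBackOff", "ErrImagePull"})

# Assumption for illustration: the workload aborts (SIGABRT, 128 + 6) on a failed assert.
DETERMINISTIC_EXIT_CODES = frozenset({134})


def should_retry(waiting_reason: Optional[str], exit_code: Optional[int]) -> bool:
    """Return False when a retry would only waste time and resources."""
    if waiting_reason in DETERMINISTIC_REASONS:
        return False
    if exit_code in DETERMINISTIC_EXIT_CODES:
        return False
    return True


print(should_retry("ImagePullBackOff", None))
print(should_retry(None, 1))
```

Moving such a function into a trigger is easy; the harder part is that the surrounding operator hooks and the deployment of changes live on the triggerer side.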

Jens

On 21.03.26 15:56, Jarek Potiuk wrote:
It took me a bit of time - I wanted to go a bit deeper and look closer
at the
I also share a bit of Niko's concerns. Running executor code from the
Triggerer significantly changes the architectural approach and boundaries.
And yeah - it's not really tied to multi-team, it's a general issue (in
multi-team the triggerer is per team, so you can be sure that "what is in
team, stays in team").

But it does change which system components communicate with each other. For
example, the Triggerer needs access to the Celery queue for the Celery
Executor, which is otherwise unnecessary. Similarly, the Triggerer will
need access to start Pods if the K8S executor is used. I'm not even sure
what would happen with the Edge executor. From what I understand, the Edge
Executor actively loops and checks for tasks, purges jobs etc. It pulls
data from the database, so I can imagine it simply enqueues the workload and
then the "real" Edge executor takes over (I believe that will be the case).
However, this significantly crosses the current boundaries of which
component does what.

Not to mention the security implications - the JWT token generator in your
solution works on the triggerer, and that pretty much breaks the (future)
assumption that the Triggerer does not need to know the secret necessary to
generate the token - and it has a lot of implications (for multi-team, for
example, but not only) - generally a lot of the future task isolation work
is based on the premise that user code has no chance to see the secret
used to generate the tokens.

While I think this is a good temporary solution for you, I understand the
use case and its merit; it's not a niche one. Pretty much anyone with KPO
and lots of deferred tasks will have this case. But I've been thinking
outside of the box, actually. I am not sure if I'm right, but this seems to
*really* be a problem with how KPO's XCom passing is done via the sidecar
for this Triggerer -> next() handover. But it does not have to be this way.
Conceptually, I see absolutely no problem saving the XCom where it belongs:
either the DB or the XCom Backend. This operation is almost exclusively
I/O-bound, so it can easily be done asynchronously. It could be done in
KPOTrigger via the supervising process (which has DB access) instead of
passing things back for scheduling. If KPOTrigger sees that the Pod is
completed, it could simply ask the supervisor to perform all the tasks
currently done in KPO's `trigger_reentry`. And that would not even cause
any re-entry: KPOTrigger could wait for the supervisor to perform the
action and write to the XCom database or XCom backend, and would simply
"complete" the task. The Triggerer could even have a separate event loop for
such "finalization," and all of it could be done asynchronously to scale
things.
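
As a rough illustration of that flow, here is a self-contained asyncio sketch. It does not use any Airflow API: `FakeSupervisor`, `push_xcom`, `wait_for_pod` and `run_trigger` are hypothetical stand-ins, and the real supervisor communication would look different.

```python
import asyncio


class FakeSupervisor:
    """Stand-in for the triggerer supervisor process, which has DB access."""

    def __init__(self) -> None:
        self.xcom_store: dict = {}

    async def push_xcom(self, task_id: str, value: str) -> None:
        await asyncio.sleep(0)  # placeholder for the I/O-bound write
        self.xcom_store[task_id] = value


async def wait_for_pod(task_id: str) -> str:
    await asyncio.sleep(0)  # placeholder for polling the Kubernetes API
    return f"result-of-{task_id}"


async def run_trigger(task_id: str, supervisor: FakeSupervisor) -> str:
    # Wait for the Pod, then finalize in place instead of handing back
    # to the scheduler for a re-entry on a worker.
    result = await wait_for_pod(task_id)
    await supervisor.push_xcom(task_id, result)
    return "success"


async def main() -> tuple:
    supervisor = FakeSupervisor()
    # Many triggers can finalize concurrently on the same event loop.
    states = await asyncio.gather(
        *(run_trigger(f"task_{i}", supervisor) for i in range(3))
    )
    return states, supervisor.xcom_store


states, store = asyncio.run(main())
print(states, store)
```

Because the finalization is only awaiting I/O, it scales with the event loop rather than with worker slots, which is the point of the argument above.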

There would be no "reentry" workload to run at all, because all completion
could happen in the Triggerer. And I think the Triggerer, similar to the
worker, can communicate with all components. It has access to the DB
(through the triggerer supervisor process) and can also communicate with the
XCom backend (you should be able to perform an XCom push from the Triggerer
supervisor, not necessarily from the event loop process). Aside from
probably much more network traffic for the Triggerer to save/retrieve XComs
from multiple deferred KPOs, I can't see any problem with it (and it can be
nicely scaled out by adding more triggerers).

Or am I missing something obvious ?

J.


On Sat, Mar 21, 2026 at 12:17 PM Jens Scheffler<[email protected]>
wrote:
Hey Niko,

thanks as well for the response with reasoning. I understand that you do
not prefer the coupling, hence the +0.

Is there any other option (hint: some were listed under "Options we
considered") that you would propose as a better solution? Or is the +0
just the "least bad" one?

Jens

On 20.03.26 02:03, Oliveira, Niko wrote:
Hey Jens et al.,

That is three datapoints now so there is indeed some demand. Not sure if
that pushes it over the limit of needing this, but it's more compelling
now.

To your 2) in your last reply Jens: My concern wasn't Multi-Team
specifically or anything to do with config. More just that this changes the
(mostly unwritten) contract that the scheduler wholly owns and interacts
with executors. Up until this change one could depend on this being true
and that there is only ever one instance of any executor (per team now,
since two teams can have the same executor between them). But now your
changes are instantiating executors and queuing work with them. So all
future executor/scheduling changes need to keep this in mind. It simply
increases the surface area of execution/scheduling in Airflow and we now
have a multi-writer situation which always brings extra complexity. If
there is enough demand then we should do it. Seems to be a few requesters
now, which I think brings me to a +0 on this one.

Thanks again for the discussion!

________________________________
From: André Ahlert<[email protected]>
Sent: Thursday, March 19, 2026 1:15 PM
To:[email protected] <[email protected]>
Subject: RE: [EXT] [DISCUSS] PR: Option to directly submit to worker
queue from Triggerer
Hi all,

This discussion resonates with problems we've seen on a fintech client's
setup. Heavy KPO usage in deferred mode, Celery executor, pools with
include_deferred=True. When a batch of tasks finishes on the cluster and
flips back to SCHEDULED, the pool slots are released and tasks have to
recompete through the scheduler for what amounts to seconds of cleanup
work. In our case the cloud cost was not the main concern, but it raised a
real architectural question: why does a task that already passed all
concurrency checks need to go through the entire scheduling loop again just
to collect XCom and delete a pod?

On the architectural concerns, I think the mini-scheduler comparison from
Airflow 2 does not hold. That feature made full scheduling decisions
(concurrency, pools, priority). This PR makes none. It re-enqueues a task
that already satisfied all those checks. The task already had a slot and
was already running. We are just sending it back to finish.

The fact that it is opt-in with a safe fallback to SCHEDULED makes this very
low risk. If nobody configures direct_queueing_executors, behavior is
identical to today. Marking it experimental for a release or two would also
be fine.
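
The opt-in behaviour can be summarised in a few lines of Python. This is a sketch of the decision only, not the code from the PR: `next_state_after_trigger` is a hypothetical helper, and treating direct_queueing_executors as a set of executor names is an assumption about the option's format.

```python
def next_state_after_trigger(executor_name: str, direct_queueing_executors: set) -> str:
    """Pick the state a task instance gets when its trigger fires."""
    if executor_name in direct_queueing_executors:
        # Opted in: skip the scheduler round trip.
        return "queued"
    # Default and fallback: same behaviour as today.
    return "scheduled"


print(next_state_after_trigger("CeleryExecutor", set()))
print(next_state_after_trigger("CeleryExecutor", {"CeleryExecutor"}))
```

With an empty configuration every task takes the "scheduled" path, which is why the change is low risk for deployments that do not enable it.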

On the use case being narrow: any deployment using deferred mode with
Celery at scale will eventually hit this. Issue #57210 shows independent
reports of the same pattern. It is inherent to the DEFERRED -> SCHEDULED ->
QUEUED state machine under pool contention, not specific to one setup.

+1 (non-binding) on getting this into main.

Thanks Jens for bringing this one.

André Ahlert

On Thu, Mar 19, 2026 at 14:24, Jens Scheffler <[email protected]> wrote:

Hi Niko,

thanks for the feedback.

(1) I think the use case is not really narrow, @tirkarthi also pointed
to issue https://github.com/apache/airflow/issues/57210 - so this would
be closed as well.

(2) I aimed to include support for Multi-Team (that even added some
complexity compared to the local patch in 3.1.7). Yes, the config
property is at the moment global, such that if you enable it for
CeleryExecutor it would apply to all Executors in all teams. But if you
wish and see a reason, we can also model the config as team specific
(e.g. only team_a uses CeleryExecutor and team_b does not optimize -
though I am not sure if there is a need to separate). Routing and queueing
in the PR certainly respect the correct Executor/queue instance to route to.
See
https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R333
and
https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R347

Jens

On 18.03.26 23:34, Oliveira, Niko wrote:
Thanks for the write-up Jens, it helps to have the full context of your
thought process.

The code changes themselves are small and fairly elegant. But this
breaks the invariant that there is only ever one instance of each executor
(per team) and that they live in the scheduler process and the scheduler is
the only thing that interacts with them in a scheduling capacity. To me
it's quite a large logical change in Airflow behaviour/operation. When
reasoning about execution and scheduling there is now another source that
will always need to be considered, ensuring it works and is tested when
executor related changes are made, etc. This has been fraught for us in the
past in ways that were hard to predict beforehand.

The use case seems quite narrow and focused on how you folks use Airflow.
Are you hearing from any other users who are asking for something like
this? I'm just not sure I see enough evidence that it truly belongs in
apache/airflow main.

Cheers,
Niko

________________________________
From: Jens Scheffler<[email protected]>
Sent: Wednesday, March 18, 2026 2:56 PM
To:[email protected] <[email protected]>
Subject: [EXT] [DISCUSS] PR: Option to directly submit to worker
queue
from Triggerer
Dear Airflow Devs!

TLDR: Because of operational problems in processing workload we propose
an extension that allows tasks to be re-queued directly from the triggerer.
The PR raised the need for a discussion to ensure awareness of the change.

Pull Request: Allow direct queueing from triggerer #63489
https://github.com/apache/airflow/pull/63489

The Use Case/Problem Statement:

We use Airflow for many workflows of scaled, long and large Dags, running
about 80% KPO workload. To ensure KPO can run scaled and long w/o
operational interruptions (worker restarts due to re-deployment, Pods with
workload sometimes running 4-10h) and to be able to scale to thousands
of running KPO Pods, we need to use deferred mode extensively.

In KPO with deferred mode a task is first scheduled to a (Celery in our
case) worker, which prepares the Pod manifest and starts the Pod. From there
it hands over to the triggerer, which monitors the running Pod and tails the
log so that a user can watch progress. Once the Pod is completed, the task
returns to a (Celery) worker that finishes up the work, extracts XCom,
handles errors and cleans up the Pod from K8s. This also means that the
Pod is only finished when the XCom is pulled from the side-car: the "base"
container might be completed, but the Pod is only done and deleted when
the XCom is collected. Until KPO collects XCom the Pod keeps running.

The current method of scheduling in Airflow is that the Scheduler checks
all concurrency rules (max_active_tasks, max_tis_per_dagrun,
pools...) in state "scheduled" before a task is queued to be started. On
the worker, once started, it is directly set to "deferred" and then a
triggerer picks it up (no re-scheduling or active distribution to a
triggerer). On the way back, today the triggerer marks the task "scheduled",
which means the scheduler logic needs to pick up the task again, in
competition with all other workload, and re-schedule it with all
concurrency and priority checks like initially, to get it to "queued" and
re-assigned to a (Celery) worker. This implicitly means that, going from
"deferred" to "scheduled", the task loses its allocated pool
slot and needs to re-allocate it.
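
The slot accounting described above can be illustrated with a toy model. This is a sketch, not Airflow's implementation: `SLOT_STATES` and `occupied_slots` are hypothetical names, and it assumes a pool in which deferred tasks count against the limit (as with include_deferred=True).

```python
from collections import Counter

# States that occupy a pool slot in this toy model. Counting "deferred"
# corresponds to a pool configured with include_deferred=True.
SLOT_STATES = ("queued", "running", "deferred")


def occupied_slots(task_states: dict) -> int:
    """Count how many task instances currently hold a pool slot."""
    counts = Counter(task_states.values())
    return sum(counts[state] for state in SLOT_STATES)


tasks = {"t1": "deferred", "t2": "deferred", "t3": "running"}
before = occupied_slots(tasks)

# Today: the triggerer hands t1 back as "scheduled", so its slot is released.
via_scheduler = occupied_slots({**tasks, "t1": "scheduled"})

# Direct queueing: t1 goes straight to "queued" and keeps its slot.
direct = occupied_slots({**tasks, "t1": "queued"})

print(before, via_scheduler, direct)
```

In the "scheduled" variant the freed slot can be taken by any other task before t1 gets it back, which is exactly the competition described above.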

In most regular situations this is okay. In our scenario it is a
problem: We have many Dags competing for the K8s cluster resources, and
the concurrency features of Airflow combined with priority controls should
ensure that important workload runs first. Once there is residual
capacity, less important batches can consume cluster resources. And
"consume resources" also refers to Pods sitting on the cluster: they
free up cluster space only once the XCom is collected and the Pod is
removed. Before that they still consume CPU and ephemeral storage
allocations. We limit the amount of workload that can be sent to
K8s via Airflow pools, which are the ultimate limit for concurrency on
different node pools (e.g. nodes with GPU and nodes w/o GPU). Other
workload often runs on Edge workers or directly as Python in Celery.

With multiple Dags and different priorities we had these two effects:

(1) A lower priority batch is running ~N*100 Pods in deferred. A higher
priority large batch is started. Pods finishing from the lower priority
tasks are expected to drain the cluster; when they end, the task instances
are set to "scheduled" and... then stay there until all tasks of the
higher priority batch are worked off (assuming the higher priority tasks
are not limited in a way that leaves room for the lower priority tasks). So
the base containers of the Pods are completed, while the XCom side-car waits
a long time - we have seen even 24h - for the XCom to be collected and the
Pod to be cleaned up.

          (a) An additional side effect: if such a Pod is pending for long, the
AutoScaler might pick its node as a scale-down victim because it is really
idle, and after the grace period it kills the Pod. Later, when the workload
returns to the worker, the Pod shows up as an HTTP 404 because it is gone
and the XCom is lost... in most cases we need to run a retry, which means a
delay and additional hours of re-execution. Without a retry it just raises
failures to users.

          (b) We had the side effect that newer high priority workload was
not scheduled by K8s to the (almost idle) Nodes because the previous
pending Pods still allocated ephemeral storage and there was not sufficient
space on the K8s nodes for new workload... so the old Pods blocked the new
ones, and the higher priority task instances blocked the cleanup of the
lower priority instances. A lot of tasks were in a kind of dead-lock.

          (c) As the reset to state "scheduled" from the triggerer also sets the
"scheduled" date of the task instance, other pending scheduled task
instances from the same "low priority" Dag are often started earlier. So
workers pick up new tasks to start new Pods while a lot of old Pods are
sitting there idle, waiting for their XCom to be collected so they can be
cleaned up.

(2) Also, sometimes because of operational urgency we use the
"enable/disable" scheduling flag on Dags in the UI to administratively
turn off Dag scheduling to leave space for other Dags... or to drain the
cluster for some operational procedures, e.g. to have a safe ground
before maintenance. But the Dag needs to be actively scheduled to
process the return from the triggerer. If you turn off scheduling, the
workload in flight never finishes and gets stuck as
described before. Pods are stale on the cluster and nobody picks up the
XCom. The problematic part is also that there is no way to "clean up"
such tasks to finish these Dags w/o turning on scheduling... but then
new tasks are queued as well and you are just not able to drain the
cluster. I know we discussed multiple times that we might need a "drain"
mode to let existing Dag runs finish without scheduling new Dag runs... but
such a feature is also missing. In other words: scheduling new tasks is
tightly coupled with the scheduling of cleanup, and it is not possible to
separate them. This leads to the problems 1 (a-c) in scenario (2) as well.
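
Effect (1) can be reproduced with a few lines of Python. This is a simplified sketch under the assumption that "scheduled" task instances are simply picked in descending priority_weight order; `pick_next` and the task ids are hypothetical, and the real scheduler applies many more checks.

```python
def pick_next(scheduled: list, free_slots: int) -> list:
    """Return the task ids that would be queued next, highest priority first."""
    ordered = sorted(scheduled, key=lambda ti: -ti["priority_weight"])
    return [ti["task_id"] for ti in ordered[:free_slots]]


scheduled = [
    # Returning from the triggerer, only needs seconds of cleanup.
    {"task_id": "low_cleanup_1", "priority_weight": 1, "next_method": "trigger_reentry"},
    # Fresh tasks of the higher priority batch.
    {"task_id": "high_new_1", "priority_weight": 10, "next_method": None},
    {"task_id": "high_new_2", "priority_weight": 10, "next_method": None},
    {"task_id": "high_new_3", "priority_weight": 10, "next_method": None},
]

picked = pick_next(scheduled, free_slots=2)
print(picked)
```

As long as more high priority tasks are waiting than slots are free, the cleanup task is never picked and its Pod keeps sitting on the cluster.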

We thought for a while about which options we would have to contribute a
general improvement. The assumption and condition is that the initial start
on the (Celery) Worker is fast, most time is spent (once) on the
triggerer, and the return to the worker is actually only made for a few
seconds of clean-up. And of course we want to minimize latency to (I)
free the allocated resources and (II) not add any additional
artificial delay for the user. This somewhat contradicts the effort
of flipping from worker to triggerer and back again.

Options we considered:

       * Proposing a new "state" for a task instance, e.g.
"re-schedule"
that
         is handled with priority by scheduler.
         But the scheduler is already a big beast of complexity,
adding
         another loop to handle re-scheduled with all existing
complexity
         might be a large complexity to be added and adding another
state
in
         the state model also adds a lot of overhead from
documentation to
UI...
       * Finish up the work on the triggerer w/o return to the Worker. It
         is only about cleaning the KPO and...
         Unfortunately, anything more than just monitoring is very complex
         to implement, and especially XCom DB access is not a desired
         concept and the triggerer does not support it. We also have some
         specific triaging and error handling automation built on top of
         KPO, all of which would be hard to implement in async with the
         limited capabilities of the triggerer.
         The main blocker in this view is XCom access.
       * Dynamically increase the priority of a task returning from the
         triggerer.
         We considered "patching" the priority_weight value of the task
         instance on the triggerer before return to ensure that returning
         tasks are just elevated in priority. First we did this from the
         side via SQL (UPDATE task_instance SET priority_weight=1000 WHERE
         state='scheduled' AND next_method='trigger_reentry'), but if the
         task fails and is restarted, it is hard to find it and reset the
         priority back... a retry would still need to be reset down... it
         all feels like a workaround.
       * Implementing a special mode in the Scheduler to select tasks with
         "next_method" set, as a signal that they are returning from the
         triggerer, in a special way... assuming they have a Pool slot and
         excluding some of the concurrency checks (as in "scheduled" state
         the pool slot is actually "lost").
         But this is hard to really propose... it might be even harder
         than the first option, and the already complex scheduler code
         would get even harder to follow with exceptions in the
         concurrency checks... with the risk that such special cases
         exceed the planned concurrency limits if the pools were already
         exhausted before.
       * Adding a REST API that the triggerer can call on the scheduler to
         cross-post workload.
         That would need a new connection and component bundling, and a
         REST API endpoint would need to be added for schedulers to
         receive these push calls. Probably an alternative, but it also
         adds an architectural dependency.
       * The PR we propose to discuss here: If the task skips the
         "scheduled" state and moves to the queue directly, the pool slot
         stays allocated (assuming that Deferred is actively counted into
         pool and concurrency limits). The code looked not too complex, as
         just the enqueuing logic from the Scheduler could be integrated.
         This takes into account that such direct queuing is only possible
         if the executor supports queues (not working for LocalExecutor!).
         So the proposed PR makes it explicitly opt-in.
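
To make the last option a bit more concrete, here is a minimal Python
sketch of the decision on the triggerer side as described above. It is
not the code from the PR: the names TaskInstance, QueueingExecutor,
LocalLikeExecutor, supports_queues, direct_queue_enabled and
resume_from_trigger are hypothetical stand-ins. Only the states and the
"trigger_reentry" value of next_method are taken from the text.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class State(str, Enum):
    DEFERRED = "deferred"
    SCHEDULED = "scheduled"
    QUEUED = "queued"


@dataclass
class TaskInstance:
    # Hypothetical, heavily reduced stand-in for a task instance row.
    key: str
    state: State = State.DEFERRED
    next_method: Optional[str] = None


class QueueingExecutor:
    """Hypothetical executor with a real queue in front of the workers."""

    supports_queues = True

    def __init__(self) -> None:
        self.queue: List[str] = []

    def enqueue(self, ti: TaskInstance) -> None:
        self.queue.append(ti.key)


class LocalLikeExecutor:
    """Hypothetical executor without a queue (direct queuing not possible)."""

    supports_queues = False


def resume_from_trigger(ti, executor, direct_queue_enabled: bool) -> State:
    """Hand a deferred task back to a worker after its trigger fired."""
    ti.next_method = "trigger_reentry"
    if direct_queue_enabled and getattr(executor, "supports_queues", False):
        try:
            # Opt-in path: skip "scheduled" and push straight to the queue.
            executor.enqueue(ti)
            ti.state = State.QUEUED
            return ti.state
        except Exception:
            # Anything going wrong falls through to the existing path.
            pass
    # Safe fallback: the scheduler picks the task up again later.
    ti.state = State.SCHEDULED
    return ti.state
```

With the option switched off, or with an executor that has no queue, the
sketch always ends in "scheduled", which is today's behaviour.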

Reasons (and pro-arguments) why we propose to have the PR on main:

       * Due to a lot of operational problems recently, we tested this and
         patched it locally into our 3.1.7 triggerer. It has been working
         smoothly in production for ~1 week
       * If something goes wrong or the Executor is not supported, then the
         existing path of setting to "scheduled" is always used as a safe
         fallback
       * It is selective and is an opt-in feature
       * We dramatically reduced latency from Pod completion to cleanup,
         from sometimes 6-24h to a few seconds
       * We assume the cleanup on return to the Worker is only a small
         effort, so no harm is done even if it temporarily over-loads some
         limits
       * ...But frankly speaking, the concurrency limits and Pools were
         already checked initially at start time. Applying concurrency
         limits to the cleanup later on does not add any benefit, just
         delays and problems. We just want to finish up the work.
       * And finally, over-loading is actually not possible, as the Redis
         queue is still in between - so any free Celery Worker will pick
         up the task. Even under over-load it will just sit in the Celery
         queue for a moment.
       * It is a relatively small change
       * Off-loads the scheduler by 50% for all deferred tasks (they need
         to pass the scheduler only once)
       * Due to reduced latency on cleanup, more "net workload" is
         schedulable on the cluster: higher cloud utilization / less idle
         time.
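
The pool argument in the list above can be illustrated with a small toy
model. This is only a sketch under assumptions: it is not scheduler code,
the task names and the simulate() helper are made up, and it assumes that
"deferred", "queued" and "running" occupy a pool slot while "scheduled"
does not.

```python
# Assumption: these states hold a pool slot, "scheduled" does not.
OCCUPYING = {"queued", "running", "deferred"}


def occupied(tasks):
    """Number of pool slots currently held."""
    return sum(1 for state in tasks.values() if state in OCCUPYING)


def scheduler_pass(tasks, pool_size, order):
    """Promote 'scheduled' tasks to 'queued' in the given order
    for as long as the pool has free slots."""
    for name in order:
        if tasks[name] == "scheduled" and occupied(tasks) < pool_size:
            tasks[name] = "queued"
    return tasks


def simulate(direct_queue):
    """Pool of size 2: one task returning from the triggerer, one
    running task and one new task waiting to be scheduled."""
    tasks = {"returning": "deferred", "other": "running", "new": "scheduled"}
    # The trigger fired: either re-queue directly or detour via "scheduled".
    tasks["returning"] = "queued" if direct_queue else "scheduled"
    # Worst case for the returning task: the new task is examined first.
    return scheduler_pass(tasks, pool_size=2, order=["new", "returning"])


detour = simulate(direct_queue=False)
direct = simulate(direct_queue=True)
print(detour)  # the new task took the freed slot, the cleanup waits
print(direct)  # the returning task kept its slot, the new task waits
```

In both cases two slots are in use at the end, so the direct path does
not raise the level of concurrency in this model. It only changes who
holds the slot: the task that needs to clean up, or a newly started one.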

Hearing the feedback on the PR and reflecting with the devil's advocate,
I could understand the counter-arguments:

       * In Airflow 2 there was a mini-scheduler, and there was a hard
         fight in Airflow 3 to get rid of it!
         Understood. But we do not want to add a "mini scheduler", we just
         want to use parts of the Executor code to push the task instance
         to the queue and skip scheduling. It is NOT the goal to do any
         more than that or to schedule anything else.
       * This would skip all concurrency checks and potentially over-load
         the workers!
         No. Concurrency rules are checked when the workload is initially
         started. I know there are parallel bugs we are fighting to ensure
         deferred status is counted on all levels into concurrency, so
         that limits are kept correctly. Assuming that you enable counting
         deferred into pools, a direct re-queue to the worker just keeps
         the level of concurrency and does not add more workload... it is
         just transferred back. And Celery, for example, has a queue, so
         there is no real over-loading. It is mainly intended for clean-up
         work, which is a low-effort task.
       * We plan to cut off components and untangle package dependencies.
         After the worker, the Dag parser and triggerer are next. Linking
         to the Executor defeats these plans!
         Yes, understood. But today the task_instance is also set via
         direct DB access... and in such a surgery it would need to be cut
         to the point where the DB access is moved to the execution API
         back-end. So the cut for re-queueing would move to the execution
         API in future, not the triggerer. I think it would be valid to
         think about the options for how that might evolve in future once
         such a distribution is made.
       * This option is risky and we are concerned that people will see
         more errors.
         The feature is opt-in and needs to be configured. By default, as
         proposed in the PR, it is not active. It would also be acceptable
         to mark this as experimental for a while.

Sorry, a bit longer text. Happy to get feedback.

Jens

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

