Hi,

thanks for all the proposals. This sounds really, really great and important! We also see a couple of OOM problems, and static allocation of memory really might improve performance indirectly as well.

Before we invest time in the case of long-lived objects in Celery, we should double-check that Celery does not already have such an optimization built in. They probably had this in mind when they implemented pre-fork. We should not end up optimizing around an existing optimization.

So all these investigations - following the LocalExecutor one - should be separate PRs and analyses in my view, so that the increments can build experience and make the tooling more efficient.

TLDR: Great, please not all in parallel at the same time :-D

Jens

On 12/18/25 16:42, Aritra Basu wrote:
Ack, I think that makes sense. Let's get some of the work done and then
maybe we can try getting the data as well.

--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 6:47 pm Jarek Potiuk, <[email protected]> wrote:

Jarek do you think we should try to get some telemetry on the size/count of
imports that are loaded, the amount of reuse/overlap we have, etc?

Yes, but not necessarily telemetry, and not necessarily to start with it. I
think initially we need more of a good set of test dags with various imports
that simulate various scenarios. This will be way easier to pull off - and
this is what Jeongwoo already did when analysing the COW with Local Executor.
Then yes - we could gather some "telemetry" information (not automatically,
but we could ask our users to do some experimenting on their dags - with the
new ray feature, also courtesy of Jeongwoo). But asking for it first will not
let us observe the gains while we experiment with fixes. It could be useful
later - to ask our "active" users to run some experiments when we enable any
improvements we come up with (undoubtedly those should be opt-in features
initially) - but I would say we need to have a good understanding of what we
can gain by trying it out locally first.
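
For illustration, one of those test dags could be as small as the sketch
below - purely a hypothetical example (pandas is just a stand-in for any
large dependency, and the Airflow 3 task-SDK decorators are assumed):

    # test_heavy_import_dag.py - hypothetical test dag simulating a heavy
    # top-level import; swap pandas for whichever library you want to measure.
    import pandas  # noqa: F401  - heavy import on purpose

    from airflow.sdk import dag, task


    @dag(schedule=None)
    def heavy_import_dag():
        @task
        def noop():
            return 42

        noop()


    heavy_import_dag()

A folder full of variations of this (different libraries, shared vs. unique
imports) should be enough to compare memory and parse-time behaviour before
and after any gc.freeze / pre-import changes.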

Think one-off dags importing something ginormous? And additionally it seems
like a non-trivial change, so some data to validate our thoughts would put my
mind at ease.

I think it is more of a compound effect of importing the same thing in
multiple processes. If you look at the local executor fixes achieved by
https://github.com/apache/airflow/pull/58365 - by avoiding the COWs, the
per-executor-process memory savings are enormous: 114 MB per process vs 39 MB
per process is HUGE. Basically it means that in the tests done by Jeongwoo -
if you have a local executor with 20 processes, you save 20*70MB => 1.4 GB of
memory - simply because most of the memory remains shared between all those
processes instead of being essentially copied. Not to mention the performance
cost of all those memory copy operations incurred by COW.

This will be similar for the Celery Executor. Currently - because we allow
the workers to run with even high concurrency - what is happening in celery
workers is that those preforked processes, which are long-running, each
essentially keep a copy of almost exactly the same memory resulting from
importing some heavy libraries. Note that it's not just the **imported
modules** as such - but ALL the memory those imported modules allocate on the
heap when they are initialized. Many, many python modules allocate memory for
something - the classes they import, pre-loading some data, preparing some
in-memory structure so that you avoid doing I/O operations and use an
in-memory structure to speed up lookups. Essentially, a lot of memory is
read-only once initialized, but because of the way garbage collection works,
and the fact that such memory is usually fragmented, garbage collection will
essentially - over time - copy all that read-only memory in forked processes
(unless you do the gc.freeze/unfreeze dance, which prevents that). In the
local executor tests done by Jeongwoo, 70 MB out of the 114 MB used by a
forked process is this kind of memory - read-only shared memory that gets
copied because of the garbage collector -> that's roughly a 60% saving per
process. We have 16 as the default concurrency in celery now, "prefork" is
the default setting, and we have "long" running forks - we do not have
"max_tasks_per_child" set, nor any other "recycling" of celery worker
processes - so we can safely assume that in the vast majority of our
installations all the workers keep essentially 16 copies of memory that could
be shared, were it not COW-ed by the gc. This is a HUGE saving if we can
decrease the default memory usage of pretty much all installations out there.
Even assuming that just 50% of each worker's memory is this duplicated
read-only data, a 16-process worker keeps 16 copies of it where one shared
copy would do - roughly half of the worker's total footprint is pure
duplication.
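
To make the gc.freeze/unfreeze "dance" concrete, here is a minimal sketch of
the pattern around a plain os.fork() - Celery's prefork pool would need its
own worker-init hooks instead, so treat this purely as an illustration of the
mechanism (the imported modules are stand-ins for heavy libraries):

    # Sketch: keep the parent's read-only objects out of the GC so forked
    # children never COW those pages. gc.freeze() exists since Python 3.7.
    import gc
    import os

    def prepare_parent():
        import json, decimal  # stand-ins for genuinely heavy libraries
        gc.disable()   # keep the collector from running between now and fork()
        gc.freeze()    # move everything allocated so far to the permanent
                       # generation - the collector will never visit it again

    def child_main():
        gc.enable()    # children collect only objects they create themselves
        # ... run the task; the parent's frozen objects stay in shared pages

    if __name__ == "__main__":
        prepare_parent()
        for _ in range(4):
            if os.fork() == 0:
                child_main()
                os._exit(0)
        for _ in range(4):
            os.wait()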

So the memory gains should be **huge** for celery executor IMHO.

The memory gains for the Dag File Processor will not be as big - because
there are no long-running processes and we usually have smaller concurrency
(2 is the default, but of course in huge installations this goes up) - there
will only be spikes of memory, not sustained usage. There, the main gain is
performance, and this is actually quite an important one, because the same
dags are continuously parsed and re-parsed, and if the approach with import
optimisation works as hypothesised, all the import savings will be multiplied
by the number of times dags are parsed. Even more - the same imports used in
different dags will also be optimized away. Imagine you have 1000 dags parsed
every 5 minutes (a safe assumption - normally you want parsing to be done
more frequently) and you save just 1s of import time per dag (which is not
unlikely - a pandas import can easily take seconds), then you save 1000
CPU-seconds (over 16 minutes of CPU time) per 5 minutes of parsing. That's a
lot of CPU time saved. Like... A LOT.
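
The "1s per heavy import" figure is easy to sanity-check locally with the
stock -X importtime interpreter option - a tiny sketch, using pandas only as
an example of a heavy library that happens to be installed:

    # Time a cold import in a fresh interpreter; -X importtime prints a
    # per-module breakdown (in microseconds) on stderr, the last line being
    # the cumulative time for the top-level import.
    import subprocess
    import sys

    cmd = [sys.executable, "-X", "importtime", "-c", "import pandas"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    print(result.stderr.splitlines()[-1])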

Those are the "back-of-the-envelope" calculations I can hypothesise about,
but IMHO - they are quite close to real gains we can observe. So ... if we
think "is it worth trying", my answer is - definitely yes. But we should
see some real numbers and results of those experiments, before we actually
release it to the users, and this could be initially opt-in, and we could
ask some friendly users to test it out in "reality" and bring back the real
numbers and stability feedback, and then we could decide to make it
"opt-out".

J.



On Thu, Dec 18, 2025 at 1:25 PM Aritra Basu <[email protected]>
wrote:

Jarek do you think we should try to get some telemetry on the size/count of
imports that are loaded, the amount of reuse/overlap we have, etc? Because
while this sounds good and might help, I fear that in this aim to reduce our
memory footprint we might be overloading (no pun intended) the process? Think
one-off dags importing something ginormous? And additionally it seems like a
non-trivial change, so some data to validate our thoughts would put my mind
at ease.

This is all based on the assumption that what I understood from the thread is
correct - that we try to preload imports and freeze them in memory to reduce
subsequent loads?

--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 5:24 pm Jarek Potiuk, <[email protected]> wrote:

I am very much pro exploring several options there - and I think there are
several approaches that should be tried here. Increased memory usage in
Airflow 3 is a fact that is indisputable. I think we do not yet understand
all the reasons for that. The COW case that I initially hypothesized about
(after listening to a few core.py podcasts where I understood better how
memory management in Python plays with forking) was very counter-intuitive.
The fact that in our scenario garbage collecting can **significantly
increase** memory usage (even many times over, in your experiments) was
absolutely not obvious. But your experiments, and the finding that as of
Python 3.7 we even have a mechanism to handle it (gc.freeze), have shown
clearly that this is it.

We have seen that in a number of user reports, and I think we should make a
deliberate effort to find those cases and optimize them. And I think some of
our changes around isolation and separation of our code have triggered some
of the memory usage growth as side-effects - because the impact on memory
usage in interpreted languages like Python is complex - especially when we
are using forking as the primary mechanism to implement isolation between
spawned processes. Simply speaking (and for me this was really a discovery I
made after listening to the "core.py" podcast by core python developers
Łukasz Langa and Pablo Galindo - especially all the chapters related to
reference counting, garbage collection, and free threading (that one is for
the future)) - it became clear that Python and forking do not play very well
together out-of-the-box, and you need to make quite a deliberate effort to
optimise memory usage when you heavily use forking.

But I think the important part here is that we need to do it step-by-step and
experiment with some approaches and see the actual impact it might have -
like with the local executor case.

I think there are two aspects of it:

* Optimizing "airflow-induced" imports

This has been a long-standing issue that we were painfully aware of, and
we've been discussing fixing it for years. In Airflow 2 I made at least 2
attempts - unsuccessful - to untangle some of the spaghetti code and chains
of imports that we had - this was impossible to do without more drastic
moves. And with Airflow 3 we are on a good path to fix those. I explained in
https://github.com/apache/airflow/pull/58890#issuecomment-3599373034 that
some of the memory usage effects we observe now will be essentially **gone**
once we complete the separation of "task sdk" from "airflow" - and I would
say a lot of these experiments should be performed once we complete it,
because it will dramatically change the internal import behaviour, and the
import chains we observe today will be much shorter, more predictable and
less memory-consuming. Also, we can likely make some better decisions on when
and how to freeze gc for forking.

So ... I would simply hold our horses until we complete that part.

* Optimizing "external imports"

This is much more doable **now**. And personally I do not even see a big
issue with pre-importing external packages and doing the gc.freeze() dance in
the Dag File Processor and Celery Workers. And even inverting what you
proposed: importing all the used packages "by default" unless the user
opts out. I was thinking a lot about how we can do it and I think I have a
general "high level" proposal on how we can do it.

A few assumptions / observations:

* when we are importing from an "installed distribution" (i.e. not from the
dags folder), generally there is **no expectation** that you should be able
to reload that module. Running a Python interpreter by default will import
such packages only once, and we all know that attempting to reload such
modules is in general futile - you basically need to restart the interpreter
to load a new version of such a module. Hot reloading is at most developer
tooling (very useful, but not practical for production). So I am not too
worried about pre-importing those external modules (as long as we know which
modules we **should** import). I think KNOWING what to import before the fork
is the most important problem to solve.
* for the Dag Processor - those forks are short-lived, so the memory growth
is at most a spike, not lasting too long - so I am less worried about memory
and more about performance. And there, I would not worry too much about OOMs.
* for Celery - where "prefork" mode is used by default - this is quite a
different story; there we can see high memory usage across multiple
pre-forked processes, either coming from the COW effect or coming from the
fact that we are importing the same huge imports in parallel in many
processes.

So I think (this is again a bit of hypothesising) that we should likely use a
somewhat different strategy for DagProcessors than for Celery workers.

My ideas:

a) for DagProcessors, I think it would be perfectly fine if we just track
what's being imported in a forked parser, send the list of modules imported
from real installed "distributions" (easy to check) to the Dag Processor
Manager, and let it import those in its own interpreter. Then we can do the
freeze/unfreeze dance before every fork (i.e. before we attempt to parse
another Dag file). This might have a set of nice properties (a rough sketch
of the tracking idea follows the list):
* we do not have to know what to import in advance - so we do not need to
parse all the Dag files upfront
* the first time a module is imported in a Dag, it will be reported back to
the Dag Processor Manager, which will import it, and if we do the
freeze/unfreeze dance - the next time parsing happens, the import will
already be in memory and not garbage-collectible - so we will not incur COW
needlessly and we will reuse all that memory from the DagFileProcessor
* the effect of it -> initially DagProcessorManager memory will be smaller;
each parsing run will get a spike of memory loaded for the imports it needs,
then when it completes, the memory is freed, but the DagProcessorManager will
import the thing, grow its memory, and this will decrease memory usage and
speed up parsing of all subsequent Dags importing the same modules
* which has a nice smoothening effect: while initially there will be some
spikes, over time, as more and more Dags are parsed and modules imported,
this will smoothen out, and for common imports the memory will **only** be
allocated in the DagFileProcessor Manager - the memory will be shared by the
parsing processes - and performance will improve significantly
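
The "track and report" part could be as simple as diffing sys.modules around
parsing and keeping only the modules that live in site-packages - a rough
sketch under those assumptions (the transport back to the manager and the
call sites are hypothetical and left out):

    # Sketch for idea a): figure out which installed-distribution modules a
    # parsed dag file pulled in, so the manager can pre-import them.
    import sys
    import sysconfig

    SITE_PREFIXES = tuple(
        p for p in {sysconfig.get_path("purelib"), sysconfig.get_path("platlib")} if p
    )

    def modules_from_installed_distributions(before: set[str]) -> list[str]:
        """Top-level modules imported during parsing that come from site-packages."""
        names = set()
        for name, module in list(sys.modules.items()):
            if name in before or module is None:
                continue
            origin = getattr(module, "__file__", None)
            if origin and origin.startswith(SITE_PREFIXES):
                names.add(name.partition(".")[0])  # report only top-level packages
        return sorted(names)

    # hypothetical usage inside the forked parser:
    #   before = set(sys.modules)
    #   parse_dag_file(path)
    #   report_to_manager(modules_from_installed_distributions(before))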

b) for Celery workers, I think it would make sense to do something a bit more
sophisticated but similar. The problem is that those pre-forked processes
might be (and are, by default) long running - they are "pre" forked. And
there, we do not know (before we parse all the files) which modules actually
need to be pre-imported before we fork them. This is a tough one to solve in
the generic case - because celery workers generally do not parse "all" the
Dag files (unlike Dag File Processors) - they generally only parse the
particular Dags they execute (which, for example in the case of queues, might
be a small subset of Dags). In pre-fork mode, Celery will spawn `concurrency`
child processes
https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency
to run the tasks. Here I think we can use a similar strategy as in the case
of DagProcessors - i.e. track all imports and get them imported by the
"parent" celery worker, so that the next fork uses them, and we could do the
same freeze/unfreeze dance while forking (I think Celery has hooks to allow
that) - but only under one condition: that we restart the forked processes
from time to time. Which.... is actually possible, for example with

https://docs.celeryq.dev/en/latest/userguide/configuration.html#std-setting-worker_max_tasks_per_child
- which will restart the process after it handles x tasks. So far we have not
set that config as a default (in our docs we explain that you can do it with
custom configuration) - but if we do this "import gc dance", it is pretty
much a prerequisite to make use of such pre-imported modules that we
dynamically import in the "parent" process - otherwise all the forks will
just run forever and each of them will keep its own copy of the memory used
by those imports. A hedged sketch of such a custom configuration is below.
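
Enabling the recycling today already only takes a small custom Celery config
wired in via [celery] celery_config_options - a sketch, assuming the current
celery provider layout (the value 100 is just an example, and the module path
of your own config file is up to you):

    # my_celery_config.py - point [celery] celery_config_options at
    # my_celery_config.CELERY_CONFIG to use it.
    from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

    CELERY_CONFIG = {
        **DEFAULT_CELERY_CONFIG,
        # recycle each preforked child after 100 tasks so re-forks can pick up
        # whatever the parent worker has pre-imported in the meantime
        "worker_max_tasks_per_child": 100,
    }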

Both a) and b) have the nice property that in case of any issues, or when new
versions of distributions are installed, a restart of those processes will
start the whole thing from the beginning - the imports will be gradually
loaded and the forks will gradually make use of the pre-imports done by the
"parent" process (until the next restart).

My hypothesis is that if we do that, we can achieve all three goals: decrease
the memory usage overall, decrease memory spikes over time, and increase
performance.

But all that needs to be tested of course.

J.








On Thu, Dec 18, 2025 at 10:30 AM Aritra Basu <[email protected]> wrote:

This is quite an interesting read Jeongwoo. I don't yet have a strong opinion
on this, except that it's worth checking out. I'll reread it a couple of
times and hopefully come up with some thoughts, but your investigation so far
looks quite interesting.

--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 2:08 pm Jeongwoo Do, <[email protected]>
wrote:
Hello Airflow community,

While working on resolving a memory leak issue in the LocalExecutor [1], I
observed that garbage collection (GC) in forked subprocesses was triggering
Copy-On-Write (COW) on shared memory, which significantly increased each
process’s PSS. By using gc.freeze to move objects created at subprocess
startup into the GC permanent generation, I was able to mitigate this issue
effectively.



I would like to propose applying the same approach to the Dag processor to
address GC-related performance issues and improve stability in subprocesses.
Below are the expected benefits.

Preventing COW on Shared Memory
Unlike the LocalExecutor, where subprocesses are long-lived, Dag processor
subprocesses are not permanent. However, with the increasing adoption of
dynamic Dags, parsing time has become longer in many cases. GC activity
during parsing can trigger COW on shared memory, leading to memory spikes. In
containerized environments, these spikes can result in OOM events.

Improving GC Performance
Applying gc.freeze marks existing objects as non-GC targets. As a result,
this greatly lowers the frequency of threshold-based GC runs and makes GC
much faster when it does occur. In a simple experiment, I observed GC time
dropping from roughly 1 second to about 1 microsecond (with GC forced via
gc.collect).
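
The "1 second to 1 microsecond" effect is easy to reproduce in isolation,
since gc.collect() never visits the permanent generation - a tiny
illustration (absolute numbers will of course vary by machine and object
count):

    # Time a full collection with many tracked containers, then again after
    # gc.freeze() has moved them all to the permanent generation.
    import gc
    import time

    objs = [{"i": i} for i in range(2_000_000)]

    t0 = time.perf_counter()
    gc.collect()
    print(f"collect with objects tracked: {time.perf_counter() - t0:.4f}s")

    gc.freeze()  # existing objects are no longer GC targets

    t0 = time.perf_counter()
    gc.collect()
    print(f"collect after gc.freeze():    {time.perf_counter() - t0:.4f}s")

    gc.unfreeze()  # undo, if the process is meant to keep running normally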

Eliminating GC-Related Issues in Child Processes
Similar to the issue in [2], GC triggered arbitrarily in child processes can
affect shared objects inherited from the parent. By ensuring that
parent-owned objects are not subject to GC in children, these issues can be
avoided entirely.



Beyond immediate performance and stability improvements, increased memory
stability also enables further optimizations. For example, preloading heavy
modules in the parent process can eliminate repeated memory loading in each
child process. This approach has been discussed previously in [3], and
preloading Airflow modules is already partially implemented today.

While [3] primarily focused on parsing time, the broader benefit is reduced
CPU and memory usage overall. Extending this idea beyond Airflow modules,
allowing users to pre-import libraries used in DAG files could provide
significant performance gains.

That said, it is also clear why this has not been broadly adopted so far.
Persistently importing problematic libraries defined in DAG files could
introduce side effects, and unloading modules once loaded is difficult. In
environments with frequent DAG changes, this can become a burden.

For this reason, I believe the optimal approach is to allow pre-importing
only for explicitly user-approved libraries. Users would define which
libraries to preload via configuration. These libraries would be loaded
lazily, and only after they are successfully loaded in a child process would
they be loaded in the parent process as well. The pre-import mechanism I
proposed recently in [4] may be helpful here.
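
To make the "user-approved preload list" idea a bit more tangible, here is a
minimal sketch of what the parent-side loading could look like - the
[dag_processor] preload_modules option name is entirely made up for the
illustration; only gc.freeze() and the Airflow conf API are real:

    # Hypothetical: preload only modules the user explicitly listed, then
    # freeze them so child processes never COW or collect those objects.
    import gc
    import importlib
    import logging

    from airflow.configuration import conf

    log = logging.getLogger(__name__)

    def preload_user_approved_modules() -> None:
        raw = conf.get("dag_processor", "preload_modules", fallback="")
        for name in filter(None, (m.strip() for m in raw.split(","))):
            try:
                importlib.import_module(name)
            except ImportError:
                log.warning("Could not preload %s; skipping", name)
        gc.freeze()  # keep the preloaded objects out of future collections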



In summary, I am proposing two items:

1. Apply gc.freeze to the DAG processor.

2. Then, allow user-aware and intentional preloading of libraries.

Thank you for taking the time to read this. If this proposal requires an AIP,
I would be happy to prepare one.

[1] https://github.com/apache/airflow/pull/58365
[2] https://github.com/apache/airflow/issues/56879
[3] https://github.com/apache/airflow/pull/30495
[4] https://github.com/apache/airflow/pull/58890

Best Regards, Jeongwoo Do






