Ack, I think that makes sense. Let's get some of the work done and then
maybe we can try getting the data as well.

--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 6:47 pm Jarek Potiuk, <[email protected]> wrote:

> > Jarek do you think we should try to get some telemetry on the size/count
> of
> imports that are loaded, the amount of reuse/overlap we have, etc?
>
> Yes, but not necessarily telemetry, and not necessarily to start with it.
> I think initially we need something more like a good set of test dags with
> various imports that simulate various scenarios. This will be way easier
> to pull off - and it is what Jeongwoo already did when analysing the COW
> with the Local Executor. Then yes - we could gather some "telemetry"
> information (not automatically, but we could ask our users to do some
> experimenting on their dags - with the new ray feature, also courtesy of
> Jeongwoo). But gathering that data first will not let us observe the gains
> while we experiment with fixes. It could be useful later - to ask our
> "active" users to run some experiments when we enable any improvements we
> come up with (undoubtedly those should be opt-in features initially) - but
> I would say we first need a good understanding of what we can gain by
> trying it out locally.
>
> > Think one off dags importing something ginormous? And additionally
> it seems like a non trivial change so some data to validate our thoughts
> would put my mind at ease.
>
> I think more of the compound effect of importing the same thing in
> multiple processes. If you look at the Local Executor fix achieved by
> https://github.com/apache/airflow/pull/58365 - by avoiding the COWs, the
> per-executor-process memory savings are enormous: 114 M per process vs
> 39 M per process is HUGE. Basically it means that, in the tests done by
> Jeongwoo, if you have a Local Executor with 20 processes you save roughly
> 20 * 70 M => 1.4 GB of memory - simply because most of the memory remains
> shared between all those processes instead of being essentially copied.
> Not to mention the cost of all those memory-copy operations incurred by
> COW.
>
> This will be similar for the Celery Executor. Currently - because we allow
> the workers to run with (even high) concurrency - what happens in Celery
> workers is that those long-running, pre-forked processes each keep a copy
> of almost exactly the same memory resulting from importing some heavy
> libraries. Note that it's not just the **imported modules** as such - it
> is ALL the memory those modules allocate on the heap when they are
> initialized. Many, many Python modules allocate memory for something:
> classes they define, modules they import, data they pre-load, in-memory
> structures they build so that lookups are fast and avoid I/O. Essentially,
> a lot of that memory is read-only once initialized, but because of the way
> garbage collection works, and because such memory is usually fragmented,
> the garbage collector will - over time - cause all of that read-only
> memory to be copied in the forked processes (unless you do the
> gc.freeze/unfreeze dance which prevents that). In the Local Executor tests
> done by Jeongwoo, 70 M out of the 114 M used by a forked process was this
> kind of memory - shared, read-only memory that gets copied because of the
> garbage collector. That is over 60% saving per process. We now have 16 as
> the default concurrency in Celery, "prefork" is the default pool, and the
> forks are long-running - we do not set "max_tasks_per_child", nor any
> other recycling of Celery worker processes - so we can safely assume that
> in the vast majority of our installations all 16 workers hold their own
> copy of memory that could be shared, were it not for the gc-triggered COW.
> This is a HUGE saving if we can decrease the default memory usage of
> pretty much all installations out there. Even if only 50% of each worker's
> memory is this kind of shareable, read-only data, that half is currently
> held in 16 separate copies where one would do - the duplicated portion
> alone takes 16x the memory it needs. Not 20 or 30 percent too much -
> 16x (!).
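>
> To make the "dance" concrete, here is a minimal sketch (my illustration,
> not the code from the PR) of the pattern documented for gc.freeze() since
> Python 3.7:
>
>     import gc
>     import os
>
>     gc.disable()   # parent: stop gc from punching "holes" into memory pages
>     # ...do the heavy imports in the parent here (e.g. import pandas)...
>     gc.freeze()    # move everything allocated so far to the permanent generation
>
>     pid = os.fork()
>     if pid == 0:
>         gc.enable()  # child: collection is back on, but frozen parent objects
>                      # are never traversed, so their pages are not COW-ed
>         # ...parse the dag file / run the task here...
>         os._exit(0)
>     else:
>         os.waitpid(pid, 0)
>         gc.unfreeze()  # parent: make its objects collectable again if needed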
>
> So the memory gains should be **huge** for celery executor IMHO.
>
> The memory gains for the Dag File Processor will not be as big - there are
> no long-running processes and we usually have smaller concurrency (2 is
> the default, though in huge installations this goes up) - so there will
> only be spikes of memory, not sustained usage. There, the main gain is
> performance, and this one is actually quite important, because the same
> dags are continuously parsed and re-parsed: if the import-optimisation
> approach works as hypothesised, all the import savings get multiplied by
> the number of times dags are parsed. Even more - the same imports used in
> different dags will also be optimized away. Imagine you have 1000 dags
> parsed every 5 minutes (a safe assumption - normally you want parsing to
> happen more frequently) and you save just 1 s of imports per parse (not at
> all unlikely - a pandas import alone can easily take seconds). You then
> save 1000 CPU-seconds per 5-minute parsing cycle - more than 3 CPU-seconds
> for every wall-clock second. That's a lot of CPU time saved. Like... A
> LOT.
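>
> As a sanity check on that arithmetic (same assumptions as above, not
> measurements):
>
>     dags = 1000
>     parse_interval_s = 5 * 60
>     saved_per_parse_s = 1.0                       # e.g. a skipped pandas import
>     saved_per_cycle_s = dags * saved_per_parse_s  # 1000 CPU-seconds per cycle
>     print(saved_per_cycle_s / parse_interval_s)   # ~3.3 CPU-seconds saved
>                                                   # per wall-clock second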
>
> Those are the back-of-the-envelope calculations I can hypothesise about,
> but IMHO they are quite close to the real gains we can observe. So... if
> the question is "is it worth trying", my answer is: definitely yes. But we
> should see some real numbers and results of those experiments before we
> actually release it to the users. It could initially be opt-in, we could
> ask some friendly users to test it out in "reality" and bring back real
> numbers and stability feedback, and then we could decide to make it
> "opt-out".
>
> J.
>
>
>
> On Thu, Dec 18, 2025 at 1:25 PM Aritra Basu <[email protected]>
> wrote:
>
> > Jarek, do you think we should try to get some telemetry on the
> > size/count of imports that are loaded, the amount of reuse/overlap we
> > have, etc.? Because while this sounds good and might help, I fear that
> > in this aim to reduce our memory footprint we might be overloading (no
> > pun intended) the process? Think one-off dags importing something
> > ginormous? Additionally, it seems like a non-trivial change, so some
> > data to validate our thoughts would put my mind at ease.
> >
> > This is all based on the assumption that I understood the thread
> > correctly - that we are trying to preload imports and freeze them in
> > memory to reduce subsequent loads?
> >
> > --
> > Regards,
> > Aritra Basu
> >
> > On Thu, 18 Dec 2025, 5:24 pm Jarek Potiuk, <[email protected]> wrote:
> >
> > > I am very much pro exploring several options here - I think there are
> > > several approaches that should be tried. Increased memory usage in
> > > Airflow 3 is an indisputable fact, and I think we do not yet
> > > understand all the reasons for it. The COW case that I initially
> > > hypothesized about (after listening to a few core.py podcasts, where I
> > > understood better how memory management in Python interacts with
> > > forking) was very counter-intuitive. The fact that in our scenario
> > > garbage collection can **significantly increase** memory usage (even
> > > many times over, in your experiments) was absolutely not obvious. But
> > > your experiments, and the finding that as of Python 3.7 we even have a
> > > mechanism to handle it (gc.freeze), have shown clearly that this is
> > > it.
> > >
> > > We have seen this in a number of user reports, and I think we should
> > > make a deliberate effort to find those cases and optimize them. I also
> > > think some of our changes around isolation and separation of our code
> > > triggered some of the memory-usage growth as a side effect - because
> > > the memory behaviour of interpreted languages like Python is complex,
> > > especially when we use forking as the primary mechanism to implement
> > > isolation between spawned processes. Simply speaking (and for me this
> > > was really a discovery I made after listening to those "core.py"
> > > podcasts by core Python developers Łukasz Langa and Pablo Galindo -
> > > especially the chapters on reference counting, garbage collection and
> > > free threading, that last one for the future), it became clear that
> > > Python and forking do not play well together out of the box, and you
> > > need to make quite a deliberate effort to optimise memory usage when
> > > you rely heavily on forking.
> > >
> > > But I think the important part here is that we need to do it
> > > step-by-step, experiment with some approaches, and see the actual
> > > impact they might have - as with the Local Executor case.
> > >
> > > I think there are two aspects of it:
> > >
> > > * Optimizing "airflow-induced" imports
> > >
> > > This has been a long-standing issue that we were painfully aware of,
> > > and we've been discussing fixing it for years. In Airflow 2 I made at
> > > least 2 attempts - unsuccessful - to untangle some of the spaghetti
> > > code and chains of imports that we had; it was impossible to do
> > > without more drastic moves. With Airflow 3 we are on a good path to
> > > fix those. I explained in
> > > https://github.com/apache/airflow/pull/58890#issuecomment-3599373034
> > > that some of the memory-usage effects we observe now will essentially
> > > be **gone** once we complete the separation of "task sdk" from
> > > "airflow" - and I would say a lot of these experiments should be
> > > performed once we complete it, because it will dramatically change the
> > > internal import behaviour: the import chains we observe today will
> > > become much shorter, more predictable and less memory-consuming.
> > > Likely we will also be able to make better decisions on when and how
> > > to freeze gc for forking.
> > >
> > > So... I would simply hold our horses until we complete that part.
> > >
> > > * Optimizing "external imports"
> > >
> > > This is much more doable **now**. Personally, I do not even see a big
> > > issue with pre-importing external packages and doing the gc.freeze()
> > > dance in the Dag File Processor and Celery workers - or even reversing
> > > what you proposed: importing all the used packages "by default" unless
> > > the user opts out. I was thinking a lot about how we can do it, and I
> > > think I have a general "high-level" proposal.
> > >
> > > A few assumptions / observations:
> > >
> > > * when we import from an "installed distribution" (i.e. not from the
> > > dags folder), there is generally **no expectation** that you should be
> > > able to reload that module. A Python interpreter will by default
> > > import such packages only once, and we all know that attempting to
> > > reload such modules is generally futile - you basically need to
> > > restart the interpreter to load a new version of the module. Hot
> > > reloading is at most developer tooling (very useful, but not practical
> > > for production). So I am not too worried about pre-importing those
> > > external modules (as long as we know which modules we **should**
> > > import). I think KNOWING what to import before the fork is the most
> > > important problem to solve.
> > > * for the Dag Processor - those forks grow memory only temporarily, so
> > > the growth is at most a spike that does not last long. There I am less
> > > worried about memory and more about performance, and I would not worry
> > > too much about OOMs.
> > > * for Celery - where the "prefork" pool is used by default - it is
> > > quite a different story: there we can see high memory usage across
> > > multiple pre-forked processes, coming either from the COW effect or
> > > from the fact that we import the same huge modules in parallel in many
> > > processes.
> > >
> > > So I think (this is again a bit of hypothesising) that we should
> > > likely use a somewhat different strategy for DagProcessors than for
> > > Celery workers.
> > >
> > > My ideas:
> > >
> > > a) for DagProcessors, I think it would be perfectly fine if we just
> > > track what's being imported in a forked parser, send the list of
> > > modules imported from real installed "distributions" (easy to check)
> > > to the Dag Processor Manager, and let it import those in its own
> > > interpreter. Then we can do the freeze/unfreeze dance before every
> > > fork (i.e. before we attempt to parse another Dag file) - see the
> > > sketch after this list. This might have a set of nice properties:
> > > * we do not have to know what to import in advance - so we do not
> > > need to parse all the Dag files upfront
> > > * the first time a module is imported in a Dag, it will be reported
> > > back to the Dag Processor Manager, which will import it; if we then do
> > > the freeze/unfreeze dance, the next time parsing happens the module
> > > will already be in memory and not garbage-collectible - so we will not
> > > incur COW needlessly and the parsers will reuse that memory from the
> > > manager
> > > * the effect -> initially the DagProcessorManager memory will be
> > > smaller; each parse will spike as it loads the imports it needs and
> > > free them when it completes, but the DagProcessorManager will then
> > > import those modules itself, grow its own memory, and thereby decrease
> > > overall memory and speed up parsing of all subsequent Dags importing
> > > the same modules
> > > * this has a nice smoothing effect: while initially there will be some
> > > spikes, over time, as more and more Dags are parsed and modules
> > > imported, things smooth out and, for common imports, the memory will
> > > **only** be allocated in the DagProcessorManager and shared by the
> > > parsing processes - and performance will improve significantly
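> > >
> > > A rough sketch of mechanism a) - every name below is hypothetical,
> > > just to illustrate the flow, not an existing Airflow API:
> > >
> > >     import gc
> > >     import importlib
> > >     import sys
> > >     import sysconfig
> > >
> > >     SITE_PACKAGES = sysconfig.get_paths()["purelib"]
> > >
> > >     def distribution_modules() -> set[str]:
> > >         """Top-level modules currently imported from installed
> > >         distributions (site-packages), not from the dags folder."""
> > >         names = set()
> > >         for name, module in list(sys.modules.items()):
> > >             path = getattr(module, "__file__", None) or ""
> > >             if path.startswith(SITE_PACKAGES):
> > >                 names.add(name.split(".")[0])
> > >         return names
> > >
> > >     # Forked parser: after parsing a dag file, report
> > >     # distribution_modules() back to the Dag Processor Manager
> > >     # together with the parsing result.
> > >
> > >     # Dag Processor Manager: before forking the next parser.
> > >     def preimport_and_freeze(reported_modules: set[str]) -> None:
> > >         for name in sorted(reported_modules):
> > >             try:
> > >                 importlib.import_module(name)
> > >             except Exception:
> > >                 continue    # skip anything that fails to import here
> > >         gc.collect()        # clean up garbage first, then...
> > >         gc.freeze()         # ...freeze what remains, so child GCs never
> > >                             # write to those pages and COW is avoided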
> > >
> > > b) for Celery workers, I think it would make sense to do something
> > > similar but a bit more sophisticated. The problem is that those
> > > pre-forked processes might be (and by default are) long running - they
> > > are "pre"-forked - and we do not know (before we parse all the files)
> > > which modules actually need to be pre-imported before we fork. This is
> > > a tough one to solve in the generic case, because Celery workers
> > > generally do not parse "all" the Dag files (unlike Dag File
> > > Processors) - they should generally only parse the particular Dags
> > > they execute (which, for example in the case of queues, might be a
> > > small subset of Dags). In prefork mode, Celery spawns `concurrency`
> > > child processes
> > > https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency
> > > to run the tasks. Here I think we can use a similar strategy as for
> > > DagProcessors - i.e. track all imports and get them imported by the
> > > "parent" Celery worker so that the next fork can use them, and we
> > > could do the same freeze/unfreeze dance while forking (I think Celery
> > > has hooks to allow that) - but only under one condition: that we
> > > restart the forked processes from time to time. Which... is actually
> > > possible, for example with
> > > https://docs.celeryq.dev/en/latest/userguide/configuration.html#std-setting-worker_max_tasks_per_child
> > > - which restarts a process after it handles x tasks. So far we have
> > > not set that config by default (our docs explain that you can do it
> > > with custom configuration) - but if we do this "import gc dance", it
> > > is pretty much a prerequisite for making use of the modules we
> > > dynamically pre-import in the "parent" process - otherwise all the
> > > forks will just run forever and each of them will keep its own copy of
> > > the memory used by the imports in its process.
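> > >
> > > A rough sketch of the Celery pieces (the recycling value is purely
> > > illustrative, and the "pre-import in the parent before forking" hook
> > > is the part still to be figured out):
> > >
> > >     import gc
> > >     from celery import Celery
> > >     from celery.signals import worker_process_init
> > >
> > >     app = Celery("worker")
> > >     # recycle forks so that they pick up whatever the parent has
> > >     # pre-imported since the previous fork (value is illustrative only)
> > >     app.conf.worker_max_tasks_per_child = 100
> > >
> > >     @worker_process_init.connect
> > >     def _child_gc_setup(**kwargs):
> > >         # the parent is assumed to have done the heavy imports,
> > >         # gc.disable() and gc.freeze() before forking; the child just
> > >         # re-enables collection
> > >         gc.enable()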
> > >
> > > Both a) and b) have the nice property that, in case of any issues or
> > > newly installed versions of distributions, a restart of those
> > > processes starts the whole thing from scratch - the imports will be
> > > gradually loaded again and the forks will gradually make use of the
> > > pre-imports done by the "parent" process (until the next restart).
> > >
> > > My hypothesis is that if we do that - we can achieve  all three goals:
> > > decrease the memory usage overall, decrease memory spikes over time,
> and
> > > increase performance.
> > >
> > > But all that needs to be tested of course.
> > >
> > > J.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Dec 18, 2025 at 10:30 AM Aritra Basu <[email protected]
> >
> > > wrote:
> > >
> > > > This is quite an interesting read, Jeongwoo. I don't yet have a
> > > > strong opinion on this except that it's worth checking out. I'll
> > > > reread it a couple of times and hopefully come up with some
> > > > thoughts, but your investigation so far looks quite interesting.
> > > >
> > > > --
> > > > Regards,
> > > > Aritra Basu
> > > >
> > > > On Thu, 18 Dec 2025, 2:08 pm Jeongwoo Do, <[email protected]>
> > wrote:
> > > >
> > > > > Hello Airflow community,
> > > > >
> > > > > While working on resolving a memory leak issue in the
> > > LocalExecutor[1], I
> > > > > observed that garbage collection (GC) in forked subprocesses was
> > > > triggering
> > > > > Copy-On-Write (COW) on shared memory, which significantly increased
> > > each
> > > > > process’s PSS. By using gc.freeze to move objects created at
> > subprocess
> > > > > startup into the GC permanent generation, I was able to mitigate
> this
> > > > issue
> > > > > effectively.
> > > > >
> > > > >
> > > > >
> > > > > I would like to propose applying the same approach to the Dag
> > processor
> > > > to
> > > > > address GC-related performance issues and improve stability in
> > > > > subprocesses. Below are the expected benefits.
> > > > >
> > > > > Preventing COW on Shared Memory
> > > > > Unlike the LocalExecutor, where subprocesses are long-lived, Dag
> > > > processor
> > > > > subprocesses are not permanent. However, with the increasing
> adoption
> > > of
> > > > > dynamic Dags, parsing time has become longer in many cases. GC
> > activity
> > > > > during parsing can trigger COW on shared memory, leading to memory
> > > > spikes.
> > > > > In containerized environments, these spikes can result in OOM
> events.
> > > > >
> > > > > Improving GC Performance
> > > > > Applying gc.freeze marks existing objects as non-GC targets. As a
> > > result,
> > > > > this greatly lowers the frequency of threshold-based GC runs and
> > makes
> > > GC
> > > > > much faster when it does occur. In a simple experiment, I observed
> GC
> > > > time
> > > > > dropping from roughly 1 second to about 1 microsecond (with GC
> forced
> > > via
> > > > > gc.collect).
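> > > > >
> > > > > A minimal sketch of how such a measurement can be taken (not my
> > > > > exact benchmark, just the method):
> > > > >
> > > > >     import gc
> > > > >     import time
> > > > >
> > > > >     # allocate many container objects so the collector has work to do
> > > > >     data = [{"i": i} for i in range(2_000_000)]
> > > > >
> > > > >     def timed_collect() -> float:
> > > > >         start = time.perf_counter()
> > > > >         gc.collect()
> > > > >         return time.perf_counter() - start
> > > > >
> > > > >     print("before freeze:", timed_collect())
> > > > >     gc.freeze()   # move existing objects to the permanent generation
> > > > >     print("after freeze: ", timed_collect())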
> > > > >
> > > > > Eliminating GC-Related Issues in Child Processes
> > > > > Similar to the issue in [2], GC triggered arbitrarily in child
> > > processes
> > > > > can affect shared objects inherited from the parent. By ensuring
> that
> > > > > parent-owned objects are not subject to GC in children, these
> issues
> > > can
> > > > be
> > > > > avoided entirely.
> > > > >
> > > > >
> > > > >
> > > > > Beyond immediate performance and stability improvements, increased
> > > memory
> > > > > stability also enables further optimizations. For example,
> preloading
> > > > heavy
> > > > > modules in the parent process can eliminate repeated memory loading
> > in
> > > > each
> > > > > child process. This approach has been discussed previously in [3],
> > and
> > > > > preloading Airflow modules is already partially implemented today.
> > > > >
> > > > > While [3] primarily focused on parsing time, the broader benefit is
> > > > > reduced CPU and memory usage overall. Extending this idea beyond
> > > Airflow
> > > > > modules, allowing users to pre-import libraries used in DAG files
> > could
> > > > > provide significant performance gains.
> > > > >
> > > > > That said, it is also clear why this has not been broadly adopted
> so
> > > far.
> > > > > Persistently importing problematic libraries defined in DAG files
> > could
> > > > > introduce side effects, and unloading modules once loaded is
> > difficult.
> > > > In
> > > > > environments with frequent DAG changes, this can become a burden.
> > > > >
> > > > > For this reason, I believe the optimal approach is to allow
> > > pre-importing
> > > > > only for explicitly user-approved libraries. Users would define
> which
> > > > > libraries to preload via configuration. These libraries would be
> > loaded
> > > > > lazily, and only after they are successfully loaded in a child
> > process
> > > > > would they be loaded in the parent process as well. The pre-import
> > > > > mechanism I proposed recently in [4] may be helpful here.
> > > > >
> > > > >
> > > > >
> > > > > In summary, I am proposing two items:
> > > > >
> > > > > 1. Apply gc.freeze to the DAG processor.
> > > > >
> > > > > 2. Then, allow user-aware and intentional preloading of libraries.
> > > > >
> > > > > Thank you for taking the time to read this. If this proposal
> requires
> > > an
> > > > > AIP, I would be happy to prepare one.
> > > > >
> > > > > [1] https://github.com/apache/airflow/pull/58365
> > > > > [2] https://github.com/apache/airflow/issues/56879
> > > > > [3] https://github.com/apache/airflow/pull/30495
> > > > > [4] https://github.com/apache/airflow/pull/58890
> > > > >
> > > > > Best Regards, Jeongwoo Do
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
