Jarek, do you think we should try to get some telemetry on the size/count of imports that are loaded, the amount of reuse/overlap we have, etc.? Because while this sounds good and might help, I fear that in this aim to reduce our memory footprint we might end up overloading (no pun intended) the process. Think of one-off Dags importing something ginormous. Additionally, it seems like a non-trivial change, so some data to validate our thoughts would put my mind at ease.
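
Even something throwaway would be a start - e.g. a snippet along these lines run at the end of a parse (plain sys.modules plus /proc, nothing Airflow-specific - just to illustrate the kind of numbers I mean):

    import sys, sysconfig

    # which loaded modules come from installed distributions vs. everything else,
    # and how big this parsing process ended up being
    site_packages = sysconfig.get_paths()["purelib"]
    installed = [name for name, mod in list(sys.modules.items())
                 if (getattr(mod, "__file__", None) or "").startswith(site_packages)]
    print(f"{len(installed)} modules from installed distributions, "
          f"{len(sys.modules) - len(installed)} others")

    # resident memory of this process (Linux only)
    with open("/proc/self/status") as f:
        print(next(line.strip() for line in f if line.startswith("VmRSS")))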

This is all based on the assumption that what I understood from the thread is correct - that we are trying to preload imports and freeze them in memory to reduce subsequent loads?

--
Regards,
Aritra Basu

On Thu, 18 Dec 2025, 5:24 pm Jarek Potiuk, <[email protected]> wrote:

> I am very much pro exploring several options there - and I think there are several approaches that should be tried here. Increased memory usage in Airflow 3 is a fact that is indisputable. I think we do not yet understand all the reasons for it. The COW case that I initially hypothesized about (after listening to a few "core.py" podcast episodes, where I understood better how memory management in Python plays with forking) was very counter-intuitive. The fact that in our scenario garbage collecting can **significantly increase** memory usage (even many times over, in your experiments) was absolutely not obvious. But your experiments, and the finding that as of Python 3.7 we even have a mechanism to handle it (gc.freeze), have shown clearly that this is it.
>
> We have seen this in a number of user reports, and I think we should make a deliberate effort to find those cases and optimize them. I also think some of our changes around isolation and separation of our code have triggered some of the memory usage growth as side effects - because the impact on memory usage in interpreted languages like Python is complex, especially when we are using forking as the primary mechanism to implement isolation between spawned processes. Simply speaking (and for me this was really a discovery I made after listening to the "core.py" podcast by core Python developers Łukasz Langa and Pablo Galindo - especially all the chapters related to reference counting, garbage collection and free threading - that one is for the future), it became clear that Python and forking do not play very well together out of the box, and you need to make quite a deliberate effort to optimise memory usage when you heavily use forking.
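>
> For reference, the "dance" is essentially the pattern described in the gc module docs - a minimal sketch only, the real code has to live wherever we actually fork:
>
>     import gc, os
>
>     gc.disable()   # avoid collections in the parent punching holes in memory pages
>     # ... do the imports / build the objects we want to share with children ...
>     gc.freeze()    # move everything allocated so far into the permanent generation
>
>     pid = os.fork()
>     if pid == 0:
>         gc.enable()   # child: collections run again, but they skip the frozen
>                       # parent objects, so GC bookkeeping does not dirty shared pages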
>
> But I think the important part here is that we need to do it step by step, experiment with some approaches, and see the actual impact it might have - like with the LocalExecutor case.
>
> I think there are two aspects to it:
>
> * Optimizing "airflow-induced" imports
>
> This has been a long-standing issue that we were painfully aware of, and we've been discussing fixing it for years. In Airflow 2 I made at least two attempts - unsuccessful - to untangle some of the spaghetti code and chains of imports that we had - it was impossible to do without more drastic moves. With Airflow 3 we are on a good path to fix those. As I explained in https://github.com/apache/airflow/pull/58890#issuecomment-3599373034 - some of the memory usage effects we observe now will be essentially **gone** once we complete the separation of "task sdk" from "airflow" - and I would say a lot of these experiments should be performed once we complete it, because it will dramatically change the internal import behaviour, and the import chains we observe today will become much shorter, more predictable and less memory-consuming. Likely we can also make better decisions then on when and how to freeze gc for forking.
>
> So... I would simply hold our horses until we complete that part.
>
> * Optimizing "external imports"
>
> This is much more doable **now**. Personally I do not even see a big issue with pre-importing external packages and doing the gc.freeze() dance in the Dag File Processor and Celery workers - and even reversing what you proposed: importing all the used packages "by default" unless the user opts out. I was thinking a lot about how we can do it, and I think I have a general "high level" proposal.
>
> A few assumptions / observations:
>
> * when we are importing from an "installed distribution" (i.e. not from the dags folder), generally there is **no expectation** that you should be able to reload that module. Running a Python interpreter will by default import such packages only once, and we all know that attempting to reload such modules is in general futile - you basically need to restart the interpreter to load a new version of such a module. Hot reloading is at most developer tooling (very useful, but not practical for production). So I am not too worried about pre-importing those external modules (as long as we know which modules we **should** import). I think KNOWING what to import before the fork is the most important problem to solve.
> * for the Dag Processor - those forks are where memory grows only temporarily, so the growth is at most a spike that does not last long - so I am less worried about memory and more about performance. And there, I would not worry too much about OOMs.
> * for Celery - where "prefork" mode is used by default - it is quite a different story: there we can see high memory usage across multiple pre-forked processes, coming either from the COW effect or from the fact that we are importing the same huge imports in parallel in many processes.
>
> So I think (this is again a bit of hypothesising) that we should likely use a somewhat different strategy for DagProcessors than for Celery workers.
>
> My ideas:
>
> a) for DagProcessors, I think it would be perfectly fine if we just track what's being imported in a forked parser, send the list of modules imported from real installed "distributions" (easy to check) to the Dag Processor Manager, and let it import those in its interpreter. Then we can do the freeze/unfreeze dance before every fork (i.e. before we attempt to parse another Dag file). A rough sketch of what that could look like is below the list of properties. This might have a set of nice properties:
> * we do not have to know what to import in advance - so we do not need to parse all the Dag files upfront
> * the first time a module is imported in a Dag, it will be reported back to the Dag Processor Manager, which will import it, and if we do the freeze/unfreeze dance - the next time parsing happens, the import will already be in memory and not garbage-collectible - so we will not incur COW needlessly and the DagFileProcessor will reuse all that memory
> * the effect of it -> initially DagProcessorManager memory will be smaller, each parse will get a spike of memory loaded for the imports it needs, then when it completes the memory is freed, but the DagProcessorManager will import the thing, grow its memory, and this will decrease memory usage and speed up parsing of all subsequent Dags importing the same modules
> * which has a nice smoothing effect: while initially there will be some spikes, over time, as more and more Dags are parsed and modules imported, this will smooth out, and for common imports the memory used will **only** be allocated in the DagProcessorManager - the memory will be shared by the parsing processes - and performance will improve significantly
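>
> Very roughly, the two halves of it could look like this (names are made up, this is not existing code - just to illustrate the mechanism):
>
>     import gc, importlib, sys, sysconfig
>
>     SITE_PACKAGES = sysconfig.get_paths()["purelib"]
>
>     def modules_from_distributions() -> set[str]:
>         # in the forked parser, after parsing a file: which loaded modules
>         # came from installed distributions (and not from the dags folder)?
>         return {
>             name for name, mod in list(sys.modules.items())
>             if (getattr(mod, "__file__", "") or "").startswith(SITE_PACKAGES)
>         }
>
>     def preimport_and_freeze(reported: set[str]) -> None:
>         # in the DagProcessorManager, before the next fork: import whatever the
>         # children reported, then freeze, so the next children inherit it all
>         for name in reported:
>             try:
>                 importlib.import_module(name)
>             except Exception:
>                 pass  # whatever a Dag imports may not be importable here - skip it
>         gc.freeze()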
>
> b) for Celery workers, I think it would make sense to do something a bit more sophisticated, but similar. The problem is that those pre-forked processes might be (and by default are) long-running - they are "pre"-forked. And there, we do not know (before we parse all the files) which modules actually need to be pre-imported before we fork them. This is a tough one to solve in the generic case, because Celery workers generally do not parse "all" the Dag files (unlike Dag File Processors) - they generally should only parse the particular Dags they execute (which, for example in the case of queues, might be a small subset of the Dags). In pre-fork mode, Celery will spawn `concurrency` child processes (https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency) to run the tasks. Here I think we can use a similar strategy as in the case of DagProcessors - i.e. track all imports and get them imported by the "parent" Celery worker, so that the next fork uses them, and we could do the same freeze/unfreeze dance while forking (I think Celery has hooks to allow that) - but only under one condition: that we restart the forked processes from time to time. Which.... is actually possible, for example with https://docs.celeryq.dev/en/latest/userguide/configuration.html#std-setting-worker_max_tasks_per_child - which will restart the process after it has processed x tasks. So far we have not set that config as a default (in our docs we explain that you can do it with custom configuration) - but if we do this "import gc dance", it is pretty much a prerequisite for making use of the pre-imported modules that we dynamically import in the "parent" process - otherwise all the forks will just run forever, and each of them will keep its own copy of the memory used by the imports it did itself.
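>
> Concretely, the two knobs I have in mind (a sketch, not a worked-out design - the signal handler is hypothetical):
>
>     # celery configuration - the setting exists today, the value is just an example
>     worker_max_tasks_per_child = 100   # recycle forked workers so that new forks
>                                        # pick up whatever the parent has pre-imported
>
>     # plus something hooked into Celery's pool signals, e.g.:
>     from celery.signals import worker_process_init
>
>     @worker_process_init.connect
>     def reenable_gc_in_child(**kwargs):
>         import gc
>         gc.enable()   # if the parent disabled gc and froze before forking,
>                       # the child resumes collecting its own new objects only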
>
> Both a) and b) have the nice property that in case of any issues, or when new versions of distributions are installed, a restart will start the whole thing from the beginning - the imports will be gradually loaded again and the forks will gradually make use of the pre-imports done by the "parent" process (until the next restart).
>
> My hypothesis is that if we do that, we can achieve all three goals: decrease memory usage overall, decrease memory spikes over time, and increase performance.
>
> But all that needs to be tested, of course.
>
> J.
>
> On Thu, Dec 18, 2025 at 10:30 AM Aritra Basu <[email protected]> wrote:
>
> > This is quite an interesting read, Jeongwoo. I don't yet have a strong opinion on this, except that it's worth checking out. I'll reread it a couple of times and hopefully come up with some thoughts, but your investigation so far looks quite interesting.
> >
> > --
> > Regards,
> > Aritra Basu
> >
> > On Thu, 18 Dec 2025, 2:08 pm Jeongwoo Do, <[email protected]> wrote:
> >
> > > Hello Airflow community,
> > >
> > > While working on resolving a memory leak issue in the LocalExecutor [1], I observed that garbage collection (GC) in forked subprocesses was triggering Copy-On-Write (COW) on shared memory, which significantly increased each process's PSS. By using gc.freeze to move objects created at subprocess startup into the GC permanent generation, I was able to mitigate this issue effectively.
> > >
> > > I would like to propose applying the same approach to the Dag processor to address GC-related performance issues and improve stability in subprocesses. Below are the expected benefits.
> > >
> > > Preventing COW on Shared Memory
> > > Unlike the LocalExecutor, where subprocesses are long-lived, Dag processor subprocesses are not permanent. However, with the increasing adoption of dynamic Dags, parsing time has become longer in many cases. GC activity during parsing can trigger COW on shared memory, leading to memory spikes. In containerized environments, these spikes can result in OOM events.
> > >
> > > Improving GC Performance
> > > Applying gc.freeze marks existing objects as non-GC targets. As a result, it greatly lowers the frequency of threshold-based GC runs and makes GC much faster when it does occur. In a simple experiment, I observed GC time dropping from roughly 1 second to about 1 microsecond (with GC forced via gc.collect).
> > >
> > > Eliminating GC-Related Issues in Child Processes
> > > Similar to the issue in [2], GC triggered arbitrarily in child processes can affect shared objects inherited from the parent. By ensuring that parent-owned objects are not subject to GC in children, these issues can be avoided entirely.
> > >
> > > Beyond immediate performance and stability improvements, increased memory stability also enables further optimizations. For example, preloading heavy modules in the parent process can eliminate repeated memory loading in each child process. This approach has been discussed previously in [3], and preloading Airflow modules is already partially implemented today.
> > >
> > > While [3] primarily focused on parsing time, the broader benefit is reduced CPU and memory usage overall. Extending this idea beyond Airflow modules and allowing users to pre-import libraries used in DAG files could provide significant performance gains.
> > >
> > > That said, it is also clear why this has not been broadly adopted so far. Persistently importing problematic libraries defined in DAG files could introduce side effects, and unloading modules once loaded is difficult. In environments with frequent DAG changes, this can become a burden.
> > >
> > > For this reason, I believe the optimal approach is to allow pre-importing only explicitly user-approved libraries. Users would define which libraries to preload via configuration. These libraries would be loaded lazily, and only after they are successfully loaded in a child process would they be loaded in the parent process as well. The pre-import mechanism I proposed recently in [4] may be helpful here; a rough sketch of the parent-side part follows.
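> > >
> > > For illustration only - the config option name and helper below are made up, not an existing Airflow setting:
> > >
> > >     import importlib
> > >
> > >     # e.g. [dag_processor] preload_modules = pandas,numpy,pyarrow  (hypothetical option)
> > >     def preload_approved_modules(names):
> > >         loaded = []
> > >         for name in names:   # only libraries the user explicitly approved,
> > >                              # ideally only after a child loaded them successfully
> > >             try:
> > >                 importlib.import_module(name)
> > >                 loaded.append(name)
> > >             except ImportError:
> > >                 continue     # never let a preload failure break the parent
> > >         return loaded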
> > >
> > > In summary, I am proposing two items:
> > >
> > > 1. Apply gc.freeze to the DAG processor.
> > > 2. Then, allow user-aware and intentional preloading of libraries.
> > >
> > > Thank you for taking the time to read this. If this proposal requires an AIP, I would be happy to prepare one.
> > >
> > > [1] https://github.com/apache/airflow/pull/58365
> > > [2] https://github.com/apache/airflow/issues/56879
> > > [3] https://github.com/apache/airflow/pull/30495
> > > [4] https://github.com/apache/airflow/pull/58890
> > >
> > > Best Regards,
> > > Jeongwoo Do