This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new baec49a13bb Fix LocalExecutor memory spike by applying gc.freeze (#58365)
baec49a13bb is described below

commit baec49a13bb43ceb1725600abe11f3f076937136
Author: Jeongwoo Do <[email protected]>
AuthorDate: Tue Dec 2 19:42:33 2025 +0900

    Fix LocalExecutor memory spike by applying gc.freeze (#58365)
    
    * fix local executor issue caused by cow
    
    * fix test
    
    * fix test
    
    * remove gc utils
    
    * fix test to prevent timeout
    
    * fix tests
    
    * fix tests
    
    * fix tests
---
 .../src/airflow/executors/local_executor.py        | 37 ++++++++++++++++++++--
 .../tests/unit/executors/test_local_executor.py    | 27 ++++++++++++++--
 .../executors/test_local_executor_check_workers.py |  4 +--
 3 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py
index 83d5ccc9c5d..3bceb8b10ea 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -145,6 +145,7 @@ class LocalExecutor(BaseExecutor):
     """
 
     is_local: bool = True
+    is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
 
     serve_logs: bool = True
 
@@ -166,6 +167,11 @@ class LocalExecutor(BaseExecutor):
         # (it looks like an int to python)
         self._unread_messages = multiprocessing.Value(ctypes.c_uint)
 
+        if self.is_mp_using_fork:
+            # This creates the maximum number of worker processes (parallelism) at once
+            # to minimize gc freeze/unfreeze cycles when using fork in multiprocessing
+            self._spawn_workers_with_gc_freeze(self.parallelism)
+
     def _check_workers(self):
         # Reap any dead workers
         to_remove = set()
@@ -189,9 +195,14 @@ class LocalExecutor(BaseExecutor):
         # via `sync()` a few times before the spawned process actually starts picking up messages. Try not to
         # create too much
         if num_outstanding and len(self.workers) < self.parallelism:
-            # This only creates one worker, which is fine as we call this directly after putting a message on
-            # activity_queue in execute_async
-            self._spawn_worker()
+            if self.is_mp_using_fork:
+                # This creates the maximum number of worker processes at once
+                # to minimize gc freeze/unfreeze cycles when using fork in multiprocessing
+                self._spawn_workers_with_gc_freeze(self.parallelism - len(self.workers))
+            else:
+                # This only creates one worker, which is fine as we call this directly after putting a message on
+                # activity_queue in execute_async when using spawn in multiprocessing
+                self._spawn_worker()
 
     def _spawn_worker(self):
         p = multiprocessing.Process(
@@ -208,6 +219,26 @@ class LocalExecutor(BaseExecutor):
             assert p.pid  # Since we've called start
         self.workers[p.pid] = p
 
+    def _spawn_workers_with_gc_freeze(self, spawn_number):
+        """
+        Freeze the GC before forking worker process and unfreeze it after forking.
+
+        This is done to prevent memory increase due to COW (Copy-on-Write) by moving all
+        existing objects to the permanent generation before forking the process. After forking,
+        unfreeze is called to ensure there is no impact on gc operations
+        in the original running process.
+
+        Ref: https://docs.python.org/3/library/gc.html#gc.freeze
+        """
+        import gc
+
+        gc.freeze()
+        try:
+            for _ in range(spawn_number):
+                self._spawn_worker()
+        finally:
+            gc.unfreeze()
+
     def sync(self) -> None:
         """Sync will get called periodically by the heartbeat method."""
         self._read_results()
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py
index b0261baf04e..5f0420cf980 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import gc
 import multiprocessing
 import os
 from unittest import mock
@@ -44,6 +45,11 @@ skip_spawn_mp_start = pytest.mark.skipif(
 
 
 class TestLocalExecutor:
+    """
+    When the executor is started, end() must be called before the test finishes.
+    Otherwise, subprocesses will remain running, preventing the test from terminating and causing a timeout.
+    """
+
     TEST_SUCCESS_COMMANDS = 5
 
     def test_sentry_integration(self):
@@ -55,6 +61,20 @@ class TestLocalExecutor:
     def test_serve_logs_default_value(self):
         assert LocalExecutor.serve_logs
 
+    @skip_spawn_mp_start
+    @mock.patch.object(gc, "unfreeze")
+    @mock.patch.object(gc, "freeze")
+    def test_executor_worker_spawned(self, mock_freeze, mock_unfreeze):
+        executor = LocalExecutor(parallelism=5)
+        executor.start()
+
+        mock_freeze.assert_called_once()
+        mock_unfreeze.assert_called_once()
+
+        assert len(executor.workers) == 5
+
+        executor.end()
+
     @skip_spawn_mp_start
     @mock.patch("airflow.sdk.execution_time.supervisor.supervise")
     def test_execution(self, mock_supervise):
@@ -86,11 +106,12 @@ class TestLocalExecutor:
         mock_supervise.side_effect = fake_supervise
 
         executor = LocalExecutor(parallelism=2)
-        executor.start()
-
-        assert executor.result_queue.empty()
 
         with spy_on(executor._spawn_worker) as spawn_worker:
+            executor.start()
+
+            assert executor.result_queue.empty()
+
             for ti in success_tis:
                 executor.queue_workload(
                     workloads.ExecuteTask(
diff --git a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
index b0adfe5b9e5..557ff4bbcbf 100644
--- a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
+++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
@@ -101,7 +101,7 @@ def test_spawn_worker_when_needed(setup_executor):
     executor.activity_queue.empty.return_value = False
     executor.workers = {}
     executor._check_workers()
-    executor._spawn_worker.assert_called_once()
+    executor._spawn_worker.assert_called()
 
 
 def test_no_spawn_if_parallelism_reached(setup_executor):
@@ -133,4 +133,4 @@ def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
     executor.activity_queue.empty.return_value = False
     executor._spawn_worker.reset_mock()
     executor._check_workers()
-    executor._spawn_worker.assert_called_once()
+    executor._spawn_worker.assert_called()

Reply via email to