This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new baec49a13bb Fix LocalExecutor memory spike by applying gc.freeze (#58365)
baec49a13bb is described below
commit baec49a13bb43ceb1725600abe11f3f076937136
Author: Jeongwoo Do <[email protected]>
AuthorDate: Tue Dec 2 19:42:33 2025 +0900
Fix LocalExecutor memory spike by applying gc.freeze (#58365)
* fix LocalExecutor memory issue caused by copy-on-write (COW)
* fix test
* fix test
* remove gc utils
* fix test to prevent timeout
* fix tests
* fix tests
* fix tests
---
.../src/airflow/executors/local_executor.py | 37 ++++++++++++++++++++--
.../tests/unit/executors/test_local_executor.py | 27 ++++++++++++++--
.../executors/test_local_executor_check_workers.py | 4 +--
3 files changed, 60 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py
index 83d5ccc9c5d..3bceb8b10ea 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -145,6 +145,7 @@ class LocalExecutor(BaseExecutor):
"""
is_local: bool = True
+ is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
serve_logs: bool = True
@@ -166,6 +167,11 @@ class LocalExecutor(BaseExecutor):
# (it looks like an int to python)
self._unread_messages = multiprocessing.Value(ctypes.c_uint)
+ if self.is_mp_using_fork:
+ # This creates the maximum number of worker processes
(parallelism) at once
+ # to minimize gc freeze/unfreeze cycles when using fork in
multiprocessing
+ self._spawn_workers_with_gc_freeze(self.parallelism)
+
def _check_workers(self):
# Reap any dead workers
to_remove = set()
@@ -189,9 +195,14 @@ class LocalExecutor(BaseExecutor):
# via `sync()` a few times before the spawned process actually starts
picking up messages. Try not to
# create too much
if num_outstanding and len(self.workers) < self.parallelism:
- # This only creates one worker, which is fine as we call this
directly after putting a message on
- # activity_queue in execute_async
- self._spawn_worker()
+ if self.is_mp_using_fork:
+ # This creates the maximum number of worker processes at once
+ # to minimize gc freeze/unfreeze cycles when using fork in
multiprocessing
+ self._spawn_workers_with_gc_freeze(self.parallelism -
len(self.workers))
+ else:
+ # This only creates one worker, which is fine as we call this
directly after putting a message on
+ # activity_queue in execute_async when using spawn in
multiprocessing
+ self._spawn_worker()
def _spawn_worker(self):
p = multiprocessing.Process(
@@ -208,6 +219,26 @@ class LocalExecutor(BaseExecutor):
assert p.pid # Since we've called start
self.workers[p.pid] = p
+ def _spawn_workers_with_gc_freeze(self, spawn_number):
+ """
+ Freeze the GC before forking worker process and unfreeze it after
forking.
+
+ This is done to prevent memory increase due to COW (Copy-on-Write) by
moving all
+ existing objects to the permanent generation before forking the
process. After forking,
+ unfreeze is called to ensure there is no impact on gc operations
+ in the original running process.
+
+ Ref: https://docs.python.org/3/library/gc.html#gc.freeze
+ """
+ import gc
+
+ gc.freeze()
+ try:
+ for _ in range(spawn_number):
+ self._spawn_worker()
+ finally:
+ gc.unfreeze()
+
def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
self._read_results()
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py
index b0261baf04e..5f0420cf980 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import gc
import multiprocessing
import os
from unittest import mock
@@ -44,6 +45,11 @@ skip_spawn_mp_start = pytest.mark.skipif(
class TestLocalExecutor:
+ """
+ When the executor is started, end() must be called before the test
finishes.
+ Otherwise, subprocesses will remain running, preventing the test from
terminating and causing a timeout.
+ """
+
TEST_SUCCESS_COMMANDS = 5
def test_sentry_integration(self):
@@ -55,6 +61,20 @@ class TestLocalExecutor:
def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs
+ @skip_spawn_mp_start
+ @mock.patch.object(gc, "unfreeze")
+ @mock.patch.object(gc, "freeze")
+ def test_executor_worker_spawned(self, mock_freeze, mock_unfreeze):
+ executor = LocalExecutor(parallelism=5)
+ executor.start()
+
+ mock_freeze.assert_called_once()
+ mock_unfreeze.assert_called_once()
+
+ assert len(executor.workers) == 5
+
+ executor.end()
+
@skip_spawn_mp_start
@mock.patch("airflow.sdk.execution_time.supervisor.supervise")
def test_execution(self, mock_supervise):
@@ -86,11 +106,12 @@ class TestLocalExecutor:
mock_supervise.side_effect = fake_supervise
executor = LocalExecutor(parallelism=2)
- executor.start()
-
- assert executor.result_queue.empty()
with spy_on(executor._spawn_worker) as spawn_worker:
+ executor.start()
+
+ assert executor.result_queue.empty()
+
for ti in success_tis:
executor.queue_workload(
workloads.ExecuteTask(
diff --git a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
index b0adfe5b9e5..557ff4bbcbf 100644
--- a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
+++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
@@ -101,7 +101,7 @@ def test_spawn_worker_when_needed(setup_executor):
executor.activity_queue.empty.return_value = False
executor.workers = {}
executor._check_workers()
- executor._spawn_worker.assert_called_once()
+ executor._spawn_worker.assert_called()
def test_no_spawn_if_parallelism_reached(setup_executor):
@@ -133,4 +133,4 @@ def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
executor.activity_queue.empty.return_value = False
executor._spawn_worker.reset_mock()
executor._check_workers()
- executor._spawn_worker.assert_called_once()
+ executor._spawn_worker.assert_called()