This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57eed58152 add "enable_tracemalloc" to log memory usage in scheduler (#42304)
57eed58152 is described below
commit 57eed581523c3af5b6e266935643ace81329c9a3
Author: Wei Lee <[email protected]>
AuthorDate: Tue Sep 24 04:20:46 2024 -0700
add "enable_tracemalloc" to log memory usage in scheduler (#42304)
---
airflow/config_templates/config.yml | 10 ++++++++++
airflow/executors/base_executor.py | 4 ++--
airflow/jobs/scheduler_job_runner.py | 26 ++++++++++++++++++++++++++
docs/spelling_wordlist.txt | 1 +
4 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 068b19df07..bc749ad6b7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2566,6 +2566,16 @@ scheduler:
example: ~
default: "True"
see_also: ":ref:`Differences between the two cron timetables`"
+ enable_tracemalloc:
+ description: |
+ Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow will start
+ tracing memory allocations and, upon receiving the signal SIGUSR1, log at the error level the
+ 10 source lines that account for the most currently allocated memory.
+ Tracing stays on for the whole lifetime of the scheduler and is expensive, so this should generally only be enabled for debugging purposes.
+ version_added: 3.0.0
+ type: boolean
+ example: ~
+ default: "False"
triggerer:
description: ~
options:
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index ad7690b3f6..87f496fb05 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -537,11 +537,11 @@ class BaseExecutor(LoggingMixin):
def end(self) -> None: # pragma: no cover
"""Wait synchronously for the previously submitted job to complete."""
- raise NotImplementedError()
+ raise NotImplementedError
def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
- raise NotImplementedError()
+ raise NotImplementedError
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) ->
list[str]: # pragma: no cover
"""
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 6b4a730358..eb0abbc296 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -181,6 +181,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.do_pickle = do_pickle
+ self._enable_tracemalloc = conf.getboolean("scheduler",
"enable_tracemalloc")
+ if self._enable_tracemalloc:
+ import tracemalloc
+
+ tracemalloc.start()
+
if log:
self._log = log
@@ -202,17 +208,51 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         signal.signal(signal.SIGTERM, self._exit_gracefully)
         signal.signal(signal.SIGUSR2, self._debug_dump)
 
+        if self._enable_tracemalloc:
+            signal.signal(signal.SIGUSR1, self._log_memory_usage)
+
     def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
         """Clean up processor_agent to avoid leaving orphan processes."""
         if not _is_parent_process():
             # Only the parent process should perform the cleanup.
             return
 
+        if self._enable_tracemalloc:
+            import tracemalloc
+
+            tracemalloc.stop()
+
         self.log.info("Exiting gracefully upon receiving signal %s", signum)
         if self.processor_agent:
             self.processor_agent.end()
         sys.exit(os.EX_OK)
 
+    def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
+        """
+        Log, at error level, the 10 source lines holding the most traced memory.
+
+        Installed as the SIGUSR1 handler when ``[scheduler] enable_tracemalloc`` is set.
+        """
+        if not _is_parent_process():
+            # Only the parent process should log the scheduler's memory usage.
+            return
+
+        import tracemalloc
+
+        if not tracemalloc.is_tracing():
+            # take_snapshot() raises RuntimeError once tracing has stopped (e.g. after
+            # _exit_gracefully); don't let that escape the handler and abort shutdown.
+            self.log.warning("tracemalloc is not tracing; cannot log scheduler memory usage")
+            return
+
+        top_n = 10
+        top_stats = tracemalloc.take_snapshot().statistics("lineno")
+        self.log.error(
+            "scheduler memory usage:\n Top %d\n\t%s",
+            top_n,
+            "\n\t".join(map(str, top_stats[:top_n])),
+        )
+
     def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
         if not _is_parent_process():
             # Only the parent process should perform the debug dump.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b80f2b0872..b834ccc9b2 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1684,6 +1684,7 @@ tooltip
tooltips
traceback
tracebacks
+tracemalloc
TrainingPipeline
travis
triage