This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0a617c Log memory usage in CgroupTaskRunner (#21481)
e0a617c is described below
commit e0a617cd9fb56089319961ce0889d0d236771594
Author: Ping Zhang <[email protected]>
AuthorDate: Thu Feb 10 03:51:41 2022 -0800
Log memory usage in CgroupTaskRunner (#21481)
---
airflow/task/task_runner/cgroup_task_runner.py | 25 +++++++++++++++++++++++--
1 file changed, 23 insertions(+), 2 deletions(-)
diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index 15075de..d6c6e53 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -146,11 +146,11 @@ class CgroupTaskRunner(BaseTaskRunner):
self._mem_mb_limit = resources.ram.qty
# Create the memory cgroup
- mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
+ self.mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
self._created_mem_cgroup = True
if self._mem_mb_limit > 0:
self.log.debug("Setting %s with %s MB of memory",
self.mem_cgroup_name, self._mem_mb_limit)
- mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit *
1024 * 1024
+ self.mem_cgroup_node.controller.limit_in_bytes =
self._mem_mb_limit * 1024 * 1024
# Create the CPU cgroup
cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
@@ -185,11 +185,32 @@ class CgroupTaskRunner(BaseTaskRunner):
if self.process and psutil.pid_exists(self.process.pid):
reap_process_group(self.process.pid, self.log)
+    def _log_memory_usage(self, mem_cgroup_node):
+        """Log the task's peak memory usage next to the cgroup memory limit.
+
+        Called from ``on_finish`` right before the memory cgroup is deleted.
+        The output is purely informational, so failures to read the usage
+        counters are logged as a warning and swallowed rather than raised:
+        an exception escaping from here would skip the cgroup cleanup that
+        ``on_finish`` performs afterwards.
+
+        :param mem_cgroup_node: the memory cgroup node created for this task;
+            its ``full_path`` and ``controller.limit_in_bytes`` are read.
+        """
+
+        def byte_to_gb(num_bytes, precision=2):
+            return round(num_bytes / (1024 * 1024 * 1024), precision)
+
+        try:
+            # NOTE: memory.max_usage_in_bytes is a cgroup v1 memory-controller
+            # file; it may be absent or unreadable on other setups.
+            with open(mem_cgroup_node.full_path + '/memory.max_usage_in_bytes') as f:
+                max_usage_in_bytes = int(f.read().strip())
+            # Read the limit once; it is used both for logging and the comparison.
+            limit_in_bytes = mem_cgroup_node.controller.limit_in_bytes
+        except (OSError, ValueError):
+            self.log.warning(
+                "Unable to read memory usage of cgroup %s", mem_cgroup_node.full_path, exc_info=True
+            )
+            return
+
+        used_gb = byte_to_gb(max_usage_in_bytes)
+        limit_gb = byte_to_gb(limit_in_bytes)
+
+        self.log.info(
+            "Memory max usage of the task is %s GB, while the memory limit is %s GB", used_gb, limit_gb
+        )
+
+        if max_usage_in_bytes >= limit_in_bytes:
+            self.log.info(
+                "This task has reached the memory limit allocated by Airflow worker. "
+                "If it failed, try to optimize the task or reserve more memory."
+            )
+
def on_finish(self):
# Let the OOM watcher thread know we're done to avoid false OOM alarms
self._finished_running = True
# Clean up the cgroups
if self._created_mem_cgroup:
+ self._log_memory_usage(self.mem_cgroup_node)
self._delete_cgroup(self.mem_cgroup_name)
if self._created_cpu_cgroup:
self._delete_cgroup(self.cpu_cgroup_name)