This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0a617c  Log memory usage in CgroupTaskRunner (#21481)
e0a617c is described below

commit e0a617cd9fb56089319961ce0889d0d236771594
Author: Ping Zhang <[email protected]>
AuthorDate: Thu Feb 10 03:51:41 2022 -0800

    Log memory usage in CgroupTaskRunner (#21481)
---
 airflow/task/task_runner/cgroup_task_runner.py | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index 15075de..d6c6e53 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -146,11 +146,11 @@ class CgroupTaskRunner(BaseTaskRunner):
         self._mem_mb_limit = resources.ram.qty
 
         # Create the memory cgroup
-        mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
+        self.mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
         self._created_mem_cgroup = True
         if self._mem_mb_limit > 0:
             self.log.debug("Setting %s with %s MB of memory", self.mem_cgroup_name, self._mem_mb_limit)
-            mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit * 1024 * 1024
+            self.mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit * 1024 * 1024
 
         # Create the CPU cgroup
         cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
@@ -185,11 +185,32 @@ class CgroupTaskRunner(BaseTaskRunner):
         if self.process and psutil.pid_exists(self.process.pid):
             reap_process_group(self.process.pid, self.log)
 
+    def _log_memory_usage(self, mem_cgroup_node):
+        def byte_to_gb(num_bytes, precision=2):
+            return round(num_bytes / (1024 * 1024 * 1024), precision)
+
+        with open(mem_cgroup_node.full_path + '/memory.max_usage_in_bytes') as f:
+            max_usage_in_bytes = int(f.read().strip())
+
+        used_gb = byte_to_gb(max_usage_in_bytes)
+        limit_gb = byte_to_gb(mem_cgroup_node.controller.limit_in_bytes)
+
+        self.log.info(
+            "Memory max usage of the task is %s GB, while the memory limit is %s GB", used_gb, limit_gb
+        )
+
+        if max_usage_in_bytes >= mem_cgroup_node.controller.limit_in_bytes:
+            self.log.info(
+                "This task has reached the memory limit allocated by Airflow worker. "
+                "If it failed, try to optimize the task or reserve more memory."
+            )
+
     def on_finish(self):
         # Let the OOM watcher thread know we're done to avoid false OOM alarms
         self._finished_running = True
         # Clean up the cgroups
         if self._created_mem_cgroup:
+            self._log_memory_usage(self.mem_cgroup_node)
             self._delete_cgroup(self.mem_cgroup_name)
         if self._created_cpu_cgroup:
             self._delete_cgroup(self.cpu_cgroup_name)

Reply via email to