This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9139b22944 Add metrics about task CPU and memory usage (#39650)
9139b22944 is described below
commit 9139b22944984e9e96e5277ad7d930e78c49e473
Author: Vincent <[email protected]>
AuthorDate: Wed May 22 13:27:19 2024 -0400
Add metrics about task CPU and memory usage (#39650)
---
 airflow/task/task_runner/standard_task_runner.py   | 25 +++++++++++++++
 .../logging-monitoring/metrics.rst                  |  2 ++
 .../task/task_runner/test_standard_task_runner.py   | 37 ++++++++++++++++++++--
 3 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 00252acf42..5ecf1ad64c 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -21,6 +21,8 @@ from __future__ import annotations
 
 import logging
 import os
+import threading
+import time
 from typing import TYPE_CHECKING
 
 import psutil
@@ -29,6 +31,7 @@ from setproctitle import setproctitle
 from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.models.taskinstance import TaskReturnCode
 from airflow.settings import CAN_FORK
+from airflow.stats import Stats
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils.process_utils import reap_process_group, set_new_process_group
@@ -53,6 +56,11 @@ class StandardTaskRunner(BaseTaskRunner):
         else:
             self.process = self._start_by_exec()
 
+        if self.process:
+            resource_monitor = threading.Thread(target=self._read_task_utilization)
+            resource_monitor.daemon = True
+            resource_monitor.start()
+
     def _start_by_exec(self) -> psutil.Process:
         subprocess = self.run_command()
         self.process = psutil.Process(subprocess.pid)
@@ -186,3 +194,20 @@ class StandardTaskRunner(BaseTaskRunner):
         if self.process is None:
             raise RuntimeError("Process is not started yet")
         return self.process.pid
+
+    def _read_task_utilization(self):
+        dag_id = self._task_instance.dag_id
+        task_id = self._task_instance.task_id
+
+        try:
+            while True:
+                with self.process.oneshot():
+                    mem_usage = self.process.memory_percent()
+                    cpu_usage = self.process.cpu_percent()
+
+                    Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage)
+                    Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage)
+                    time.sleep(5)
+        except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
+            self.log.info("Process not found (most likely exited), stop collecting metrics")
+            return
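[For reference, the sampling loop added above can be exercised standalone. A
minimal sketch, assuming plain psutil with print() standing in for
Stats.gauge; monitor() is a hypothetical helper, not part of this commit:

    import threading
    import time

    import psutil

    def monitor(pid: int, interval: float = 5.0) -> None:
        proc = psutil.Process(pid)
        try:
            while True:
                # oneshot() caches process info so the two reads below hit
                # the kernel once instead of twice.
                with proc.oneshot():
                    mem = proc.memory_percent()
                    cpu = proc.cpu_percent()  # first call always reports 0.0
                print(f"mem={mem:.1f}% cpu={cpu:.1f}%")
                time.sleep(interval)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Target exited or became inaccessible; stop sampling,
            # as the runner does.
            return

    # A daemon thread, as in start() above, never blocks interpreter shutdown.
    threading.Thread(target=monitor, args=(psutil.Process().pid,), daemon=True).start()
    time.sleep(6)  # keep the main thread alive long enough for one sample
]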
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
index efe565094a..f95c3a981c 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst
@@ -242,6 +242,8 @@ Name                                            Description
 ``pool.scheduled_tasks``                        Number of scheduled tasks in the pool. Metric with pool_name tagging.
 ``pool.starving_tasks.<pool_name>``             Number of starving tasks in the pool
 ``pool.starving_tasks``                         Number of starving tasks in the pool. Metric with pool_name tagging.
+``task.cpu_usage.<dag_id>.<task_id>``           Percentage of CPU used by a task
+``task.mem_usage.<dag_id>.<task_id>``           Percentage of memory used by a task
 ``triggers.running.<hostname>``                 Number of triggers currently running for a triggerer (described by hostname)
 ``triggers.running``                            Number of triggers currently running for a triggerer (described by hostname). Metric with hostname tagging.
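[The two new rows document gauges that only leave the process when a metrics
backend is configured, e.g. statsd_on = True under [metrics] in airflow.cfg.
A minimal sketch of what Stats.gauge then amounts to, using the statsd client
package Airflow builds on; the host, port, prefix, and the mydag/mytask names
are illustrative:

    from statsd import StatsClient

    # Airflow's StatsD integration wraps a client like this; the prefix is
    # prepended to every metric name it sends.
    client = StatsClient(host="localhost", port=8125, prefix="airflow")

    # Mirrors the gauges emitted by _read_task_utilization for a sample task:
    client.gauge("task.cpu_usage.mydag.mytask", 12.5)  # percent CPU
    client.gauge("task.mem_usage.mydag.mytask", 3.4)   # percent memory

On the wire the metric therefore appears as, e.g.,
airflow.task.cpu_usage.mydag.mytask.]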
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index ab9b882e1f..381aefc7c1 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -28,6 +28,7 @@ from unittest.mock import patch
 import psutil
 import pytest
 
+from airflow.exceptions import AirflowTaskTimeout
 from airflow.jobs.job import Job
 from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.listeners.listener import get_listener_manager
@@ -96,8 +97,9 @@ class TestStandardTaskRunner:
         yield
         get_listener_manager().clear()
 
+    @mock.patch.object(StandardTaskRunner, "_read_task_utilization")
     @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file")
-    def test_start_and_terminate(self, mock_init):
+    def test_start_and_terminate(self, mock_init, mock_read_task_utilization):
         mock_init.return_value = "/tmp/any"
         Job = mock.Mock()
         Job.job_type = None
@@ -131,6 +133,7 @@ class TestStandardTaskRunner:
             assert not psutil.pid_exists(process.pid), f"{process} is still alive"
 
         assert task_runner.return_code() is not None
+        mock_read_task_utilization.assert_called()
 
     @pytest.mark.db_test
     def test_notifies_about_start_and_stop(self, tmp_path):
@@ -260,8 +263,9 @@ class TestStandardTaskRunner:
             assert f.readline() == "on_task_instance_success\n"
             assert f.readline() == "listener\n"
 
+    @mock.patch.object(StandardTaskRunner, "_read_task_utilization")
     @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file")
-    def test_start_and_terminate_run_as_user(self, mock_init):
+    def test_start_and_terminate_run_as_user(self, mock_init, mock_read_task_utilization):
         mock_init.return_value = "/tmp/any"
         Job = mock.Mock()
         Job.job_type = None
@@ -296,6 +300,7 @@ class TestStandardTaskRunner:
             assert not psutil.pid_exists(process.pid), f"{process} is still alive"
 
         assert task_runner.return_code() is not None
+        mock_read_task_utilization.assert_called()
 
     @propagate_task_logger()
     @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file")
@@ -444,6 +449,34 @@ class TestStandardTaskRunner:
             "_AIRFLOW_PARSING_CONTEXT_TASK_ID=task1\n"
         )
 
+    @mock.patch("airflow.task.task_runner.standard_task_runner.Stats.gauge")
+    @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file")
+    def test_read_task_utilization(self, mock_init, mock_stats):
+        mock_init.return_value = "/tmp/any"
+        Job = mock.Mock()
+        Job.job_type = None
+        Job.task_instance = mock.MagicMock()
+        Job.task_instance.task_id = "task_id"
+        Job.task_instance.dag_id = "dag_id"
+        Job.task_instance.run_as_user = None
+        Job.task_instance.command_as_list.return_value = [
+            "airflow",
+            "tasks",
+            "run",
+            "test_on_kill",
+            "task1",
+            "2016-01-01",
+        ]
+        job_runner = LocalTaskJobRunner(job=Job, task_instance=Job.task_instance)
+        task_runner = StandardTaskRunner(job_runner)
+        task_runner.start()
+        try:
+            with timeout(1):
+                task_runner._read_task_utilization()
+        except AirflowTaskTimeout:
+            pass
+        assert mock_stats.call_count == 2
+
     @staticmethod
     def _procs_in_pgroup(pgid):
         for proc in psutil.process_iter(attrs=["pid", "name"]):
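[The new test bounds _read_task_utilization's otherwise endless loop with
Airflow's timeout context manager. A minimal sketch of that mechanism in
isolation, where the bare while loop stands in for the sampling loop and the
one-second limit is illustrative:

    from airflow.exceptions import AirflowTaskTimeout
    from airflow.utils.timeout import timeout

    try:
        # timeout() arms a SIGALRM and raises AirflowTaskTimeout when it
        # fires, so even an infinite loop returns control to the test.
        with timeout(1):
            while True:
                pass  # stands in for the 5-second polling loop
    except AirflowTaskTimeout:
        print("loop interrupted after ~1s, as the test expects")

Since this relies on signals, it must run in the main thread, which is why
the test calls _read_task_utilization() directly rather than through the
daemon thread started by start().]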