This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0eb019571c Improve code coverage for CgroupTaskRunner (#39896)
0eb019571c is described below
commit 0eb019571c98f9bdd13691e105ca667d7e65a460
Author: Yang-Yule <[email protected]>
AuthorDate: Wed May 29 16:54:11 2024 +0800
Improve code coverage for CgroupTaskRunner (#39896)
---
tests/task/task_runner/test_cgroup_task_runner.py | 57 ++++++++++++++++++++---
1 file changed, 50 insertions(+), 7 deletions(-)
diff --git a/tests/task/task_runner/test_cgroup_task_runner.py b/tests/task/task_runner/test_cgroup_task_runner.py
index c3999e19b0..b8b78f6064 100644
--- a/tests/task/task_runner/test_cgroup_task_runner.py
+++ b/tests/task/task_runner/test_cgroup_task_runner.py
@@ -19,10 +19,21 @@ from __future__ import annotations
from unittest import mock
+from cgroupspy.nodes import Node
+
from airflow.task.task_runner.cgroup_task_runner import CgroupTaskRunner
class TestCgroupTaskRunner:
+ def setup_method(self):
+ job = mock.Mock()
+ job.job_type = None
+ job.task_instance = mock.MagicMock()
+ job.task_instance.run_as_user = None
+ job.task_instance.command_as_list.return_value = ["sleep", "1000"]
+ job.task_instance.task.resources = None
+ self.job = job
+
@mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.__init__")
@mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.on_finish")
def test_cgroup_task_runner_super_calls(self, mock_super_on_finish, mock_super_init):
@@ -32,14 +43,46 @@ class TestCgroupTaskRunner:
and when task finishes, CgroupTaskRunner.on_finish() calls
super().on_finish() to delete the temp cfg file.
"""
- Job = mock.Mock()
- Job.job_type = None
- Job.task_instance = mock.MagicMock()
- Job.task_instance.run_as_user = None
- Job.task_instance.command_as_list.return_value = ["sleep", "1000"]
-
- runner = CgroupTaskRunner(Job)
+ runner = CgroupTaskRunner(self.job)
assert mock_super_init.called
runner.on_finish()
assert mock_super_on_finish.called
+
+ @mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.__init__")
+ @mock.patch("cgroupspy.nodes.Node.create_cgroup")
+ def test_create_cgroup_not_exist(self, mock_create_cgroup, mock_super_init):
+ mock_create_cgroup.return_value = Node("test_node")
+ node = CgroupTaskRunner(self.job)._create_cgroup("./test_cgroup")
+ assert node.name.decode() == "test_node"
+
+ @mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.__init__")
+ @mock.patch("cgroupspy.nodes.Node.delete_cgroup")
+ def test_delete_cgroup_exist(self, mock_delete_cgroup, mock_super_init):
+ CgroupTaskRunner(self.job)._delete_cgroup("./test_cgroup")
+ assert not mock_delete_cgroup.called
+
+ @mock.patch("airflow.utils.log.logging_mixin.LoggingMixin.__init__")
+ @mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.run_command")
+ @mock.patch("cgroupspy.nodes.Node.create_cgroup")
+ def test_start_task(self, mock_create_cgroup, mock_run_command, logging_init):
+ CgroupTaskRunner(self.job).start()
+ assert mock_create_cgroup.called
+ assert mock_run_command.called
+ assert logging_init.called
+
+ @mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.__init__")
+ def test_return_code_none(self, mock_super_init):
+ return_code = CgroupTaskRunner(self.job).return_code()
+ assert not return_code
+
+ @mock.patch("airflow.task.task_runner.base_task_runner.BaseTaskRunner.__init__")
+ @mock.patch("builtins.open", new_callable=mock.mock_open)
+ def test_log_memory_usage(self, mock_open_file, mock_super_init):
+ mock_open_file.return_value.read.return_value = "12345789"
+ mem_cgroup_node = mock.Mock()
+ mem_cgroup_node.full_path = "/test/cgroup"
+ mem_cgroup_node.controller = mock.MagicMock()
+ mem_cgroup_node.controller.limit_in_bytes = 123456
+ CgroupTaskRunner(self.job)._log_memory_usage(mem_cgroup_node)
+ assert mock_open_file.called