This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 622d271048 Add tests to increase DagFileProcessorAgent coverage (#37605) 622d271048 is described below commit 622d2710488dd73bd67af1cd174b9b82d17338cc Author: Dylan Rajguru <445696+drajg...@users.noreply.github.com> AuthorDate: Wed Feb 21 13:24:36 2024 -0800 Add tests to increase DagFileProcessorAgent coverage (#37605) --- scripts/cov/core_coverage.py | 2 +- scripts/cov/other_coverage.py | 69 +++++++++++++ tests/dag_processing/test_job_runner.py | 174 +++++++++++++++++++++++++++++++- 3 files changed, 243 insertions(+), 2 deletions(-) diff --git a/scripts/cov/core_coverage.py b/scripts/cov/core_coverage.py index f5d2c88f28..0facd4bb1c 100644 --- a/scripts/cov/core_coverage.py +++ b/scripts/cov/core_coverage.py @@ -24,7 +24,6 @@ from cov_runner import run_tests sys.path.insert(0, str(Path(__file__).parent.resolve())) source_files = [ - "airflow/core", "airflow/executors", "airflow/jobs", "airflow/models", @@ -164,6 +163,7 @@ core_files = [ "tests/utils", ] + if __name__ == "__main__": args = ["-qq"] + core_files run_tests(args, source_files, files_not_fully_covered) diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py new file mode 100644 index 0000000000..ff30725ab0 --- /dev/null +++ b/scripts/cov/other_coverage.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys +from pathlib import Path + +from cov_runner import run_tests + +sys.path.insert(0, str(Path(__file__).parent.resolve())) + +source_files = [ + "airflow/dag_processing", +] + +files_not_fully_covered = [ + "airflow/dag_processing/manager.py", + "airflow/dag_processing/processor.py", +] + +other_tests = [ + "tests/dag_processing", +] + +""" +These 'other' packages can be added to the above lists +as necessary: + +"tests/auth", +"tests/callbacks", +"tests/charts", +"tests/cluster_policies", +"tests/config_templates", +"tests/datasets", +"tests/decorators", +"tests/hooks", +"tests/io", +"tests/lineage", +"tests/listeners", +"tests/macros", +"tests/notifications", +"tests/plugins", +"tests/secrets", +"tests/security", +"tests/sensors", +"tests/task", +"tests/template", +"tests/testconfig", +"tests/timetables", +"tests/triggers", +""" + +if __name__ == "__main__": + args = ["-qq"] + other_tests + run_tests(args, source_files, files_not_fully_covered) diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index b9bd4d1818..33d45dfb12 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -33,7 +33,7 @@ from collections import deque from datetime import datetime, timedelta from logging.config import dictConfig from unittest import mock -from unittest.mock import MagicMock, PropertyMock +from unittest.mock import MagicMock, Mock, PropertyMock import pytest import time_machine @@ -1460,3 +1460,175 @@ class TestDagFileProcessorAgent: processor_agent._process.join() assert os.path.isfile(log_file_loc) + + def test_single_parsing_loop_no_parent_signal_conn(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process = Mock() + processor_agent._parent_signal_conn = None + processor_agent.run_single_parsing_loop() + + def test_single_parsing_loop_no_process(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._process = None + processor_agent.run_single_parsing_loop() + + def test_single_parsing_loop_process_isnt_alive(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process = Mock() + processor_agent._parent_signal_conn = Mock() + processor_agent._process.is_alive.return_value = False + ret_val = processor_agent.run_single_parsing_loop() + assert not ret_val + + def test_single_parsing_loop_process_conn_error(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process = Mock() + processor_agent._parent_signal_conn = Mock() + processor_agent._process.is_alive.return_value = True + processor_agent._parent_signal_conn.send.side_effect = ConnectionError + ret_val = processor_agent.run_single_parsing_loop() + assert not ret_val + + def test_get_callbacks_pipe(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + retval = processor_agent.get_callbacks_pipe() + assert retval == processor_agent._parent_signal_conn + + def test_get_callbacks_pipe_no_parent_signal_conn(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = None + processor_agent.get_callbacks_pipe() + + def test_wait_until_finished_no_parent_signal_conn(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = None + processor_agent.wait_until_finished() + + def test_wait_until_finished_poll_eof_error(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._parent_signal_conn.poll.return_value = True + processor_agent._parent_signal_conn.recv = Mock() + processor_agent._parent_signal_conn.recv.side_effect = EOFError + ret_val = processor_agent.wait_until_finished() + assert ret_val is None + + def test_heartbeat_no_parent_signal_conn(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = None + processor_agent.heartbeat() + + def test_heartbeat_poll_eof_error(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._parent_signal_conn.poll.return_value = True + processor_agent._parent_signal_conn.recv = Mock() + processor_agent._parent_signal_conn.recv.side_effect = EOFError + ret_val = processor_agent.heartbeat() + assert ret_val is None + + def test_heartbeat_poll_connection_error(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._parent_signal_conn.poll.return_value = True + processor_agent._parent_signal_conn.recv = Mock() + processor_agent._parent_signal_conn.recv.side_effect = ConnectionError + ret_val = processor_agent.heartbeat() + assert ret_val is None + + def test_heartbeat_poll_process_message(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._parent_signal_conn.poll.side_effect = [True, False] + processor_agent._parent_signal_conn.recv = Mock() + processor_agent._parent_signal_conn.recv.return_value = "testelem" + with mock.patch.object(processor_agent, "_process_message"): + processor_agent.heartbeat() + processor_agent._process_message.assert_called_with("testelem") + + def test_process_message_invalid_type(self): + with pytest.raises(RuntimeError, match="Unexpected message received of type str"): + message = "xyz" + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process_message(message) + + def test_heartbeat_manager(self): + with pytest.raises(ValueError, match="Process not started"): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = None + processor_agent._heartbeat_manager() + + @mock.patch("airflow.utils.process_utils.reap_process_group") + def test_heartbeat_manager_process_restart(self, mock_pg): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._process = MagicMock() + processor_agent.start = Mock() + processor_agent._process.is_alive.return_value = False + with mock.patch.object(processor_agent._process, "join"): + processor_agent._heartbeat_manager() + processor_agent.start.assert_called() + mock_pg.assert_not_called() + + @mock.patch("airflow.dag_processing.manager.Stats") + @mock.patch("time.monotonic") + @mock.patch("airflow.dag_processing.manager.reap_process_group") + def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._process = Mock() + processor_agent._process.pid = 12345 + processor_agent._process.is_alive.return_value = True + processor_agent._done = False + + processor_agent.log.error = Mock() + processor_agent._processor_timeout = Mock() + processor_agent._processor_timeout.total_seconds.return_value = 500 + mock_time_monotonic.return_value = 1000 + processor_agent._last_parsing_stat_received_at = 100 + processor_agent.start = Mock() + + processor_agent._heartbeat_manager() + mock_stats.incr.assert_called() + mock_pg.assert_called() + processor_agent.log.error.assert_called() + processor_agent.start.assert_called() + + def test_heartbeat_manager_terminate(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._parent_signal_conn = Mock() + processor_agent._process = Mock() + processor_agent._process.is_alive.return_value = True + processor_agent.log.info = Mock() + + processor_agent.terminate() + processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) + + def test_heartbeat_manager_terminate_conn_err(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process = Mock() + processor_agent._process.is_alive.return_value = True + processor_agent._parent_signal_conn = Mock() + processor_agent._parent_signal_conn.send.side_effect = ConnectionError + processor_agent.log.info = Mock() + + processor_agent.terminate() + processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) + + def test_heartbeat_manager_end_no_process(self): + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False) + processor_agent._process = Mock() + processor_agent._process.__bool__ = Mock(return_value=False) + processor_agent._process.side_effect = [None] + processor_agent.log.warning = Mock() + + processor_agent.end() + processor_agent.log.warning.assert_called_with("Ending without manager process.") + processor_agent._process.join.assert_not_called()