This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 622d271048 Add tests to increase DagFileProcessorAgent coverage 
(#37605)
622d271048 is described below

commit 622d2710488dd73bd67af1cd174b9b82d17338cc
Author: Dylan Rajguru <445696+drajg...@users.noreply.github.com>
AuthorDate: Wed Feb 21 13:24:36 2024 -0800

    Add tests to increase DagFileProcessorAgent coverage (#37605)
---
 scripts/cov/core_coverage.py            |   2 +-
 scripts/cov/other_coverage.py           |  69 +++++++++++++
 tests/dag_processing/test_job_runner.py | 174 +++++++++++++++++++++++++++++++-
 3 files changed, 243 insertions(+), 2 deletions(-)

diff --git a/scripts/cov/core_coverage.py b/scripts/cov/core_coverage.py
index f5d2c88f28..0facd4bb1c 100644
--- a/scripts/cov/core_coverage.py
+++ b/scripts/cov/core_coverage.py
@@ -24,7 +24,6 @@ from cov_runner import run_tests
 sys.path.insert(0, str(Path(__file__).parent.resolve()))
 
 source_files = [
-    "airflow/core",
     "airflow/executors",
     "airflow/jobs",
     "airflow/models",
@@ -164,6 +163,7 @@ core_files = [
     "tests/utils",
 ]
 
+
 if __name__ == "__main__":
     args = ["-qq"] + core_files
     run_tests(args, source_files, files_not_fully_covered)
diff --git a/scripts/cov/other_coverage.py b/scripts/cov/other_coverage.py
new file mode 100644
index 0000000000..ff30725ab0
--- /dev/null
+++ b/scripts/cov/other_coverage.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+from cov_runner import run_tests
+
+sys.path.insert(0, str(Path(__file__).parent.resolve()))
+
+source_files = [
+    "airflow/dag_processing",
+]
+
+files_not_fully_covered = [
+    "airflow/dag_processing/manager.py",
+    "airflow/dag_processing/processor.py",
+]
+
+other_tests = [
+    "tests/dag_processing",
+]
+
+"""
+These 'other' packages can be added to the above lists
+as necessary:
+
+"tests/auth",
+"tests/callbacks",
+"tests/charts",
+"tests/cluster_policies",
+"tests/config_templates",
+"tests/datasets",
+"tests/decorators",
+"tests/hooks",
+"tests/io",
+"tests/lineage",
+"tests/listeners",
+"tests/macros",
+"tests/notifications",
+"tests/plugins",
+"tests/secrets",
+"tests/security",
+"tests/sensors",
+"tests/task",
+"tests/template",
+"tests/testconfig",
+"tests/timetables",
+"tests/triggers",
+"""
+
+if __name__ == "__main__":
+    args = ["-qq"] + other_tests
+    run_tests(args, source_files, files_not_fully_covered)
diff --git a/tests/dag_processing/test_job_runner.py 
b/tests/dag_processing/test_job_runner.py
index b9bd4d1818..33d45dfb12 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -33,7 +33,7 @@ from collections import deque
 from datetime import datetime, timedelta
 from logging.config import dictConfig
 from unittest import mock
-from unittest.mock import MagicMock, PropertyMock
+from unittest.mock import MagicMock, Mock, PropertyMock
 
 import pytest
 import time_machine
@@ -1460,3 +1460,175 @@ class TestDagFileProcessorAgent:
         processor_agent._process.join()
 
         assert os.path.isfile(log_file_loc)
+
+    def test_single_parsing_loop_no_parent_signal_conn(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._process = Mock()
+            processor_agent._parent_signal_conn = None
+            processor_agent.run_single_parsing_loop()
+
+    def test_single_parsing_loop_no_process(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._parent_signal_conn = Mock()
+            processor_agent._process = None
+            processor_agent.run_single_parsing_loop()
+
+    def test_single_parsing_loop_process_isnt_alive(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._process = Mock()
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process.is_alive.return_value = False
+        ret_val = processor_agent.run_single_parsing_loop()
+        assert not ret_val
+
+    def test_single_parsing_loop_process_conn_error(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._process = Mock()
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process.is_alive.return_value = True
+        processor_agent._parent_signal_conn.send.side_effect = ConnectionError
+        ret_val = processor_agent.run_single_parsing_loop()
+        assert not ret_val
+
+    def test_get_callbacks_pipe(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        retval = processor_agent.get_callbacks_pipe()
+        assert retval == processor_agent._parent_signal_conn
+
+    def test_get_callbacks_pipe_no_parent_signal_conn(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._parent_signal_conn = None
+            processor_agent.get_callbacks_pipe()
+
+    def test_wait_until_finished_no_parent_signal_conn(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._parent_signal_conn = None
+            processor_agent.wait_until_finished()
+
+    def test_wait_until_finished_poll_eof_error(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._parent_signal_conn.poll.return_value = True
+        processor_agent._parent_signal_conn.recv = Mock()
+        processor_agent._parent_signal_conn.recv.side_effect = EOFError
+        ret_val = processor_agent.wait_until_finished()
+        assert ret_val is None
+
+    def test_heartbeat_no_parent_signal_conn(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._parent_signal_conn = None
+            processor_agent.heartbeat()
+
+    def test_heartbeat_poll_eof_error(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._parent_signal_conn.poll.return_value = True
+        processor_agent._parent_signal_conn.recv = Mock()
+        processor_agent._parent_signal_conn.recv.side_effect = EOFError
+        ret_val = processor_agent.heartbeat()
+        assert ret_val is None
+
+    def test_heartbeat_poll_connection_error(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._parent_signal_conn.poll.return_value = True
+        processor_agent._parent_signal_conn.recv = Mock()
+        processor_agent._parent_signal_conn.recv.side_effect = ConnectionError
+        ret_val = processor_agent.heartbeat()
+        assert ret_val is None
+
+    def test_heartbeat_poll_process_message(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._parent_signal_conn.poll.side_effect = [True, False]
+        processor_agent._parent_signal_conn.recv = Mock()
+        processor_agent._parent_signal_conn.recv.return_value = "testelem"
+        with mock.patch.object(processor_agent, "_process_message"):
+            processor_agent.heartbeat()
+            processor_agent._process_message.assert_called_with("testelem")
+
+    def test_process_message_invalid_type(self):
+        with pytest.raises(RuntimeError, match="Unexpected message received of 
type str"):
+            message = "xyz"
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._process_message(message)
+
+    def test_heartbeat_manager(self):
+        with pytest.raises(ValueError, match="Process not started"):
+            processor_agent = DagFileProcessorAgent("", 1, 
timedelta(days=365), [], False, False)
+            processor_agent._parent_signal_conn = None
+            processor_agent._heartbeat_manager()
+
+    @mock.patch("airflow.utils.process_utils.reap_process_group")
+    def test_heartbeat_manager_process_restart(self, mock_pg):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process = MagicMock()
+        processor_agent.start = Mock()
+        processor_agent._process.is_alive.return_value = False
+        with mock.patch.object(processor_agent._process, "join"):
+            processor_agent._heartbeat_manager()
+            processor_agent.start.assert_called()
+            mock_pg.assert_not_called()
+
+    @mock.patch("airflow.dag_processing.manager.Stats")
+    @mock.patch("time.monotonic")
+    @mock.patch("airflow.dag_processing.manager.reap_process_group")
+    def test_heartbeat_manager_process_reap(self, mock_pg, 
mock_time_monotonic, mock_stats):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process = Mock()
+        processor_agent._process.pid = 12345
+        processor_agent._process.is_alive.return_value = True
+        processor_agent._done = False
+
+        processor_agent.log.error = Mock()
+        processor_agent._processor_timeout = Mock()
+        processor_agent._processor_timeout.total_seconds.return_value = 500
+        mock_time_monotonic.return_value = 1000
+        processor_agent._last_parsing_stat_received_at = 100
+        processor_agent.start = Mock()
+
+        processor_agent._heartbeat_manager()
+        mock_stats.incr.assert_called()
+        mock_pg.assert_called()
+        processor_agent.log.error.assert_called()
+        processor_agent.start.assert_called()
+
+    def test_heartbeat_manager_terminate(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._process = Mock()
+        processor_agent._process.is_alive.return_value = True
+        processor_agent.log.info = Mock()
+
+        processor_agent.terminate()
+        
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
+
+    def test_heartbeat_manager_terminate_conn_err(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._process = Mock()
+        processor_agent._process.is_alive.return_value = True
+        processor_agent._parent_signal_conn = Mock()
+        processor_agent._parent_signal_conn.send.side_effect = ConnectionError
+        processor_agent.log.info = Mock()
+
+        processor_agent.terminate()
+        
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
+
+    def test_heartbeat_manager_end_no_process(self):
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), 
[], False, False)
+        processor_agent._process = Mock()
+        processor_agent._process.__bool__ = Mock(return_value=False)
+        processor_agent._process.side_effect = [None]
+        processor_agent.log.warning = Mock()
+
+        processor_agent.end()
+        processor_agent.log.warning.assert_called_with("Ending without manager 
process.")
+        processor_agent._process.join.assert_not_called()

Reply via email to