This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bd1c406403 Test DagFileProcessorManager directly, not via the JobRunner (#44642)
9bd1c406403 is described below

commit 9bd1c4064035a4ee206b07055b3daff2700e2e74
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Dec 4 11:32:41 2024 +0000

    Test DagFileProcessorManager directly, not via the JobRunner (#44642)
    
    90% of these tests created a DagProcessorJobRunner with the Manager inside it,
    then did absolutely nothing with the JobRunner object. This change makes the
    tests exercise the Manager they are actually testing directly.
    
    (DagProcessorJobRunner itself is as simple as can be -- it calls `start()` ->
    `terminate()` -> `end()`, so we don't lose much of anything by not testing it
    explicitly.)
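    
    As an abridged before/after sketch of the pattern this commit applies
    throughout the test file (constructor arguments are taken from the diff
    below; the names and imports come from the test module itself):
    
        # Before: each test wrapped the Manager in a JobRunner it never used
        runner = DagProcessorJobRunner(
            job=Job(),
            processor=DagFileProcessorManager(
                dag_directory="directory",
                max_runs=1,
                processor_timeout=timedelta(days=365),
                signal_conn=MagicMock(),
                dag_ids=[],
                async_mode=True,
            ),
        )
        runner.processor.prepare_file_path_queue()
    
        # After: each test constructs and drives the Manager directly
        manager = DagFileProcessorManager(
            dag_directory="directory",
            max_runs=1,
            processor_timeout=timedelta(days=365),
            signal_conn=MagicMock(),
            dag_ids=[],
            async_mode=True,
        )
        manager.prepare_file_path_queue()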
---
 .../{test_job_runner.py => test_manager.py}        | 674 +++++++++------------
 1 file changed, 290 insertions(+), 384 deletions(-)

diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_manager.py
similarity index 75%
rename from tests/dag_processing/test_job_runner.py
rename to tests/dag_processing/test_manager.py
index 3ab328e8e79..76652740fbc 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_manager.py
@@ -50,8 +50,6 @@ from airflow.dag_processing.manager import (
     DagParsingStat,
 )
 from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
-from airflow.jobs.job import Job
 from airflow.models import DagBag, DagModel, DbCallbackRequest
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
@@ -138,14 +136,14 @@ class TestDagProcessorJobRunner:
         clear_db_dags()
         clear_db_callbacks()
 
-    def run_processor_manager_one_loop(self, manager, parent_pipe):
-        if not manager.processor._async_mode:
+    def run_processor_manager_one_loop(self, processor, parent_pipe):
+        if not processor._async_mode:
             parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
 
         results = []
 
         while True:
-            manager.processor._run_parsing_loop()
+            processor._run_parsing_loop()
 
             while parent_pipe.poll(timeout=0.01):
                 obj = parent_pipe.recv()
@@ -170,16 +168,13 @@ class TestDagProcessorJobRunner:
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=path_to_parse.parent,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=async_mode,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=path_to_parse.parent,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=async_mode,
         )
 
         with create_session() as session:
@@ -205,16 +200,13 @@ class TestDagProcessorJobRunner:
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=os.fspath(tmp_path),
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=async_mode,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=os.fspath(tmp_path),
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=async_mode,
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -228,27 +220,24 @@ class TestDagProcessorJobRunner:
         Test that when a processor already exist with a filepath, a new processor won't be created
         with that filepath. The filepath will just be removed from the list.
         """
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         file_1 = "file_1.py"
         file_2 = "file_2.py"
         file_3 = "file_3.py"
-        manager.processor._file_path_queue = deque([file_1, file_2, file_3])
+        manager._file_path_queue = deque([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 'file_1'
-        manager.processor._processors[file_1] = MagicMock()
+        manager._processors[file_1] = MagicMock()
         # Start New Processes
-        manager.processor.start_new_processes()
+        manager.start_new_processes()
 
         # Because of the config: '[scheduler] parsing_processes = 2'
         # verify that only one extra process is created
@@ -256,55 +245,49 @@ class TestDagProcessorJobRunner:
         # even though it is first in '_file_path_queue'
         # a new processor is created with 'file_2' and not 'file_1'.
 
-        assert file_1 in manager.processor._processors.keys()
-        assert file_2 in manager.processor._processors.keys()
-        assert deque([file_3]) == manager.processor._file_path_queue
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert deque([file_3]) == manager._file_path_queue
 
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError("DagFileProcessor object has no attribute stop")
         mock_processor.terminate.side_effect = None
 
-        manager.processor._processors["missing_file.txt"] = mock_processor
-        manager.processor._file_stats["missing_file.txt"] = DagFileStat(0, 0, None, None, 0, 0)
+        manager._processors["missing_file.txt"] = mock_processor
+        manager._file_stats["missing_file.txt"] = DagFileStat(0, 0, None, None, 0, 0)
 
-        manager.processor.set_file_paths(["abc.txt"])
-        assert manager.processor._processors == {}
-        assert "missing_file.txt" not in manager.processor._file_stats
+        manager.set_file_paths(["abc.txt"])
+        assert manager._processors == {}
+        assert "missing_file.txt" not in manager._file_stats
 
     def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError("DagFileProcessor object has no attribute stop")
         mock_processor.terminate.side_effect = None
 
-        manager.processor._processors["abc.txt"] = mock_processor
+        manager._processors["abc.txt"] = mock_processor
 
-        manager.processor.set_file_paths(["abc.txt"])
-        assert manager.processor._processors == {"abc.txt": mock_processor}
+        manager.set_file_paths(["abc.txt"])
+        assert manager._processors == {"abc.txt": mock_processor}
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "alphabetical"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -318,24 +301,19 @@ class TestDagProcessorJobRunner:
         dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == deque()
-        manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == deque(
-            ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
-        )
+        manager.set_file_paths(dag_files)
+        assert manager._file_path_queue == deque()
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == deque(["file_1.py", "file_2.py", "file_3.py", "file_4.py"])
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "random_seeded_by_host"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -349,30 +327,27 @@ class TestDagProcessorJobRunner:
         dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == deque()
-        manager.processor.prepare_file_path_queue()
+        manager.set_file_paths(dag_files)
+        assert manager._file_path_queue == deque()
+        manager.prepare_file_path_queue()
 
         expected_order = deque(dag_files)
         random.Random(get_hostname()).shuffle(expected_order)
-        assert manager.processor._file_path_queue == expected_order
+        assert manager._file_path_queue == expected_order
 
         # Verify running it again produces same order
-        manager.processor._file_paths = []
-        manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == expected_order
+        manager._file_paths = []
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == expected_order
 
     @pytest.fixture
     def change_platform_timezone(self, monkeypatch):
@@ -413,24 +388,19 @@ class TestDagProcessorJobRunner:
         mock_getmtime.side_effect = list(paths_with_mtime.values())
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        assert manager.processor._file_path_queue == deque()
-        manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == deque(
-            ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
-        )
+        manager.set_file_paths(dag_files)
+        assert manager._file_path_queue == deque()
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == deque(["file_4.py", "file_1.py", "file_3.py", "file_2.py"])
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -452,21 +422,18 @@ class TestDagProcessorJobRunner:
         mock_getmtime.side_effect = [1.0, 2.0, FileNotFoundError()]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == deque(["file_2.py", "file_3.py"])
+        manager.set_file_paths(dag_files)
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == deque(["file_2.py", "file_3.py"])
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -488,27 +455,22 @@ class TestDagProcessorJobRunner:
         mock_getmtime.side_effect = [1.0, 2.0, 3.0]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        manager.processor.prepare_file_path_queue()
-        assert manager.processor._file_path_queue == deque(["file_3.py", "file_2.py", "file_1.py"])
+        manager.set_file_paths(dag_files)
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == deque(["file_3.py", "file_2.py", "file_1.py"])
 
-        manager.processor.set_file_paths([*dag_files, "file_4.py"])
-        manager.processor.add_new_file_path_to_queue()
-        assert manager.processor._file_path_queue == deque(
-            ["file_4.py", "file_3.py", "file_2.py", "file_1.py"]
-        )
+        manager.set_file_paths([*dag_files, "file_4.py"])
+        manager.add_new_file_path_to_queue()
+        assert manager._file_path_queue == deque(["file_4.py", "file_3.py", "file_2.py", "file_1.py"])
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.settings.TIMEZONE", timezone.utc)
@@ -535,46 +497,43 @@ class TestDagProcessorJobRunner:
         mock_getmtime.side_effect = [initial_file_1_mtime]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=3,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=3,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         # let's say the DAG was just parsed 10 seconds before the Freezed time
         last_finish_time = freezed_base_time - timedelta(seconds=10)
-        manager.processor._file_stats = {
+        manager._file_stats = {
             "file_1.py": DagFileStat(1, 0, last_finish_time, 
timedelta(seconds=1.0), 1, 1),
         }
         with time_machine.travel(freezed_base_time):
-            manager.processor.set_file_paths(dag_files)
-            assert manager.processor._file_path_queue == deque()
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
-            manager.processor.prepare_file_path_queue()
-            assert manager.processor._file_path_queue == deque()
+            manager.prepare_file_path_queue()
+            assert manager._file_path_queue == deque()
 
         # Simulate the DAG modification by using modified_time which is greater
         # than the last_parse_time but still less than now - min_file_process_interval
         file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with time_machine.travel(freezed_base_time):
-            manager.processor.set_file_paths(dag_files)
-            assert manager.processor._file_path_queue == deque()
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
-            manager.processor.prepare_file_path_queue()
+            manager.prepare_file_path_queue()
             # Check that file is added to the queue even though file was just recently passed
-            assert manager.processor._file_path_queue == deque(["file_1.py"])
+            assert manager._file_path_queue == deque(["file_1.py"])
             assert last_finish_time < file_1_new_mtime
             assert (
-                manager.processor._file_process_interval
-                > (freezed_base_time - manager.processor.get_last_finish_time("file_1.py")).total_seconds()
+                manager._file_process_interval
+                > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
             )
 
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -595,24 +554,19 @@ class TestDagProcessorJobRunner:
         dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
         mock_find_path.return_value = dag_files
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
-        manager.processor.set_file_paths(dag_files)
-        manager.processor._file_path_queue = deque(["file_2.py", "file_3.py", "file_4.py", "file_1.py"])
-        manager.processor._refresh_requested_filelocs()
-        assert manager.processor._file_path_queue == deque(
-            ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
-        )
+        manager.set_file_paths(dag_files)
+        manager._file_path_queue = deque(["file_2.py", "file_3.py", "file_4.py", "file_1.py"])
+        manager._refresh_requested_filelocs()
+        assert manager._file_path_queue == deque(["file_1.py", "file_2.py", "file_3.py", "file_4.py"])
         with create_session() as session2:
             parsing_request_after = session2.query(DagPriorityParsingRequest).get(parsing_request.id)
         assert parsing_request_after is None
@@ -622,16 +576,13 @@ class TestDagProcessorJobRunner:
         Ensure that DAGs are marked inactive when the file is parsed but the
         DagModel.last_parsed_time is not updated.
         """
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(minutes=10),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(minutes=10),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -653,8 +604,8 @@ class TestDagProcessorJobRunner:
                 run_count=1,
                 last_num_of_db_queries=1,
             )
-            manager.processor._file_paths = [test_dag_path]
-            manager.processor._file_stats[test_dag_path] = stat
+            manager._file_paths = [test_dag_path]
+            manager._file_stats[test_dag_path] = stat
 
             active_dag_count = (
                 session.query(func.count(DagModel.dag_id))
@@ -663,7 +614,7 @@ class TestDagProcessorJobRunner:
             )
             assert active_dag_count == 1
 
-            manager.processor._scan_stale_dags()
+            manager._scan_stale_dags()
 
             active_dag_count = (
                 session.query(func.count(DagModel.dag_id))
@@ -693,16 +644,13 @@ class TestDagProcessorJobRunner:
         Ensure only dags from current dag_directory are updated
         """
         dag_directory = "directory"
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=dag_directory,
-                max_runs=1,
-                processor_timeout=timedelta(minutes=10),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=dag_directory,
+            max_runs=1,
+            processor_timeout=timedelta(minutes=10),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -730,13 +678,13 @@ class TestDagProcessorJobRunner:
                 run_count=1,
                 last_num_of_db_queries=1,
             )
-            manager.processor._file_paths = [test_dag_path]
-            manager.processor._file_stats[test_dag_path] = stat
+            manager._file_paths = [test_dag_path]
+            manager._file_stats[test_dag_path] = stat
 
             active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 2
 
-            manager.processor._scan_stale_dags()
+            manager._scan_stale_dags()
 
             active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
@@ -749,16 +697,13 @@ class TestDagProcessorJobRunner:
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable_handle):
         mock_pid.return_value = 1234
         mock_waitable_handle.return_value = 3
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory="directory",
-                max_runs=1,
-                processor_timeout=timedelta(seconds=5),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         processor = DagFileProcessorProcess(
@@ -768,28 +713,25 @@ class TestDagProcessorJobRunner:
             callback_requests=[],
         )
         processor._start_time = timezone.make_aware(datetime.min)
-        manager.processor._processors = {"abc.txt": processor}
-        manager.processor.waitables[3] = processor
-        initial_waitables = len(manager.processor.waitables)
-        manager.processor._kill_timed_out_processors()
+        manager._processors = {"abc.txt": processor}
+        manager.waitables[3] = processor
+        initial_waitables = len(manager.waitables)
+        manager._kill_timed_out_processors()
         mock_kill.assert_called_once_with()
-        assert len(manager.processor._processors) == 0
-        assert len(manager.processor.waitables) == initial_waitables - 1
+        assert len(manager._processors) == 0
+        assert len(manager.waitables) == initial_waitables - 1
 
     
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", 
new_callable=PropertyMock)
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
     def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_pid):
         mock_pid.return_value = 1234
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER,
-                max_runs=1,
-                processor_timeout=timedelta(seconds=5),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         processor = DagFileProcessorProcess(
@@ -799,8 +741,8 @@ class TestDagProcessorJobRunner:
             callback_requests=[],
         )
         processor._start_time = timezone.make_aware(datetime.max)
-        manager.processor._processors = {"abc.txt": processor}
-        manager.processor._kill_timed_out_processors()
+        manager._processors = {"abc.txt": processor}
+        manager._kill_timed_out_processors()
         mock_dag_file_processor.kill.assert_not_called()
 
     @conf_vars({("core", "load_examples"): "False"})
@@ -819,19 +761,16 @@ class TestDagProcessorJobRunner:
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=dag_directory,
-                dag_ids=[],
-                max_runs=1,
-                processor_timeout=timedelta(seconds=5),
-                signal_conn=child_pipe,
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=dag_directory,
+            dag_ids=[],
+            max_runs=1,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            async_mode=True,
         )
 
-        manager.processor._run_parsing_loop()
+        manager._run_parsing_loop()
 
         result = None
         while parent_pipe.poll(timeout=None):
@@ -840,7 +779,7 @@ class TestDagProcessorJobRunner:
                 break
 
         # Three files in folder should be processed
-        assert sum(stat.run_count for stat in manager.processor._file_stats.values()) == 3
+        assert sum(stat.run_count for stat in manager._file_stats.values()) == 3
 
         with create_session() as session:
             assert session.get(DagModel, dag_id) is not None
@@ -864,16 +803,13 @@ class TestDagProcessorJobRunner:
         with create_session() as session:
             child_pipe, parent_pipe = multiprocessing.Pipe()
 
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=processor_dir_1,
-                    dag_ids=[],
-                    max_runs=1,
-                    signal_conn=child_pipe,
-                    processor_timeout=timedelta(seconds=5),
-                    async_mode=False,
-                ),
+            manager = DagFileProcessorManager(
+                dag_directory=processor_dir_1,
+                dag_ids=[],
+                max_runs=1,
+                signal_conn=child_pipe,
+                processor_timeout=timedelta(seconds=5),
+                async_mode=False,
             )
 
             self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -884,16 +820,13 @@ class TestDagProcessorJobRunner:
 
             child_pipe, parent_pipe = multiprocessing.Pipe()
 
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=processor_dir_2,
-                    dag_ids=[],
-                    max_runs=1,
-                    signal_conn=child_pipe,
-                    processor_timeout=timedelta(seconds=5),
-                    async_mode=True,
-                ),
+            manager = DagFileProcessorManager(
+                dag_directory=processor_dir_2,
+                dag_ids=[],
+                max_runs=1,
+                signal_conn=child_pipe,
+                processor_timeout=timedelta(seconds=5),
+                async_mode=True,
             )
 
             self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -994,20 +927,17 @@ class TestDagProcessorJobRunner:
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=path_to_parse.parent,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=async_mode,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=path_to_parse.parent,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=async_mode,
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
-        last_runtime = manager.processor.get_last_runtime(manager.processor.file_paths[0])
+        last_runtime = manager.get_last_runtime(manager.file_paths[0])
 
         child_pipe.close()
         parent_pipe.close()
@@ -1025,17 +955,14 @@ class TestDagProcessorJobRunner:
         )
 
     def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
-        """Test DagProcessorJobRunner._refresh_dag_dir method"""
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        """Test DagFileProcessorManager._refresh_dag_dir method"""
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -1043,8 +970,8 @@ class TestDagProcessorJobRunner:
         dag = dagbag.get_dag("test_zip_dag")
         dag.sync_to_db()
         SerializedDagModel.write_dag(dag)
-        manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
-        manager.processor._refresh_dag_dir()
+        manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+        manager._refresh_dag_dir()
         # Assert dag not deleted in SDM
         assert SerializedDagModel.has_dag("test_zip_dag")
         # assert code not deleted
@@ -1053,17 +980,14 @@ class TestDagProcessorJobRunner:
         assert dag.get_is_active()
 
     def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
-        """Test DagProcessorJobRunner._refresh_dag_dir method"""
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        """Test DagFileProcessorManager._refresh_dag_dir method"""
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -1071,11 +995,11 @@ class TestDagProcessorJobRunner:
         dag = dagbag.get_dag("test_zip_dag")
         dag.sync_to_db()
         SerializedDagModel.write_dag(dag)
-        manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+        manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
 
         # Mock might_contain_dag to mimic deleting the python file from the zip
         with mock.patch("airflow.dag_processing.manager.might_contain_dag", 
return_value=False):
-            manager.processor._refresh_dag_dir()
+            manager._refresh_dag_dir()
 
         # Deleting the python file should not delete SDM for versioning sake
         assert SerializedDagModel.has_dag("test_zip_dag")
@@ -1087,7 +1011,7 @@ class TestDagProcessorJobRunner:
         assert not dag.get_is_active()
 
     def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
-        """Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""
+        """Test DagFileProcessorManager._refresh_dag_dir should not update dags outside its processor_subdir"""
 
         dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
@@ -1100,20 +1024,17 @@ class TestDagProcessorJobRunner:
         assert dag.get_is_active()
         assert DagCode.has_dag(dag.dag_id)
 
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER / "subdir2" / "subdir3",
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER / "subdir2" / "subdir3",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
-        manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+        manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
 
-        manager.processor._refresh_dag_dir()
+        manager._refresh_dag_dir()
 
         assert SerializedDagModel.has_dag("miscellaneous_test_dag")
         assert dag.get_is_active()
@@ -1126,7 +1047,7 @@ class TestDagProcessorJobRunner:
         }
     )
     def test_fetch_callbacks_from_database(self, tmp_path):
-        """Test DagProcessorJobRunner._fetch_callbacks method"""
+        """Test DagFileProcessorManager._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         callback1 = DagCallbackRequest(
@@ -1149,16 +1070,13 @@ class TestDagProcessorJobRunner:
             session.add(DbCallbackRequest(callback=callback2, priority_weight=10))
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=os.fspath(tmp_path),
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=False,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=os.fspath(tmp_path),
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=False,
         )
 
         with create_session() as session:
@@ -1172,7 +1090,7 @@ class TestDagProcessorJobRunner:
         }
     )
     def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
-        """Test DagProcessorJobRunner._fetch_callbacks method"""
+        """Test DagFileProcessorManager._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         callback1 = DagCallbackRequest(
@@ -1195,16 +1113,13 @@ class TestDagProcessorJobRunner:
             session.add(DbCallbackRequest(callback=callback2, priority_weight=10))
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=tmp_path,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=False,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=tmp_path,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=False,
         )
 
         with create_session() as session:
@@ -1219,7 +1134,7 @@ class TestDagProcessorJobRunner:
         }
     )
     def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
-        """Test DagProcessorJobRunner._fetch_callbacks method"""
+        """Test DagFileProcessorManager._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         with create_session() as session:
@@ -1234,16 +1149,13 @@ class TestDagProcessorJobRunner:
                 session.add(DbCallbackRequest(callback=callback, priority_weight=i))
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=str(tmp_path),
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=False,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=False,
         )
 
         with create_session() as session:
@@ -1274,16 +1186,13 @@ class TestDagProcessorJobRunner:
             session.add(DbCallbackRequest(callback=callback, priority_weight=10))
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=tmp_path,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=child_pipe,
-                dag_ids=[],
-                async_mode=False,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=tmp_path,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            async_mode=False,
         )
 
         with create_session() as session:
@@ -1296,16 +1205,13 @@ class TestDagProcessorJobRunner:
 
     def test_callback_queue(self, tmp_path):
         # given
-        manager = DagProcessorJobRunner(
-            job=Job(),
-            processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER,
-                max_runs=1,
-                processor_timeout=timedelta(days=365),
-                signal_conn=MagicMock(),
-                dag_ids=[],
-                async_mode=True,
-            ),
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            async_mode=True,
         )
 
         dag1_req1 = DagCallbackRequest(
@@ -1335,26 +1241,26 @@ class TestDagProcessorJobRunner:
         )
 
         # when
-        manager.processor._add_callback_to_queue(dag1_req1)
-        manager.processor._add_callback_to_queue(dag2_req1)
+        manager._add_callback_to_queue(dag1_req1)
+        manager._add_callback_to_queue(dag2_req1)
 
         # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
-        assert manager.processor._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
-        assert set(manager.processor._callback_to_execute.keys()) == {
+        assert manager._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
+        assert set(manager._callback_to_execute.keys()) == {
             dag1_req1.full_filepath,
             dag2_req1.full_filepath,
         }
-        assert manager.processor._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
+        assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
 
         # update the queue, although the callback is registered
-        assert manager.processor._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
+        assert manager._file_path_queue == deque([dag2_req1.full_filepath, dag1_req1.full_filepath])
 
         # when
-        manager.processor._add_callback_to_queue(dag1_req2)
+        manager._add_callback_to_queue(dag1_req2)
 
         # then - non-sla callback should have brought dag1 to the fore
-        assert manager.processor._file_path_queue == deque([dag1_req1.full_filepath, dag2_req1.full_filepath])
-        assert manager.processor._callback_to_execute[dag1_req1.full_filepath] 
== [
+        assert manager._file_path_queue == deque([dag1_req1.full_filepath, 
dag2_req1.full_filepath])
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [
             dag1_req1,
             dag1_req2,
         ]

