kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888243504


##########
tests/dag_processing/test_manager.py:
##########
@@ -1197,85 +970,31 @@ def test_callback_queue(self, tmp_path):
             dag1_req2,
         ]
 
-
-def _wait_for_processor_agent_to_complete(processor_agent: DagFileProcessorAgent):
-    start_timer = time.monotonic()
-    while time.monotonic() - start_timer < 10:
-        if processor_agent.done and all(
-            [processor.done for processor in processor_agent._processors.values()]
-        ):
-            break
-        processor_agent.heartbeat()
-        time.sleep(0.1)
+        with mock.patch.object(
+            DagFileProcessorProcess, "start", side_effect=lambda *args, **kwargs: self.mock_processor()
+        ) as start:
+            manager._start_new_processes()
+        # Callbacks passed to process ctor
+        start.assert_any_call(
+            id=mock.ANY, path=dag1_req1.full_filepath, callbacks=[dag1_req1, dag1_req2], selector=mock.ANY
+        )
+        # And removed from the queue
+        assert dag1_req1.full_filepath not in manager._callback_to_execute
 
 
 class TestDagFileProcessorAgent:
-    def setup_method(self):
-        # Make sure that the configure_logging is not cached
-        self.old_modules = dict(sys.modules)
-
-    def teardown_method(self):
-        # Remove any new modules imported during the test run. This lets us
-        # import the same source files for more than one test.
-        remove_list = []
-        for mod in sys.modules:
-            if mod not in self.old_modules:
-                remove_list.append(mod)
-
-        for mod in remove_list:
-            del sys.modules[mod]
-
-    def test_reload_module(self):
-        """
-        Configure the context to have logging.logging_config_class set to a fake logging
-        class path, thus when reloading logging module the airflow.processor_manager
-        logger should not be configured.
-        """
-        with settings_context(SETTINGS_FILE_VALID):
-            # Launch a process through DagFileProcessorAgent, which will try
-            # reload the logging module.
-            test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
-            log_file_loc = conf.get("logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION")
-
-            with contextlib.suppress(OSError):
-                os.remove(log_file_loc)
-
-            # Starting dag processing with 0 max_runs to avoid redundant operations.
-            processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365))
-            processor_agent.start()
-
-            processor_agent._process.join()
-            # Since we are reloading logging config not creating this file,
-            # we should expect it to be nonexistent.
-
-            assert not os.path.isfile(log_file_loc)
-
-    @conf_vars({("core", "load_examples"): "False"})
-    def test_parse_once(self):

Review Comment:
   Should this be handled in test_processor instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to