jhtimmins commented on code in PR #25528:
URL: https://github.com/apache/airflow/pull/25528#discussion_r940636832


##########
airflow/dag_processing/processor.py:
##########
@@ -137,27 +137,38 @@ def _run_file_processor(
 
         set_context(log, file_path)
         setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
+
+        def _handle_dag_file_processing():
+            # Re-configure the ORM engine as there are issues with multiple 
processes
+            settings.configure_orm()
+
+            # Change the thread name to differentiate log lines. This is
+            # really a separate process, but changing the name of the
+            # process doesn't work, so changing the thread name instead.
+            threading.current_thread().name = thread_name
+
+            log.info("Started process (PID=%s) to work on %s", os.getpid(), 
file_path)
+            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
+            result: Tuple[int, int] = dag_file_processor.process_file(
+                file_path=file_path,
+                pickle_dags=pickle_dags,
+                callback_requests=callback_requests,
+            )
+            result_channel.send(result)
+
         try:
-            # redirect stdout/stderr to log
-            with redirect_stdout(StreamLogWriter(log, logging.INFO)), 
redirect_stderr(
-                StreamLogWriter(log, logging.WARN)
-            ), Stats.timer() as timer:
-                # Re-configure the ORM engine as there are issues with 
multiple processes
-                settings.configure_orm()
-
-                # Change the thread name to differentiate log lines. This is
-                # really a separate process, but changing the name of the
-                # process doesn't work, so changing the thread name instead.
-                threading.current_thread().name = thread_name
-
-                log.info("Started process (PID=%s) to work on %s", 
os.getpid(), file_path)
-                dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
-                result: Tuple[int, int] = dag_file_processor.process_file(
-                    file_path=file_path,
-                    pickle_dags=pickle_dags,
-                    callback_requests=callback_requests,
-                )
-                result_channel.send(result)
+            DAG_PROCESSOR_LOG_TARGET: str = conf.get('scheduler', 
'DAG_PROCESSOR_LOG_TARGET')
+            if DAG_PROCESSOR_LOG_TARGET == "stdout":
+                with Stats.timer() as timer:
+                    _handle_dag_file_processing()
+            else:

Review Comment:
   @kaxil I'm running into some issues while testing this. Can you suggest an 
effective way to test something that depends on the internal behavior of the 
processor? It appears that no existing tests actually do this.
   
   This is the basic structure of the test I'm adding to `test_processor.py`.
   
   ```python
       @conf_vars({("logging", "dag_processor_log_target"): "file"})
       def test_stuff(self):
           """
           Test that the dag file processor does not call the sla miss callback 
when
           given an invalid sla
           """
           with create_session() as session:
               processor = DagFileProcessorProcess('abc.txt', False, 
["fake_dag_id"], [])
               # processor.start()
               start_method = processor._get_multiprocessing_start_method()
               context = multiprocessing.get_context(start_method)
   
               _parent_channel, _child_channel = context.Pipe(duplex=False)
               processor._run_file_processor(
                   result_channel=_child_channel,
                   parent_channel=_parent_channel,
                   file_path='abc.txt',
                   pickle_dags=False,
                   dag_ids=['fake_dag_ids'],
                   thread_name='DagFileProcessor0',
                   callback_requests=[],
               )
               _child_channel.close()
               del _child_channel
   ```
   
   These tests consistently fail with 
   ```
   ERROR 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_stuff - 
RuntimeError: Session must be set before!
   FAILED 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_stuff - 
BrokenPipeError: [Errno 32] Broken pipe
   ```
   
   I've attempted to address both of these issues without much luck. I've also 
looked into testing at a higher level (instantiating 
`DagFileProcessorAgent`, which creates the `DagFileProcessorProcess` class), as 
well as mocking out the internal methods called by 
`DagFileProcessorProcess._run_file_processor`, but the latter seems like a 
brittle approach at best.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to