jhtimmins commented on code in PR #25528:
URL: https://github.com/apache/airflow/pull/25528#discussion_r940636832
##########
airflow/dag_processing/processor.py:
##########
@@ -137,27 +137,38 @@ def _run_file_processor(
set_context(log, file_path)
setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
+
+ def _handle_dag_file_processing():
+ # Re-configure the ORM engine as there are issues with multiple
processes
+ settings.configure_orm()
+
+ # Change the thread name to differentiate log lines. This is
+ # really a separate process, but changing the name of the
+ # process doesn't work, so changing the thread name instead.
+ threading.current_thread().name = thread_name
+
+ log.info("Started process (PID=%s) to work on %s", os.getpid(),
file_path)
+ dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
+ result: Tuple[int, int] = dag_file_processor.process_file(
+ file_path=file_path,
+ pickle_dags=pickle_dags,
+ callback_requests=callback_requests,
+ )
+ result_channel.send(result)
+
try:
- # redirect stdout/stderr to log
- with redirect_stdout(StreamLogWriter(log, logging.INFO)),
redirect_stderr(
- StreamLogWriter(log, logging.WARN)
- ), Stats.timer() as timer:
- # Re-configure the ORM engine as there are issues with
multiple processes
- settings.configure_orm()
-
- # Change the thread name to differentiate log lines. This is
- # really a separate process, but changing the name of the
- # process doesn't work, so changing the thread name instead.
- threading.current_thread().name = thread_name
-
- log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
- dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
- result: Tuple[int, int] = dag_file_processor.process_file(
- file_path=file_path,
- pickle_dags=pickle_dags,
- callback_requests=callback_requests,
- )
- result_channel.send(result)
+ DAG_PROCESSOR_LOG_TARGET: str = conf.get('scheduler',
'DAG_PROCESSOR_LOG_TARGET')
+ if DAG_PROCESSOR_LOG_TARGET == "stdout":
+ with Stats.timer() as timer:
+ _handle_dag_file_processing()
+ else:
Review Comment:
@kaxil I'm running into some issues while testing this. Can you suggest an
effective way to test something that depends on the internal behavior of the
processor? It appears that no existing tests actually exercise this.
This is the basic structure of the test I'm adding to `test_processor.py`.
```python
@conf_vars({("logging", "dag_processor_log_target"): "file"})
def test_stuff(self):
        """
        Test that the dag file processor logs to the expected target when
        ``dag_processor_log_target`` is set to "file"
        """
with create_session() as session:
processor = DagFileProcessorProcess('abc.txt', False,
["fake_dag_id"], [])
# processor.start()
start_method = processor._get_multiprocessing_start_method()
context = multiprocessing.get_context(start_method)
_parent_channel, _child_channel = context.Pipe(duplex=False)
processor._run_file_processor(
result_channel=_child_channel,
parent_channel=_parent_channel,
file_path='abc.txt',
pickle_dags=False,
dag_ids=['fake_dag_ids'],
thread_name='DagFileProcessor0',
callback_requests=[],
)
_child_channel.close()
del _child_channel
```
These tests consistently fail with
```
ERROR
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_stuff -
RuntimeError: Session must be set before!
FAILED
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_stuff -
BrokenPipeError: [Errno 32] Broken pipe
```
I've attempted to address both of these issues without much luck. I've also
looked into testing at a higher level (instantiating
`DagFileProcessorAgent`, which creates the `DagFileProcessorProcess` class), as
well as mocking out the internal methods called by
`DagFileProcessorProcess._run_file_processor`, but that seems like a brittle
approach at best.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]