jhtimmins commented on code in PR #25528:
URL: https://github.com/apache/airflow/pull/25528#discussion_r939300583
##########
airflow/dag_processing/processor.py:
##########
@@ -137,27 +137,38 @@ def _run_file_processor(
set_context(log, file_path)
setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
+
+ def _handle_dag_file_processing():
+ # Re-configure the ORM engine as there are issues with multiple processes
+ settings.configure_orm()
+
+ # Change the thread name to differentiate log lines. This is
+ # really a separate process, but changing the name of the
+ # process doesn't work, so changing the thread name instead.
+ threading.current_thread().name = thread_name
+
+ log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
+ dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
+ result: Tuple[int, int] = dag_file_processor.process_file(
+ file_path=file_path,
+ pickle_dags=pickle_dags,
+ callback_requests=callback_requests,
+ )
+ result_channel.send(result)
+
try:
- # redirect stdout/stderr to log
- with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
- StreamLogWriter(log, logging.WARN)
- ), Stats.timer() as timer:
- # Re-configure the ORM engine as there are issues with multiple processes
- settings.configure_orm()
-
- # Change the thread name to differentiate log lines. This is
- # really a separate process, but changing the name of the
- # process doesn't work, so changing the thread name instead.
- threading.current_thread().name = thread_name
-
- log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
- dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
- result: Tuple[int, int] = dag_file_processor.process_file(
- file_path=file_path,
- pickle_dags=pickle_dags,
- callback_requests=callback_requests,
- )
- result_channel.send(result)
+ DAG_PROCESSOR_LOG_TARGET: str = conf.get('scheduler', 'DAG_PROCESSOR_LOG_TARGET')
+ if DAG_PROCESSOR_LOG_TARGET == "stdout":
+ with Stats.timer() as timer:
+ _handle_dag_file_processing()
+ else:
Review Comment:
Oooof. Alright, I'll think through how to do that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]