ashb commented on a change in pull request #5615: [AIRFLOW-XXX] Use
events/messages not multiprocessing.Manager
URL: https://github.com/apache/airflow/pull/5615#discussion_r306251464
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -120,66 +119,68 @@ def _launch_process(result_queue,
:return: the process that was launched
:rtype: multiprocessing.Process
"""
- def helper():
- # This helper runs in the newly created process
- log = logging.getLogger("airflow.processor")
-
- stdout = StreamLogWriter(log, logging.INFO)
- stderr = StreamLogWriter(log, logging.WARN)
-
- set_context(log, file_path)
-
- try:
- # redirect stdout/stderr to log
- sys.stdout = stdout
- sys.stderr = stderr
-
- # Re-configure the ORM engine as there are issues with
multiple processes
- settings.configure_orm()
-
- # Change the thread name to differentiate log lines. This is
- # really a separate process, but changing the name of the
- # process doesn't work, so changing the thread name instead.
- threading.current_thread().name = thread_name
- start_time = time.time()
-
- log.info("Started process (PID=%s) to work on %s",
- os.getpid(), file_path)
- scheduler_job = SchedulerJob(dag_ids=dag_id_white_list,
log=log)
- result = scheduler_job.process_file(file_path, pickle_dags)
- result_queue.put(result)
- end_time = time.time()
- log.info(
- "Processing %s took %.3f seconds", file_path, end_time -
start_time
- )
- except Exception:
- # Log exceptions through the logging framework.
- log.exception("Got an exception! Propagating...")
- raise
- finally:
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
- # We re-initialized the ORM within this Process above so we
need to
- # tear it down manually here
- settings.dispose_orm()
-
- p = multiprocessing.Process(target=helper,
- args=(),
- name="{}-Process".format(thread_name))
- p.start()
- return p
+ # This helper runs in the newly created process
+ log = logging.getLogger("airflow.processor")
+
+ stdout = StreamLogWriter(log, logging.INFO)
+ stderr = StreamLogWriter(log, logging.WARN)
+
+ set_context(log, file_path)
+ from setproctitle import setproctitle
+ setproctitle("airflow scheduler - DagFileProcessor
{}".format(file_path))
Review comment:
Yeah, this was something I added to (try and) track down _what_ kind of
process was leaking, and I decided to keep it anyway.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services