potiuk commented on a change in pull request #5615: [AIRFLOW-XXX] Use
events/messages not multiprocessing.Manager
URL: https://github.com/apache/airflow/pull/5615#discussion_r306249102
##########
File path: airflow/utils/dag_processing.py
##########
@@ -543,94 +543,86 @@ def wait_until_finished(self):
Should only be used when launched DAG file processor manager in sync
mode.
Wait for done signal from the manager.
"""
- while True:
- if self._parent_signal_conn.recv() ==
DagParsingSignal.MANAGER_DONE:
- break
+ while not self.done:
+ result = self._parent_signal_conn.recve()
+ if isinstance(result, DagParsingStat):
+ self._sync_metadata(result)
@staticmethod
- def _launch_process(dag_directory,
- file_paths,
- max_runs,
- processor_factory,
- signal_conn,
- _stat_queue,
- result_queue,
- async_mode):
- def helper():
- # Reload configurations and settings to avoid collision with
parent process.
- # Because this process may need custom configurations that cannot
be shared,
- # e.g. RotatingFileHandler. And it can cause connection corruption
if we
- # do not recreate the SQLA connection pool.
- os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
- # Replicating the behavior of how logging module was loaded
- # in logging_config.py
-
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.',
1)[0]))
- reload_module(airflow.settings)
- airflow.settings.initialize()
- del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
- processor_manager = DagFileProcessorManager(dag_directory,
- file_paths,
- max_runs,
- processor_factory,
- signal_conn,
- _stat_queue,
- result_queue,
- async_mode)
-
- processor_manager.start()
-
- p = multiprocessing.Process(target=helper,
- args=(),
- name="DagFileProcessorManager")
- p.start()
- return p
+ def _run_processor_manager(dag_directory,
+ file_paths,
+ max_runs,
+ processor_factory,
+ signal_conn,
+ async_mode):
+ from setproctitle import setproctitle
+ setproctitle("airflow scheduler -- DagFileProcessorManager")
+ # Reload configurations and settings to avoid collision with parent
process.
+ # Because this process may need custom configurations that cannot be
shared,
+ # e.g. RotatingFileHandler. And it can cause connection corruption if
we
+ # do not recreate the SQLA connection pool.
+ os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+ # Replicating the behavior of how logging module was loaded
+ # in logging_config.py
+
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.',
1)[0]))
+ reload_module(airflow.settings)
+ airflow.settings.initialize()
+ del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+ processor_manager = DagFileProcessorManager(dag_directory,
+ file_paths,
+ max_runs,
+ processor_factory,
+ signal_conn,
+ async_mode)
+
+ processor_manager.start()
def harvest_simple_dags(self):
"""
Harvest DAG parsing results from result queue and sync metadata from
stat queue.
+
:return: List of parsing result in SimpleDag format.
"""
- # Metadata and results to be harvested can be inconsistent,
- # but it should not be a big problem.
- self._sync_metadata()
- # Heartbeating after syncing metadata so we do not restart manager
- # if it processed all files for max_run times and exit normally.
- self._heartbeat_manager()
+
simple_dags = []
- while True:
- try:
- result = self._result_queue.get_nowait()
- try:
- simple_dags.append(result)
- finally:
- self._result_queue.task_done()
- except Empty:
- break
+ while self._parent_signal_conn.poll():
+ result = self._parent_signal_conn.recv()
+
+ self.log.debug("Received message of type %s",
type(result).__name__)
+ if isinstance(result, DagParsingStat):
+ self._sync_metadata(result)
+ else:
+ simple_dags.append(result)
+
+ # Receive any pending messages before checking if the process has
exited.
+ self._heartbeat_manager()
self._result_count = 0
return simple_dags
def _heartbeat_manager(self):
"""
- Heartbeat DAG file processor and start it if it is not alive.
- :return:
+ Heartbeat DAG file processor and retstart it if we are not done.
Review comment:
Nit: typo in the docstring — "retstart" should be "restart".
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services