potiuk commented on a change in pull request #5615: [AIRFLOW-XXX] Use 
events/messages not multiprocessing.Manager
URL: https://github.com/apache/airflow/pull/5615#discussion_r306249395
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -543,94 +543,86 @@ def wait_until_finished(self):
         Should only be used when the DAG file processor manager was launched in 
sync mode.
         Wait for done signal from the manager.
         """
-        while True:
-            if self._parent_signal_conn.recv() == 
DagParsingSignal.MANAGER_DONE:
-                break
+        while not self.done:
+        result = self._parent_signal_conn.recv()
+            if isinstance(result, DagParsingStat):
+                self._sync_metadata(result)
 
     @staticmethod
-    def _launch_process(dag_directory,
-                        file_paths,
-                        max_runs,
-                        processor_factory,
-                        signal_conn,
-                        _stat_queue,
-                        result_queue,
-                        async_mode):
-        def helper():
-            # Reload configurations and settings to avoid collision with 
parent process.
-            # Because this process may need custom configurations that cannot 
be shared,
-            # e.g. RotatingFileHandler. And it can cause connection corruption 
if we
-            # do not recreate the SQLA connection pool.
-            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
-            # Replicating the behavior of how logging module was loaded
-            # in logging_config.py
-            
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 
1)[0]))
-            reload_module(airflow.settings)
-            airflow.settings.initialize()
-            del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
-            processor_manager = DagFileProcessorManager(dag_directory,
-                                                        file_paths,
-                                                        max_runs,
-                                                        processor_factory,
-                                                        signal_conn,
-                                                        _stat_queue,
-                                                        result_queue,
-                                                        async_mode)
-
-            processor_manager.start()
-
-        p = multiprocessing.Process(target=helper,
-                                    args=(),
-                                    name="DagFileProcessorManager")
-        p.start()
-        return p
+    def _run_processor_manager(dag_directory,
+                               file_paths,
+                               max_runs,
+                               processor_factory,
+                               signal_conn,
+                               async_mode):
+        from setproctitle import setproctitle
+        setproctitle("airflow scheduler -- DagFileProcessorManager")
+        # Reload configurations and settings to avoid collision with parent 
process.
+        # Because this process may need custom configurations that cannot be 
shared,
+        # e.g. RotatingFileHandler. And it can cause connection corruption if 
we
+        # do not recreate the SQLA connection pool.
+        os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+        # Replicating the behavior of how logging module was loaded
+        # in logging_config.py
+        
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 
1)[0]))
+        reload_module(airflow.settings)
+        airflow.settings.initialize()
+        del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+        processor_manager = DagFileProcessorManager(dag_directory,
+                                                    file_paths,
+                                                    max_runs,
+                                                    processor_factory,
+                                                    signal_conn,
+                                                    async_mode)
+
+        processor_manager.start()
 
     def harvest_simple_dags(self):
         """
         Harvest DAG parsing results from result queue and sync metadata from 
stat queue.
+
         :return: List of parsing result in SimpleDag format.
         """
-        # Metadata and results to be harvested can be inconsistent,
-        # but it should not be a big problem.
-        self._sync_metadata()
-        # Heartbeating after syncing metadata so we do not restart manager
-        # if it processed all files for max_run times and exit normally.
-        self._heartbeat_manager()
+
         simple_dags = []
-        while True:
-            try:
-                result = self._result_queue.get_nowait()
-                try:
-                    simple_dags.append(result)
-                finally:
-                    self._result_queue.task_done()
-            except Empty:
-                break
+        while self._parent_signal_conn.poll():
+            result = self._parent_signal_conn.recv()
+
+            self.log.debug("Received message of type %s", 
type(result).__name__)
+            if isinstance(result, DagParsingStat):
+                self._sync_metadata(result)
+            else:
+                simple_dags.append(result)
+
+        # Receive any pending messages before checking if the process has 
exited.
+        self._heartbeat_manager()
 
         self._result_count = 0
 
         return simple_dags
 
     def _heartbeat_manager(self):
         """
-        Heartbeat DAG file processor and start it if it is not alive.
-        :return:
+        Heartbeat DAG file processor and restart it if we are not done.
         """
-        if self._process and not self._process.is_alive() and not self.done:
-            self.start()
+        if self._process and not self._process.is_alive():
+            self._process.join(timeout=0)
 
 Review comment:
   Again here - do we have to join() here if process is not alive() ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to