kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888145365


##########
airflow/dag_processing/manager.py:
##########
@@ -454,104 +437,66 @@ def deactivate_stale_dags(
             )
             deactivated = deactivated_dagmodel.rowcount
             if deactivated:
-                cls.logger().info("Deactivated %i DAGs which are no longer 
present in file.", deactivated)
+                self.log.info("Deactivated %i DAGs which are no longer present 
in file.", deactivated)
 
     def _run_parsing_loop(self):
-        poll_time = 0.0
+        # initialize cache to mutualize calls to Variable.get in DAGs
+        # needs to be done before this process is forked to create the DAG 
parsing processes.
+        SecretCache.init()
 
-        self._refresh_dag_dir()
-        self.prepare_file_path_queue()
-        max_callbacks_per_loop = conf.getint("scheduler", 
"max_callbacks_per_loop")
+        if self._direct_scheduler_conn is not None:
+            self.selector.register(
+                self._direct_scheduler_conn, selectors.EVENT_READ, 
self._read_from_direct_scheduler_conn
+            )
+
+        poll_time = 0.0
 
-        self.start_new_processes()
         while True:
-            with Trace.start_span(span_name="dag_parsing_loop", 
component="DagFileProcessorManager") as span:
-                loop_start_time = time.monotonic()
-                ready = multiprocessing.connection.wait(self.waitables.keys(), 
timeout=poll_time)
-                if span.is_recording():
-                    span.add_event(name="heartbeat")
-                self.heartbeat()
-                if self._direct_scheduler_conn is not None and 
self._direct_scheduler_conn in ready:
-                    agent_signal = self._direct_scheduler_conn.recv()
-
-                    self.log.debug("Received %s signal from 
DagFileProcessorAgent", agent_signal)
-                    if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                        self.terminate()
-                        break
-                    elif agent_signal == DagParsingSignal.END_MANAGER:
-                        self.end()
-                        sys.exit(os.EX_OK)
-                    elif isinstance(agent_signal, CallbackRequest):
-                        self._add_callback_to_queue(agent_signal)
-                    else:
-                        raise ValueError(f"Invalid message 
{type(agent_signal)}")
-
-                for sentinel in ready:
-                    if sentinel is not self._direct_scheduler_conn:
-                        processor = self.waitables.get(sentinel)
-                        if processor:
-                            self._collect_results_from_processor(processor)
-                            self.waitables.pop(sentinel)
-                            self._processors.pop(processor.file_path)
-
-                if self.standalone_dag_processor:
-                    for callback in DagFileProcessorManager._fetch_callbacks(
-                        max_callbacks_per_loop, self.standalone_dag_processor, 
self.get_dag_directory()
-                    ):
-                        self._add_callback_to_queue(callback)
-                self._scan_stale_dags()
-                DagWarning.purge_inactive_dag_warnings()
-                refreshed_dag_dir = self._refresh_dag_dir()
-
-                if span.is_recording():
-                    span.add_event(name="_kill_timed_out_processors")
-                self._kill_timed_out_processors()
-
-                # Generate more file paths to process if we processed all the 
files already. Note for this
-                # to clear down, we must have cleared all files found from 
scanning the dags dir _and_ have
-                # cleared all files added as a result of callbacks
-                if not self._file_path_queue:
-                    self.emit_metrics()
-                    if span.is_recording():
-                        span.add_event(name="prepare_file_path_queue")
-                    self.prepare_file_path_queue()
+            loop_start_time = time.monotonic()
+            refreshed_dag_dir = self._refresh_dag_dir()
+
+            self._kill_timed_out_processors()
 
+            if not self._file_path_queue:
+                # Generate more file paths to process if we processed all the 
files already. Note for this to
+                # clear down, we must have cleared all files found from 
scanning the dags dir _and_ have
+                # cleared all files added as a result of callbacks
+                self.prepare_file_path_queue()
+                self.emit_metrics()
+            elif refreshed_dag_dir:
                 # if new files found in dag dir, add them
-                elif refreshed_dag_dir:
-                    if span.is_recording():
-                        span.add_event(name="add_new_file_path_to_queue")
-                    self.add_new_file_path_to_queue()
+                self.add_new_file_path_to_queue()
+
+            self._refresh_requested_filelocs()
+
+            self._start_new_processes()
 
-                self._refresh_requested_filelocs()
-                if span.is_recording():
-                    span.add_event(name="start_new_processes")
-                self.start_new_processes()
+            self._service_processor_sockets(timeout=poll_time)
 
-                # Update number of loop iteration.
-                self._num_run += 1
+            self._collect_results()
 
-                # Collect anything else that has finished, but don't kick off 
any more processors
-                if span.is_recording():
-                    span.add_event(name="collect_results")
-                self.collect_results()
+            if self.standalone_dag_processor:
+                for callback in self._fetch_callbacks():
+                    self._add_callback_to_queue(callback)
+            self._scan_stale_dags()
+            DagWarning.purge_inactive_dag_warnings()

Review Comment:
   So in one of the next PRs, I assume we will need API endpoints for things 
like this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to