ashb commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1887707013


##########
airflow/dag_processing/manager.py:
##########
@@ -882,159 +826,62 @@ def set_file_paths(self, new_file_paths):
 
         self._processors = filtered_processors
 
-    def wait_until_finished(self):
-        """Sleeps until all the processors are done."""
-        for processor in self._processors.values():
-            while not processor.done:
-                time.sleep(0.1)
-
     @provide_session
-    def _collect_results_from_processor(self, processor, session: Session = 
NEW_SESSION) -> None:
-        self.log.debug("Processor for %s finished", processor.file_path)
-        Stats.decr("dag_processing.processes", tags={"file_path": 
processor.file_path, "action": "finish"})
-        last_finish_time = timezone.utcnow()
-
-        if processor.result is not None:
-            num_dags, count_import_errors, last_num_of_db_queries = 
processor.result
-        else:
-            self.log.error(
-                "Processor for %s exited with return code %s.", 
processor.file_path, processor.exit_code
-            )
-            count_import_errors = -1
-            num_dags = 0
-            last_num_of_db_queries = 0
-
-        last_duration = (last_finish_time - 
processor.start_time).total_seconds()
-        stat = DagFileStat(
-            num_dags=num_dags,
-            import_errors=count_import_errors,
-            last_finish_time=last_finish_time,
-            last_duration=last_duration,
-            run_count=self._file_stats[processor.file_path].run_count + 1,
-            last_num_of_db_queries=last_num_of_db_queries,
-        )
-        self._file_stats[processor.file_path] = stat
-        file_name = Path(processor.file_path).stem
-        """crude exposure of instrumentation code which may need to be 
furnished"""
-        span = Trace.get_tracer("DagFileProcessorManager").start_span(
-            "dag_processing", start_time=datetime_to_nano(processor.start_time)
-        )
-        span.set_attributes(
-            {
-                "file_path": processor.file_path,
-                "run_count": stat.run_count,
-            }
-        )
-
-        if processor.result is None:
-            span.set_attributes(
-                {
-                    "error": True,
-                    "processor.exit_code": processor.exit_code,
-                }
-            )
-        else:
-            span.set_attributes(
-                {
-                    "num_dags": num_dags,
-                    "import_errors": count_import_errors,
-                }
+    def _collect_results(self, session: Session = NEW_SESSION):
+        # TODO: Use an explicit session in this fn
+        finished = []
+        for path, proc in self._processors.items():
+            if proc.exit_code is None:
+                # This processor hasn't finished yet
+                continue
+            finished.append(path)
+
+            # Collect the DAGS and import errors into the DB, emit metrics etc.
+            self._file_stats[path] = process_parse_results(
+                run_duration=time.time() - proc.start_time,
+                finish_time=timezone.utcnow(),
+                run_count=self._file_stats[path].run_count,
+                parsing_result=proc.parsing_result,
+                path=path,
+                processor_subdir=self.get_dag_directory(),
+                session=session,
             )
-            if count_import_errors > 0:
-                span.set_attribute("error", True)
-                import_errors = session.scalars(
-                    select(ParseImportError).where(ParseImportError.filename 
== processor.file_path)
-                ).all()
-                for import_error in import_errors:
-                    span.add_event(
-                        name="exception",
-                        attributes={
-                            "filename": import_error.filename,
-                            "exception.type": "ParseImportError",
-                            "exception.name": "Import error when processing 
DAG file",
-                            "exception.stacktrace": import_error.stacktrace,
-                        },
-                    )
-
-        span.end(end_time=datetime_to_nano(last_finish_time))
-
-        Stats.timing(f"dag_processing.last_duration.{file_name}", 
last_duration * 1000.0)
-        Stats.timing("dag_processing.last_duration", last_duration * 1000.0, 
tags={"file_name": file_name})
-
-    def collect_results(self) -> None:
-        """Collect the result from any finished DAG processors."""
-        ready = multiprocessing.connection.wait(
-            self.waitables.keys() - [self._direct_scheduler_conn], timeout=0
-        )
 
-        for sentinel in ready:
-            if sentinel is not self._direct_scheduler_conn:
-                processor = cast(DagFileProcessorProcess, 
self.waitables[sentinel])
-                self.waitables.pop(processor.waitable_handle)
-                self._processors.pop(processor.file_path)
-                self._collect_results_from_processor(processor)
+        for path in finished:
+            self._processors.pop(path)
 
-        self.log.debug("%s/%s DAG parsing processes running", 
len(self._processors), self._parallelism)
+    def _create_process(self, file_path):
+        id = uuid7()
 
-        self.log.debug("%s file paths queued for processing", 
len(self._file_path_queue))
+        callback_to_execute_for_file = 
self._callback_to_execute.pop(file_path, [])
 
-    @staticmethod
-    def _create_process(file_path, dag_directory, callback_requests):
-        """Create DagFileProcessorProcess instance."""
-        return DagFileProcessorProcess(
-            file_path=file_path,
-            dag_directory=dag_directory,
-            callback_requests=callback_requests,
+        return DagFileProcessorProcess.start(
+            id=id,
+            path=file_path,
+            callbacks=callback_to_execute_for_file,
+            selector=self.selector,
         )
 
-    @add_span
-    def start_new_processes(self):
+    def _start_new_processes(self):
         """Start more processors if we have enough slots and files to 
process."""
-        # initialize cache to mutualize calls to Variable.get in DAGs
-        # needs to be done before this process is forked to create the DAG 
parsing processes.
-        SecretCache.init()

Review Comment:
   This was moved up to the main loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to