kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888177170


##########
airflow/dag_processing/manager.py:
##########
@@ -1254,3 +1078,39 @@ def reload_configuration_for_dag_processing():
     importlib.reload(airflow.settings)
     airflow.settings.initialize()
     del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
+
+
+def process_parse_results(
+    run_duration: float,
+    finish_time: datetime,
+    run_count: int,
+    path: str,
+    parsing_result: DagFileParsingResult | None,
+    processor_subdir: str | None,
+    session: Session,
+) -> DagFileStat:

Review Comment:
   Worth adding a docstring describing it at a high level, i.e. parsing DAG 
files and storing the DAGs and import errors in the DB



##########
airflow/dag_processing/manager.py:
##########
@@ -711,7 +683,10 @@ def clear_nonexistent_import_errors(self, 
session=NEW_SESSION):
         query = 
delete(ParseImportError).where(ParseImportError.processor_subdir == 
self.get_dag_directory())
 
         if self._file_paths:
-            query = 
query.where(ParseImportError.filename.notin_(self._file_paths))
+            query = query.where(
+                ~ParseImportError.filename.in_(self._file_paths),

Review Comment:
   nit:
   
   `notin_` feels more explicit, imo — but I think I know why you did that: 
since we now have 2 conditions, it's easier to identify what isn't required vs what is
   
   ```suggestion
                   ParseImportError.filename.notin_(self._file_paths),
   ```



##########
airflow/dag_processing/manager.py:
##########
@@ -560,54 +505,92 @@ def _run_parsing_loop(self):
                     # only drives "max runs")
                     self.log.debug("BlockingIOError received trying to send 
DagParsingStat, ignoring")
 
-                if max_runs_reached:
-                    self.log.info(
-                        "Exiting dag parsing loop as all files have been 
processed %s times", self._max_runs
-                    )
-                    if span.is_recording():
-                        span.add_event(
-                            name="info",
-                            attributes={
-                                "message": "Exiting dag parsing loop as all 
files have been processed {self._max_runs} times"
-                            },
-                        )
-                    break
+            if self.max_runs_reached():
+                self.log.info(
+                    "Exiting dag parsing loop as all files have been processed 
%s times", self.max_runs
+                )
+                break
 
-                loop_duration = time.monotonic() - loop_start_time
-                if loop_duration < 1:
-                    poll_time = 1 - loop_duration
-                else:
-                    poll_time = 0.0
+            loop_duration = time.monotonic() - loop_start_time
+            if loop_duration < 1:
+                poll_time = 1 - loop_duration
+            else:
+                poll_time = 0.0
+
+    def _service_processor_sockets(self, timeout: float | None = 1.0):
+        """
+        Service subprocess events by polling sockets for activity.
+
+        This runs `select` (or a platform equivalent) to look for activity on 
the sockets connected to the
+        parsing subprocesses, and calls the registered handler function for 
each socket.
+
+        All the parsing processes socket handlers are registered into a single 
Selector
+        """
+        events = self.selector.select(timeout=timeout)
+        for key, _ in events:
+            socket_handler = key.data
+            need_more = socket_handler(key.fileobj)
+
+            if not need_more:
+                self.selector.unregister(key.fileobj)
+                key.fileobj.close()  # type: ignore[union-attr]

Review Comment:
   This isn't a big function, so it's not a big deal — but I'm wondering if we 
can re-use `WatchedSubprocess._service_subprocess` here



##########
airflow/callbacks/callback_requests.py:
##########
@@ -16,49 +16,37 @@
 # under the License.
 from __future__ import annotations
 
-import json
 from typing import TYPE_CHECKING
 
+from pydantic import BaseModel
+
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as 
ti_datamodel  # noqa: TC001
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
-    from airflow.models.taskinstance import SimpleTaskInstance
+    from airflow.typing_compat import Self
 
 
-class CallbackRequest:
+class CallbackRequest(BaseModel):
     """
     Base Class with information about the callback to be executed.
 
-    :param full_filepath: File Path to use to run the callback
     :param msg: Additional Message that can be used for logging
     :param processor_subdir: Directory used by Dag Processor when parsed the 
dag.
     """
 
-    def __init__(
-        self,
-        full_filepath: str,
-        processor_subdir: str | None = None,
-        msg: str | None = None,
-    ):
-        self.full_filepath = full_filepath
-        self.processor_subdir = processor_subdir
-        self.msg = msg
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return self.__dict__ == other.__dict__
-        return NotImplemented
+    full_filepath: str
+    """File Path to use to run the callback"""
+    processor_subdir: str | None = None
+    """Directory used by Dag Processor when parsed the dag"""
+    msg: str | None = None
+    """Additional Message that can be used for logging to determine 
failure/zombie"""
 
-    def __repr__(self):
-        return str(self.__dict__)
-
-    def to_json(self) -> str:
-        return json.dumps(self.__dict__)
+    to_json = BaseModel.model_dump_json

Review Comment:
   It probably was used for:
   
   
https://github.com/apache/airflow/blob/8848d6e35f6ead2cc1ed9887af8360fbd61c78c3/airflow/serialization/serialized_objects.py#L772-L775



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to