kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888177170
##########
airflow/dag_processing/manager.py:
##########
@@ -1254,3 +1078,39 @@ def reload_configuration_for_dag_processing():
importlib.reload(airflow.settings)
airflow.settings.initialize()
del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
+
+
+def process_parse_results(
+ run_duration: float,
+ finish_time: datetime,
+ run_count: int,
+ path: str,
+ parsing_result: DagFileParsingResult | None,
+ processor_subdir: str | None,
+ session: Session,
+) -> DagFileStat:
Review Comment:
Worth adding a docstring describing it at a high level, i.e. it parses the
file and stores the DAGs and import errors in the DB.
##########
airflow/dag_processing/manager.py:
##########
@@ -711,7 +683,10 @@ def clear_nonexistent_import_errors(self,
session=NEW_SESSION):
query =
delete(ParseImportError).where(ParseImportError.processor_subdir ==
self.get_dag_directory())
if self._file_paths:
- query =
query.where(ParseImportError.filename.notin_(self._file_paths))
+ query = query.where(
+ ~ParseImportError.filename.in_(self._file_paths),
Review Comment:
nit:
`notin_` feels more explicit, IMO — but I think I know why you did that:
since we now have two conditions, it is easier to identify what isn't vs. what is required.
```suggestion
ParseImportError.filename.notin_(self._file_paths),
```
##########
airflow/dag_processing/manager.py:
##########
@@ -560,54 +505,92 @@ def _run_parsing_loop(self):
# only drives "max runs")
self.log.debug("BlockingIOError received trying to send
DagParsingStat, ignoring")
- if max_runs_reached:
- self.log.info(
- "Exiting dag parsing loop as all files have been
processed %s times", self._max_runs
- )
- if span.is_recording():
- span.add_event(
- name="info",
- attributes={
- "message": "Exiting dag parsing loop as all
files have been processed {self._max_runs} times"
- },
- )
- break
+ if self.max_runs_reached():
+ self.log.info(
+ "Exiting dag parsing loop as all files have been processed
%s times", self.max_runs
+ )
+ break
- loop_duration = time.monotonic() - loop_start_time
- if loop_duration < 1:
- poll_time = 1 - loop_duration
- else:
- poll_time = 0.0
+ loop_duration = time.monotonic() - loop_start_time
+ if loop_duration < 1:
+ poll_time = 1 - loop_duration
+ else:
+ poll_time = 0.0
+
+ def _service_processor_sockets(self, timeout: float | None = 1.0):
+ """
+ Service subprocess events by polling sockets for activity.
+
+ This runs `select` (or a platform equivalent) to look for activity on
the sockets connected to the
+ parsing subprocesses, and calls the registered handler function for
each socket.
+
+ All the parsing processes socket handlers are registered into a single
Selector
+ """
+ events = self.selector.select(timeout=timeout)
+ for key, _ in events:
+ socket_handler = key.data
+ need_more = socket_handler(key.fileobj)
+
+ if not need_more:
+ self.selector.unregister(key.fileobj)
+ key.fileobj.close() # type: ignore[union-attr]
Review Comment:
This isn't a big function, so it's not a big deal — but I'm wondering whether we
can re-use `WatchedSubprocess._service_subprocess` here.
##########
airflow/callbacks/callback_requests.py:
##########
@@ -16,49 +16,37 @@
# under the License.
from __future__ import annotations
-import json
from typing import TYPE_CHECKING
+from pydantic import BaseModel
+
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as
ti_datamodel # noqa: TC001
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- from airflow.models.taskinstance import SimpleTaskInstance
+ from airflow.typing_compat import Self
-class CallbackRequest:
+class CallbackRequest(BaseModel):
"""
Base Class with information about the callback to be executed.
- :param full_filepath: File Path to use to run the callback
:param msg: Additional Message that can be used for logging
:param processor_subdir: Directory used by Dag Processor when parsed the
dag.
"""
- def __init__(
- self,
- full_filepath: str,
- processor_subdir: str | None = None,
- msg: str | None = None,
- ):
- self.full_filepath = full_filepath
- self.processor_subdir = processor_subdir
- self.msg = msg
-
- def __eq__(self, other):
- if isinstance(other, self.__class__):
- return self.__dict__ == other.__dict__
- return NotImplemented
+ full_filepath: str
+ """File Path to use to run the callback"""
+ processor_subdir: str | None = None
+ """Directory used by Dag Processor when parsed the dag"""
+ msg: str | None = None
+ """Additional Message that can be used for logging to determine
failure/zombie"""
- def __repr__(self):
- return str(self.__dict__)
-
- def to_json(self) -> str:
- return json.dumps(self.__dict__)
+ to_json = BaseModel.model_dump_json
Review Comment:
It probably was used for
https://github.com/apache/airflow/blob/8848d6e35f6ead2cc1ed9887af8360fbd61c78c3/airflow/serialization/serialized_objects.py#L772-L775
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]