kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888041878


##########
airflow/dag_processing/manager.py:
##########
@@ -262,10 +258,8 @@ def terminate(self):
         """Send termination signal to DAG parsing processor manager to 
terminate all DAG file processors."""
         if self._process and self._process.is_alive():
             self.log.info("Sending termination message to manager.")
-            try:
-                
self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
-            except ConnectionError:
-                pass
+            self._parent_signal_conn.send(None)
+            self._parent_signal_conn.close()

Review Comment:
   Do we need to handle `ConnectionError` / `OSError`?



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.
     """
 
-    def __init__(
-        self,
-        dag_directory: os.PathLike[str],
-        max_runs: int,
-        processor_timeout: timedelta,
-        signal_conn: MultiprocessingConnection | None = None,
-    ):
-        super().__init__()
-        # known files; this will be updated every `dag_dir_list_interval` and 
stuff added/removed accordingly
-        self._file_paths: list[str] = []
-        self._file_path_queue: deque[str] = deque()
-        self._max_runs = max_runs
-        # signal_conn is None for dag_processor_standalone mode.
-        self._direct_scheduler_conn = signal_conn
-        self._parsing_start_time: float | None = None
-        self._dag_directory = dag_directory
-        # Set the signal conn in to non-blocking mode, so that attempting to
-        # send when the buffer is full errors, rather than hangs for-ever
-        # attempting to send (this is to avoid deadlocks!)
-        if self._direct_scheduler_conn:
+    _dag_directory: os.PathLike[str]

Review Comment:
   Any reason to mark this private (`_da...`)?



##########
airflow/dag_processing/collection.py:
##########
@@ -393,19 +394,26 @@ def update_dags(
             dm.is_active = True
             dm.has_import_errors = False
             dm.last_parsed_time = utcnow()
-            dm.default_view = dag.default_view
             if hasattr(dag, "_dag_display_property_value"):
                 dm._dag_display_property_value = 
dag._dag_display_property_value
             elif dag.dag_display_name != dag.dag_id:
                 dm._dag_display_property_value = dag.dag_display_name
             dm.description = dag.description
-            dm.max_active_tasks = dag.max_active_tasks
-            dm.max_active_runs = dag.max_active_runs
-            dm.max_consecutive_failed_dag_runs = 
dag.max_consecutive_failed_dag_runs
-            dm.has_task_concurrency_limits = any(
-                t.max_active_tis_per_dag is not None or 
t.max_active_tis_per_dagrun is not None
-                for t in dag.tasks
-            )
+            # TODO: this `if is not None` is maybe not the best. It's convient 
though
+            if dag.max_active_tasks is not None:
+                dm.max_active_tasks = dag.max_active_tasks

Review Comment:
   Do you know of any reason why we didn't have to do this before?



##########
airflow/callbacks/callback_requests.py:
##########
@@ -16,49 +16,37 @@
 # under the License.
 from __future__ import annotations
 
-import json
 from typing import TYPE_CHECKING
 
+from pydantic import BaseModel
+
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as 
ti_datamodel  # noqa: TC001
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
-    from airflow.models.taskinstance import SimpleTaskInstance
+    from airflow.typing_compat import Self
 
 
-class CallbackRequest:
+class CallbackRequest(BaseModel):
     """
     Base Class with information about the callback to be executed.
 
-    :param full_filepath: File Path to use to run the callback
     :param msg: Additional Message that can be used for logging
     :param processor_subdir: Directory used by Dag Processor when parsed the 
dag.
     """
 
-    def __init__(
-        self,
-        full_filepath: str,
-        processor_subdir: str | None = None,
-        msg: str | None = None,
-    ):
-        self.full_filepath = full_filepath
-        self.processor_subdir = processor_subdir
-        self.msg = msg
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return self.__dict__ == other.__dict__
-        return NotImplemented
+    full_filepath: str
+    """File Path to use to run the callback"""
+    processor_subdir: str | None = None
+    """Directory used by Dag Processor when parsed the dag"""
+    msg: str | None = None
+    """Additional Message that can be used for logging to determine 
failure/zombie"""
 
-    def __repr__(self):
-        return str(self.__dict__)
-
-    def to_json(self) -> str:
-        return json.dumps(self.__dict__)
+    to_json = BaseModel.model_dump_json

Review Comment:
   That way it is consistent in terms of mapping to pydantic methods: 
`from_json` -> class, `to_json` -> instance



##########
airflow/callbacks/callback_requests.py:
##########
@@ -16,49 +16,37 @@
 # under the License.
 from __future__ import annotations
 
-import json
 from typing import TYPE_CHECKING
 
+from pydantic import BaseModel
+
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as 
ti_datamodel  # noqa: TC001
 from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
-    from airflow.models.taskinstance import SimpleTaskInstance
+    from airflow.typing_compat import Self
 
 
-class CallbackRequest:
+class CallbackRequest(BaseModel):
     """
     Base Class with information about the callback to be executed.
 
-    :param full_filepath: File Path to use to run the callback
     :param msg: Additional Message that can be used for logging
     :param processor_subdir: Directory used by Dag Processor when parsed the 
dag.
     """
 
-    def __init__(
-        self,
-        full_filepath: str,
-        processor_subdir: str | None = None,
-        msg: str | None = None,
-    ):
-        self.full_filepath = full_filepath
-        self.processor_subdir = processor_subdir
-        self.msg = msg
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return self.__dict__ == other.__dict__
-        return NotImplemented
+    full_filepath: str
+    """File Path to use to run the callback"""
+    processor_subdir: str | None = None
+    """Directory used by Dag Processor when parsed the dag"""
+    msg: str | None = None
+    """Additional Message that can be used for logging to determine 
failure/zombie"""
 
-    def __repr__(self):
-        return str(self.__dict__)
-
-    def to_json(self) -> str:
-        return json.dumps(self.__dict__)
+    to_json = BaseModel.model_dump_json

Review Comment:
   Should this instead be the following to indicate that it is bound to an 
instance? 
   
   ```py
       def to_json(self, **kwargs) -> str:
           return self.model_dump_json(**kwargs)
   ```
   
   otherwise :
   
   ```py
   In [5]: CallbackRequest.to_json()
   ---------------------------------------------------------------------------
   TypeError                                 Traceback (most recent call last)
   Cell In[5], line 1
   ----> 1 CallbackRequest.to_json()
   
   TypeError: model_dump_json() missing 1 required positional argument: 'self'
   ```



##########
airflow/models/dagcode.py:
##########
@@ -170,18 +169,18 @@ def get_latest_dagcode(cls, dag_id: str, session: Session 
= NEW_SESSION) -> DagC
 
     @classmethod
     @provide_session
-    def update_source_code(cls, dag: DAG, session: Session = NEW_SESSION) -> 
None:
+    def update_source_code(cls, dag_id: str, fileloc: str, session: Session = 
NEW_SESSION) -> None:

Review Comment:
   Need to update docstrings to match new args



##########
airflow/dag_processing/collection.py:
##########
@@ -192,7 +193,7 @@ def _serialize_dag_capturing_errors(dag: DAG, session: 
Session, processor_subdir
             _sync_dag_perms(dag, session=session)
         else:
             # Check and update DagCode
-            DagCode.update_source_code(dag)
+            DagCode.update_source_code(dag.dag_id, dag.fileloc)

Review Comment:
   Should we add Protocol for `LazyDeserializedDAG` so auto-completion works?
   
   <img width="576" alt="image" 
src="https://github.com/user-attachments/assets/fd2363ba-4249-4183-9fc2-bcb2c1c98175";
 />
   



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.
     """
 
-    def __init__(
-        self,
-        dag_directory: os.PathLike[str],
-        max_runs: int,
-        processor_timeout: timedelta,
-        signal_conn: MultiprocessingConnection | None = None,
-    ):
-        super().__init__()
-        # known files; this will be updated every `dag_dir_list_interval` and 
stuff added/removed accordingly
-        self._file_paths: list[str] = []
-        self._file_path_queue: deque[str] = deque()
-        self._max_runs = max_runs
-        # signal_conn is None for dag_processor_standalone mode.
-        self._direct_scheduler_conn = signal_conn
-        self._parsing_start_time: float | None = None
-        self._dag_directory = dag_directory
-        # Set the signal conn in to non-blocking mode, so that attempting to
-        # send when the buffer is full errors, rather than hangs for-ever
-        # attempting to send (this is to avoid deadlocks!)
-        if self._direct_scheduler_conn:
+    _dag_directory: os.PathLike[str]
+    max_runs: int
+    processor_timeout: float = attrs.field(factory=_config_int_factory("core", 
"dag_file_processor_timeout"))
+    selector: selectors.BaseSelector = 
attrs.field(factory=selectors.DefaultSelector)
+    _direct_scheduler_conn: MultiprocessingConnection | None = 
attrs.field(alias="signal_conn", default=None)
+
+    _parallelism: int = attrs.field(factory=_config_int_factory("scheduler", 
"parsing_processes"))
+
+    dag_dir_list_interval: int = attrs.field(
+        factory=_config_int_factory("scheduler", "dag_dir_list_interval")
+    )
+    parsing_cleanup_interval: float = attrs.field(
+        factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
+    )
+    _file_process_interval: float = attrs.field(
+        factory=_config_int_factory("scheduler", "min_file_process_interval")
+    )
+    stale_dag_threshold: float = 
attrs.field(factory=_config_int_factory("scheduler", "stale_dag_threshold"))
+    last_dag_dir_refresh_time: float = attrs.field(default=0, init=False)
+
+    log: logging.Logger = log
+
+    _last_deactivate_stale_dags_time: float = attrs.field(default=0, 
init=False)
+    print_stats_interval: float = attrs.field(
+        factory=_config_int_factory("scheduler", "print_stats_interval")
+    )
+    last_stat_print_time: float = attrs.field(default=0, init=False)
+
+    _file_paths: list[str] = attrs.field(factory=list, init=False)
+    _file_path_queue: deque[str] = attrs.field(factory=deque, init=False)
+    _file_stats: dict[str, DagFileStat] = attrs.field(factory=lambda: 
defaultdict(DagFileStat), init=False)
+
+    _processors: dict[str, DagFileProcessorProcess] = 
attrs.field(factory=dict, init=False)
+
+    _parsing_start_time: float = attrs.field(init=False)
+    _num_run: int = attrs.field(default=0, init=False)
+
+    _callback_to_execute: dict[str, list[CallbackRequest]] = attrs.field(
+        factory=lambda: defaultdict(list), init=True
+    )

Review Comment:
   ```suggestion
       _file_paths: list[str] = attrs.field(factory=list, init=False)
       _file_path_queue: deque[str] = attrs.field(factory=deque, init=False)
       _file_stats: dict[str, DagFileStat] = attrs.field(factory=lambda: 
defaultdict(DagFileStat), init=False)
   
       _processors: dict[str, DagFileProcessorProcess] = 
attrs.field(factory=dict, init=False)
   
       _parsing_start_time: float = attrs.field(init=False)
       _num_run: int = attrs.field(default=0, init=False)
   
       _callback_to_execute: dict[str, list[CallbackRequest]] = attrs.field(
           factory=lambda: defaultdict(list), init=False
       )
   ```
   
   I think



##########
airflow/dag_processing/manager.py:
##########
@@ -177,18 +178,17 @@ def _run_processor_manager(
         # Make this process start as a new process group - that makes it easy
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
+
         set_new_process_group()
-        span = Trace.get_current_span()
-        span.set_attribute("dag_directory", str(dag_directory))
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         reload_configuration_for_dag_processing()
         processor_manager = DagFileProcessorManager(
             dag_directory=dag_directory,
             max_runs=max_runs,
-            processor_timeout=processor_timeout,
+            processor_timeout=processor_timeout.total_seconds(),
             signal_conn=signal_conn,
         )
-        processor_manager.start()
+        processor_manager.run()

Review Comment:
   We will also need to change:
   
   
https://github.com/apache/airflow/blob/8848d6e35f6ead2cc1ed9887af8360fbd61c78c3/airflow/jobs/dag_processor_job_runner.py#L62



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.

Review Comment:
   This can be nuked



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.
     """
 
-    def __init__(
-        self,
-        dag_directory: os.PathLike[str],
-        max_runs: int,
-        processor_timeout: timedelta,
-        signal_conn: MultiprocessingConnection | None = None,
-    ):
-        super().__init__()
-        # known files; this will be updated every `dag_dir_list_interval` and 
stuff added/removed accordingly
-        self._file_paths: list[str] = []
-        self._file_path_queue: deque[str] = deque()
-        self._max_runs = max_runs
-        # signal_conn is None for dag_processor_standalone mode.
-        self._direct_scheduler_conn = signal_conn
-        self._parsing_start_time: float | None = None
-        self._dag_directory = dag_directory
-        # Set the signal conn in to non-blocking mode, so that attempting to
-        # send when the buffer is full errors, rather than hangs for-ever
-        # attempting to send (this is to avoid deadlocks!)
-        if self._direct_scheduler_conn:
+    _dag_directory: os.PathLike[str]
+    max_runs: int
+    processor_timeout: float = attrs.field(factory=_config_int_factory("core", 
"dag_file_processor_timeout"))
+    selector: selectors.BaseSelector = 
attrs.field(factory=selectors.DefaultSelector)
+    _direct_scheduler_conn: MultiprocessingConnection | None = 
attrs.field(alias="signal_conn", default=None)
+
+    _parallelism: int = attrs.field(factory=_config_int_factory("scheduler", 
"parsing_processes"))
+
+    dag_dir_list_interval: int = attrs.field(
+        factory=_config_int_factory("scheduler", "dag_dir_list_interval")
+    )
+    parsing_cleanup_interval: float = attrs.field(
+        factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
+    )
+    _file_process_interval: float = attrs.field(
+        factory=_config_int_factory("scheduler", "min_file_process_interval")
+    )
+    stale_dag_threshold: float = 
attrs.field(factory=_config_int_factory("scheduler", "stale_dag_threshold"))
+    last_dag_dir_refresh_time: float = attrs.field(default=0, init=False)
+
+    log: logging.Logger = log

Review Comment:
   init=False?



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.
     """
 
-    def __init__(
-        self,
-        dag_directory: os.PathLike[str],
-        max_runs: int,
-        processor_timeout: timedelta,
-        signal_conn: MultiprocessingConnection | None = None,
-    ):
-        super().__init__()
-        # known files; this will be updated every `dag_dir_list_interval` and 
stuff added/removed accordingly
-        self._file_paths: list[str] = []
-        self._file_path_queue: deque[str] = deque()
-        self._max_runs = max_runs
-        # signal_conn is None for dag_processor_standalone mode.
-        self._direct_scheduler_conn = signal_conn
-        self._parsing_start_time: float | None = None
-        self._dag_directory = dag_directory
-        # Set the signal conn in to non-blocking mode, so that attempting to
-        # send when the buffer is full errors, rather than hangs for-ever
-        # attempting to send (this is to avoid deadlocks!)
-        if self._direct_scheduler_conn:
+    _dag_directory: os.PathLike[str]
+    max_runs: int
+    processor_timeout: float = attrs.field(factory=_config_int_factory("core", 
"dag_file_processor_timeout"))
+    selector: selectors.BaseSelector = 
attrs.field(factory=selectors.DefaultSelector)
+    _direct_scheduler_conn: MultiprocessingConnection | None = 
attrs.field(alias="signal_conn", default=None)

Review Comment:
   Why do we need alias? Should we instead just use `signal_conn` as name?



##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
     :param signal_conn: connection to communicate signal with processor agent.
     """
 
-    def __init__(
-        self,
-        dag_directory: os.PathLike[str],
-        max_runs: int,
-        processor_timeout: timedelta,
-        signal_conn: MultiprocessingConnection | None = None,
-    ):
-        super().__init__()
-        # known files; this will be updated every `dag_dir_list_interval` and 
stuff added/removed accordingly
-        self._file_paths: list[str] = []
-        self._file_path_queue: deque[str] = deque()
-        self._max_runs = max_runs
-        # signal_conn is None for dag_processor_standalone mode.
-        self._direct_scheduler_conn = signal_conn
-        self._parsing_start_time: float | None = None
-        self._dag_directory = dag_directory
-        # Set the signal conn in to non-blocking mode, so that attempting to
-        # send when the buffer is full errors, rather than hangs for-ever
-        # attempting to send (this is to avoid deadlocks!)
-        if self._direct_scheduler_conn:
+    _dag_directory: os.PathLike[str]
+    max_runs: int
+    processor_timeout: float = attrs.field(factory=_config_int_factory("core", 
"dag_file_processor_timeout"))
+    selector: selectors.BaseSelector = 
attrs.field(factory=selectors.DefaultSelector)
+    _direct_scheduler_conn: MultiprocessingConnection | None = 
attrs.field(alias="signal_conn", default=None)

Review Comment:
   I am guessing you are using it so the code within this class is easier to 
understand -- the name explains what that connection is. But it is 
straightforward during init.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to