amoghrajesh commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888126918
##########
airflow/dag_processing/manager.py:
##########
@@ -145,7 +141,7 @@ def start(self) -> None:
context = self._get_multiprocessing_context()
self._last_parsing_stat_received_at = time.monotonic()
- self._parent_signal_conn, child_signal_conn = context.Pipe()
+ parent_signal_conn, child_signal_conn = context.Pipe()
Review Comment:
Can we not assign to `self._parent_signal_conn` directly?
##########
airflow/dag_processing/manager.py:
##########
@@ -298,74 +301,58 @@ class DagFileProcessorManager(LoggingMixin):
:param signal_conn: connection to communicate signal with processor agent.
"""
- def __init__(
- self,
- dag_directory: os.PathLike[str],
- max_runs: int,
- processor_timeout: timedelta,
- signal_conn: MultiprocessingConnection | None = None,
- ):
- super().__init__()
- # known files; this will be updated every `dag_dir_list_interval` and
stuff added/removed accordingly
- self._file_paths: list[str] = []
- self._file_path_queue: deque[str] = deque()
- self._max_runs = max_runs
- # signal_conn is None for dag_processor_standalone mode.
- self._direct_scheduler_conn = signal_conn
- self._parsing_start_time: float | None = None
- self._dag_directory = dag_directory
- # Set the signal conn in to non-blocking mode, so that attempting to
- # send when the buffer is full errors, rather than hangs for-ever
- # attempting to send (this is to avoid deadlocks!)
- if self._direct_scheduler_conn:
+ _dag_directory: os.PathLike[str]
+ max_runs: int
+ processor_timeout: float = attrs.field(factory=_config_int_factory("core",
"dag_file_processor_timeout"))
+ selector: selectors.BaseSelector =
attrs.field(factory=selectors.DefaultSelector)
+ _direct_scheduler_conn: MultiprocessingConnection | None =
attrs.field(alias="signal_conn", default=None)
+
+ _parallelism: int = attrs.field(factory=_config_int_factory("scheduler",
"parsing_processes"))
+
+ dag_dir_list_interval: int = attrs.field(
+ factory=_config_int_factory("scheduler", "dag_dir_list_interval")
+ )
+ parsing_cleanup_interval: float = attrs.field(
+ factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
+ )
+ _file_process_interval: float = attrs.field(
+ factory=_config_int_factory("scheduler", "min_file_process_interval")
+ )
+ stale_dag_threshold: float =
attrs.field(factory=_config_int_factory("scheduler", "stale_dag_threshold"))
+ last_dag_dir_refresh_time: float = attrs.field(default=0, init=False)
+
+ log: logging.Logger = log
+
+ _last_deactivate_stale_dags_time: float = attrs.field(default=0,
init=False)
+ print_stats_interval: float = attrs.field(
+ factory=_config_int_factory("scheduler", "print_stats_interval")
+ )
+ last_stat_print_time: float = attrs.field(default=0, init=False)
+
+ _file_paths: list[str] = attrs.field(factory=list, init=False)
+ _file_path_queue: deque[str] = attrs.field(factory=deque, init=False)
+ _file_stats: dict[str, DagFileStat] = attrs.field(factory=lambda:
defaultdict(DagFileStat), init=False)
+
+ _processors: dict[str, DagFileProcessorProcess] =
attrs.field(factory=dict, init=False)
+
+ _parsing_start_time: float = attrs.field(init=False)
+ _num_run: int = attrs.field(default=0, init=False)
+
+ _callback_to_execute: dict[str, list[CallbackRequest]] = attrs.field(
+ factory=lambda: defaultdict(list), init=True
+ )
Review Comment:
Yeah it will probably show up in the `repr` otherwise
##########
airflow/dag_processing/manager.py:
##########
@@ -262,10 +258,8 @@ def terminate(self):
"""Send termination signal to DAG parsing processor manager to
terminate all DAG file processors."""
if self._process and self._process.is_alive():
self.log.info("Sending termination message to manager.")
- try:
-
self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
- except ConnectionError:
- pass
+ self._parent_signal_conn.send(None)
+ self._parent_signal_conn.close()
Review Comment:
Do we not need the try block anymore?
##########
airflow/dag_processing/collection.py:
##########
@@ -393,19 +394,26 @@ def update_dags(
dm.is_active = True
dm.has_import_errors = False
dm.last_parsed_time = utcnow()
- dm.default_view = dag.default_view
if hasattr(dag, "_dag_display_property_value"):
dm._dag_display_property_value =
dag._dag_display_property_value
elif dag.dag_display_name != dag.dag_id:
dm._dag_display_property_value = dag.dag_display_name
dm.description = dag.description
- dm.max_active_tasks = dag.max_active_tasks
- dm.max_active_runs = dag.max_active_runs
- dm.max_consecutive_failed_dag_runs =
dag.max_consecutive_failed_dag_runs
- dm.has_task_concurrency_limits = any(
- t.max_active_tis_per_dag is not None or
t.max_active_tis_per_dagrun is not None
- for t in dag.tasks
- )
+ # TODO: this `if is not None` is maybe not the best. It's convenient
though
+ if dag.max_active_tasks is not None:
+ dm.max_active_tasks = dag.max_active_tasks
Review Comment:
```
max_active_tasks = Column(Integer, nullable=False)
```
We shouldn't need the check, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]