This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4e00fe8ec01c9a2bbfea02ac7841d42eb45ea7f5
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Wed Aug 9 18:49:28 2023 +0000

    Refactor: Simplify code in dag_processing (#33161)

    (cherry picked from commit 2f16a46db51b67e67b8406fbafe9e9add6eee235)
---
 airflow/dag_processing/manager.py   |  8 ++++----
 airflow/dag_processing/processor.py | 12 ++++--------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7e44e89816..dd252db9bd 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -1001,7 +1001,7 @@ class DagFileProcessorManager(LoggingMixin):
                 processor.terminate()
                 self._file_stats.pop(file_path)
 
-        to_remove = set(self._file_stats.keys()) - set(self._file_paths)
+        to_remove = set(self._file_stats).difference(self._file_paths)
         for key in to_remove:
             # Remove the stats for any dag files that don't exist anymore
             del self._file_stats[key]
@@ -1076,7 +1076,7 @@ class DagFileProcessorManager(LoggingMixin):
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.popleft()
             # Stop creating duplicate processor i.e. processor with the same filepath
-            if file_path in self._processors.keys():
+            if file_path in self._processors:
                 continue
 
             callback_to_execute_for_file = self._callback_to_execute[file_path]
@@ -1115,7 +1115,7 @@ class DagFileProcessorManager(LoggingMixin):
         self._parsing_start_time = time.perf_counter()
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
-        file_paths_in_progress = self._processors.keys()
+        file_paths_in_progress = set(self._processors)
         now = timezone.utcnow()
 
         # Sort the file paths by the parsing order mode
@@ -1173,7 +1173,7 @@ class DagFileProcessorManager(LoggingMixin):
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        file_paths_to_exclude = set(file_paths_in_progress).union(
+        file_paths_to_exclude = file_paths_in_progress.union(
             file_paths_recently_processed,
             files_paths_at_run_limit,
         )
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 369f676878..8b0ecb3640 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -573,9 +573,9 @@ class DagFileProcessor(LoggingMixin):
             for task in tasks_missed_sla:
                 if task.email:
                     if isinstance(task.email, str):
-                        emails |= set(get_email_address_list(task.email))
+                        emails.update(get_email_address_list(task.email))
                     elif isinstance(task.email, (list, tuple)):
-                        emails |= set(task.email)
+                        emails.update(task.email)
             if emails:
                 try:
                     send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content)
@@ -652,9 +652,7 @@ class DagFileProcessor(LoggingMixin):
             task_pools = {task.pool for task in dag.tasks}
             nonexistent_pools = task_pools - pools
             if nonexistent_pools:
-                return (
-                    f"Dag '{dag.dag_id}' references non-existent pools: {list(sorted(nonexistent_pools))!r}"
-                )
+                return f"Dag '{dag.dag_id}' references non-existent pools: {sorted(nonexistent_pools)!r}"
 
         pools = {p.pool for p in Pool.get_pools(session)}
         for dag in dagbag.dags.values():
@@ -679,9 +677,7 @@ class DagFileProcessor(LoggingMixin):
         """
         self._validate_task_pools(dagbag=dagbag)
 
-        stored_warnings = set(
-            session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags.keys())).all()
-        )
+        stored_warnings = set(session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags)).all())
 
         for warning_to_delete in stored_warnings - self.dag_warnings:
             session.delete(warning_to_delete)
