This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4e00fe8ec01c9a2bbfea02ac7841d42eb45ea7f5
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Wed Aug 9 18:49:28 2023 +0000

    Refactor: Simplify code in dag_processing (#33161)
    
    (cherry picked from commit 2f16a46db51b67e67b8406fbafe9e9add6eee235)
---
 airflow/dag_processing/manager.py   |  8 ++++----
 airflow/dag_processing/processor.py | 12 ++++--------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7e44e89816..dd252db9bd 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -1001,7 +1001,7 @@ class DagFileProcessorManager(LoggingMixin):
                 processor.terminate()
                 self._file_stats.pop(file_path)
 
-        to_remove = set(self._file_stats.keys()) - set(self._file_paths)
+        to_remove = set(self._file_stats).difference(self._file_paths)
         for key in to_remove:
             # Remove the stats for any dag files that don't exist anymore
             del self._file_stats[key]
@@ -1076,7 +1076,7 @@ class DagFileProcessorManager(LoggingMixin):
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.popleft()
             # Stop creating duplicate processor i.e. processor with the same filepath
-            if file_path in self._processors.keys():
+            if file_path in self._processors:
                 continue
 
             callback_to_execute_for_file = self._callback_to_execute[file_path]
@@ -1115,7 +1115,7 @@ class DagFileProcessorManager(LoggingMixin):
         self._parsing_start_time = time.perf_counter()
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
-        file_paths_in_progress = self._processors.keys()
+        file_paths_in_progress = set(self._processors)
         now = timezone.utcnow()
 
         # Sort the file paths by the parsing order mode
@@ -1173,7 +1173,7 @@ class DagFileProcessorManager(LoggingMixin):
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
-        file_paths_to_exclude = set(file_paths_in_progress).union(
+        file_paths_to_exclude = file_paths_in_progress.union(
             file_paths_recently_processed,
             files_paths_at_run_limit,
         )
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 369f676878..8b0ecb3640 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -573,9 +573,9 @@ class DagFileProcessor(LoggingMixin):
             for task in tasks_missed_sla:
                 if task.email:
                     if isinstance(task.email, str):
-                        emails |= set(get_email_address_list(task.email))
+                        emails.update(get_email_address_list(task.email))
                     elif isinstance(task.email, (list, tuple)):
-                        emails |= set(task.email)
+                        emails.update(task.email)
             if emails:
                 try:
                     send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content)
@@ -652,9 +652,7 @@ class DagFileProcessor(LoggingMixin):
             task_pools = {task.pool for task in dag.tasks}
             nonexistent_pools = task_pools - pools
             if nonexistent_pools:
-                return (
-                    f"Dag '{dag.dag_id}' references non-existent pools: {list(sorted(nonexistent_pools))!r}"
-                )
+                return f"Dag '{dag.dag_id}' references non-existent pools: {sorted(nonexistent_pools)!r}"
 
         pools = {p.pool for p in Pool.get_pools(session)}
         for dag in dagbag.dags.values():
@@ -679,9 +677,7 @@ class DagFileProcessor(LoggingMixin):
         """
         self._validate_task_pools(dagbag=dagbag)
 
-        stored_warnings = set(
-            session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags.keys())).all()
-        )
+        stored_warnings = set(session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags)).all())
 
         for warning_to_delete in stored_warnings - self.dag_warnings:
             session.delete(warning_to_delete)

Reply via email to