This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f16a46db5 Refactor: Simplify code in dag_processing (#33161)
2f16a46db5 is described below
commit 2f16a46db51b67e67b8406fbafe9e9add6eee235
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Wed Aug 9 18:49:28 2023 +0000
Refactor: Simplify code in dag_processing (#33161)
---
airflow/dag_processing/manager.py | 8 ++++----
airflow/dag_processing/processor.py | 12 ++++--------
2 files changed, 8 insertions(+), 12 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 7e44e89816..dd252db9bd 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -1001,7 +1001,7 @@ class DagFileProcessorManager(LoggingMixin):
processor.terminate()
self._file_stats.pop(file_path)
- to_remove = set(self._file_stats.keys()) - set(self._file_paths)
+ to_remove = set(self._file_stats).difference(self._file_paths)
for key in to_remove:
# Remove the stats for any dag files that don't exist anymore
del self._file_stats[key]
@@ -1076,7 +1076,7 @@ class DagFileProcessorManager(LoggingMixin):
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.popleft()
# Stop creating duplicate processor i.e. processor with the same filepath
- if file_path in self._processors.keys():
+ if file_path in self._processors:
continue
callback_to_execute_for_file = self._callback_to_execute[file_path]
@@ -1115,7 +1115,7 @@ class DagFileProcessorManager(LoggingMixin):
self._parsing_start_time = time.perf_counter()
# If the file path is already being processed, or if a file was
# processed recently, wait until the next batch
- file_paths_in_progress = self._processors.keys()
+ file_paths_in_progress = set(self._processors)
now = timezone.utcnow()
# Sort the file paths by the parsing order mode
@@ -1173,7 +1173,7 @@ class DagFileProcessorManager(LoggingMixin):
file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
]
- file_paths_to_exclude = set(file_paths_in_progress).union(
+ file_paths_to_exclude = file_paths_in_progress.union(
file_paths_recently_processed,
files_paths_at_run_limit,
)
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 369f676878..8b0ecb3640 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -573,9 +573,9 @@ class DagFileProcessor(LoggingMixin):
for task in tasks_missed_sla:
if task.email:
if isinstance(task.email, str):
- emails |= set(get_email_address_list(task.email))
+ emails.update(get_email_address_list(task.email))
elif isinstance(task.email, (list, tuple)):
- emails |= set(task.email)
+ emails.update(task.email)
if emails:
try:
send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content)
@@ -652,9 +652,7 @@ class DagFileProcessor(LoggingMixin):
task_pools = {task.pool for task in dag.tasks}
nonexistent_pools = task_pools - pools
if nonexistent_pools:
- return (
- f"Dag '{dag.dag_id}' references non-existent pools: {list(sorted(nonexistent_pools))!r}"
- )
+ return f"Dag '{dag.dag_id}' references non-existent pools: {sorted(nonexistent_pools)!r}"
pools = {p.pool for p in Pool.get_pools(session)}
for dag in dagbag.dags.values():
@@ -679,9 +677,7 @@ class DagFileProcessor(LoggingMixin):
"""
self._validate_task_pools(dagbag=dagbag)
- stored_warnings = set(
- session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags.keys())).all()
- )
+ stored_warnings = set(session.query(DagWarning).filter(DagWarning.dag_id.in_(dagbag.dags)).all())
for warning_to_delete in stored_warnings - self.dag_warnings:
session.delete(warning_to_delete)