This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ca4cd3b2ec Refactor unneeded 'continue' jumps in utils (#33836)
ca4cd3b2ec is described below
commit ca4cd3b2eceaaaa870dd3d3911217e0ed2060e2f
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Fri Sep 1 18:10:26 2023 +0000
Refactor unneeded 'continue' jumps in utils (#33836)
---
airflow/utils/dag_edges.py | 7 +++----
airflow/utils/db.py | 4 +---
airflow/utils/db_cleanup.py | 24 ++++++++++++------------
airflow/utils/email.py | 23 +++++++++++------------
airflow/utils/file.py | 23 ++++++++---------------
airflow/utils/log/file_task_handler.py | 15 +++++++--------
airflow/utils/log/secrets_masker.py | 5 ++---
7 files changed, 44 insertions(+), 57 deletions(-)
diff --git a/airflow/utils/dag_edges.py b/airflow/utils/dag_edges.py
index f7d0f7e7d8..bd1ad268ae 100644
--- a/airflow/utils/dag_edges.py
+++ b/airflow/utils/dag_edges.py
@@ -115,10 +115,9 @@ def dag_edges(dag: DAG):
edge = (task.task_id, child.task_id)
if task.is_setup and child.is_teardown:
setup_teardown_edges.add(edge)
- if edge in edges:
- continue
- edges.add(edge)
- tasks_to_trace_next.append(child)
+ if edge not in edges:
+ edges.add(edge)
+ tasks_to_trace_next.append(child)
tasks_to_trace = tasks_to_trace_next
result = []
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index db4631f148..16b9d6595c 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1446,9 +1446,7 @@ def check_bad_references(session: Session) -> Iterable[str]:
dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, "dangling")
if dangling_table_name in existing_table_names:
invalid_row_count = bad_rows_query.count()
- if invalid_row_count <= 0:
- continue
- else:
+ if invalid_row_count:
yield _format_dangling_error(
source_table=source_table.name,
target_table=dangling_table_name,
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 0f7eb8064c..b246eb8c40 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -435,19 +435,19 @@ def run_cleanup(
_confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names))
existing_tables = reflect_tables(tables=None, session=session).tables
for table_name, table_config in effective_config_dict.items():
- if table_name not in existing_tables:
+ if table_name in existing_tables:
+ with _suppress_with_logging(table_name, session):
+ _cleanup_table(
+ clean_before_timestamp=clean_before_timestamp,
+ dry_run=dry_run,
+ verbose=verbose,
+ **table_config.__dict__,
+ skip_archive=skip_archive,
+ session=session,
+ )
+ session.commit()
+ else:
logger.warning("Table %s not found. Skipping.", table_name)
- continue
- with _suppress_with_logging(table_name, session):
- _cleanup_table(
- clean_before_timestamp=clean_before_timestamp,
- dry_run=dry_run,
- verbose=verbose,
- **table_config.__dict__,
- skip_archive=skip_archive,
- session=session,
- )
- session.commit()
@provide_session
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 2957e5e1d3..659c3bdb0f 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -271,18 +271,17 @@ def send_mime_email(
try:
smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl)
except smtplib.SMTPServerDisconnected:
- if attempt < smtp_retry_limit:
- continue
- raise
-
- if smtp_starttls:
- smtp_conn.starttls()
- if smtp_user and smtp_password:
- smtp_conn.login(smtp_user, smtp_password)
- log.info("Sent an alert email to %s", e_to)
- smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
- smtp_conn.quit()
- break
+ if attempt == smtp_retry_limit:
+ raise
+ else:
+ if smtp_starttls:
+ smtp_conn.starttls()
+ if smtp_user and smtp_password:
+ smtp_conn.login(smtp_user, smtp_password)
+ log.info("Sent an alert email to %s", e_to)
+ smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
+ smtp_conn.quit()
+ break
def get_email_address_list(addresses: str | Iterable[str]) -> list[str]:
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 9178fa4af5..f885e57b37 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -244,12 +244,10 @@ def _find_path_from_directory(
patterns_by_dir.update({dirpath: patterns.copy()})
for file in files:
- if file == ignore_file_name:
- continue
- abs_file_path = Path(root) / file
- if ignore_rule_type.match(abs_file_path, patterns):
- continue
- yield str(abs_file_path)
+ if file != ignore_file_name:
+ abs_file_path = Path(root) / file
+ if not ignore_rule_type.match(abs_file_path, patterns):
+ yield str(abs_file_path)
def find_path_from_directory(
@@ -310,16 +308,11 @@ def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> l
file_paths = []
for file_path in find_path_from_directory(directory, ".airflowignore"):
+ path = Path(file_path)
try:
- if not os.path.isfile(file_path):
- continue
- _, file_ext = os.path.splitext(os.path.split(file_path)[-1])
- if file_ext != ".py" and not zipfile.is_zipfile(file_path):
- continue
- if not might_contain_dag(file_path, safe_mode):
- continue
-
- file_paths.append(file_path)
+ if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
+ if might_contain_dag(file_path, safe_mode):
+ file_paths.append(file_path)
except Exception:
log.exception("Error while examining %s", file_path)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2530b8fdb5..25b664074e 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -109,14 +109,13 @@ def _parse_timestamps_in_log_file(lines: Iterable[str]):
timestamp = None
next_timestamp = None
for idx, line in enumerate(lines):
- if not line:
- continue
- with suppress(Exception):
- # next_timestamp unchanged if line can't be parsed
- next_timestamp = _parse_timestamp(line)
- if next_timestamp:
- timestamp = next_timestamp
- yield timestamp, idx, line
+ if line:
+ with suppress(Exception):
+ # next_timestamp unchanged if line can't be parsed
+ next_timestamp = _parse_timestamp(line)
+ if next_timestamp:
+ timestamp = next_timestamp
+ yield timestamp, idx, line
def _interleave_logs(*logs):
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index 735d6e03a7..246377c169 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -206,9 +206,8 @@ class SecretsMasker(logging.Filter):
if self.replacer:
for k, v in record.__dict__.items():
- if k in self._record_attrs_to_ignore:
- continue
- record.__dict__[k] = self.redact(v)
+ if k not in self._record_attrs_to_ignore:
+ record.__dict__[k] = self.redact(v)
if record.exc_info and record.exc_info[1] is not None:
exc = record.exc_info[1]
self._redact_exception_with_context(exc)