This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca4cd3b2ec Refactor unneeded 'continue' jumps in utils (#33836)
ca4cd3b2ec is described below

commit ca4cd3b2eceaaaa870dd3d3911217e0ed2060e2f
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Fri Sep 1 18:10:26 2023 +0000

    Refactor unneeded 'continue' jumps in utils (#33836)
---
 airflow/utils/dag_edges.py             |  7 +++----
 airflow/utils/db.py                    |  4 +---
 airflow/utils/db_cleanup.py            | 24 ++++++++++++------------
 airflow/utils/email.py                 | 23 +++++++++++------------
 airflow/utils/file.py                  | 23 ++++++++---------------
 airflow/utils/log/file_task_handler.py | 15 +++++++--------
 airflow/utils/log/secrets_masker.py    |  5 ++---
 7 files changed, 44 insertions(+), 57 deletions(-)

diff --git a/airflow/utils/dag_edges.py b/airflow/utils/dag_edges.py
index f7d0f7e7d8..bd1ad268ae 100644
--- a/airflow/utils/dag_edges.py
+++ b/airflow/utils/dag_edges.py
@@ -115,10 +115,9 @@ def dag_edges(dag: DAG):
                 edge = (task.task_id, child.task_id)
                 if task.is_setup and child.is_teardown:
                     setup_teardown_edges.add(edge)
-                if edge in edges:
-                    continue
-                edges.add(edge)
-                tasks_to_trace_next.append(child)
+                if edge not in edges:
+                    edges.add(edge)
+                    tasks_to_trace_next.append(child)
         tasks_to_trace = tasks_to_trace_next
 
     result = []
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index db4631f148..16b9d6595c 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1446,9 +1446,7 @@ def check_bad_references(session: Session) -> Iterable[str]:
         dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, "dangling")
         if dangling_table_name in existing_table_names:
             invalid_row_count = bad_rows_query.count()
-            if invalid_row_count <= 0:
-                continue
-            else:
+            if invalid_row_count:
                 yield _format_dangling_error(
                     source_table=source_table.name,
                     target_table=dangling_table_name,
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 0f7eb8064c..b246eb8c40 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -435,19 +435,19 @@ def run_cleanup(
         _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names))
     existing_tables = reflect_tables(tables=None, session=session).tables
     for table_name, table_config in effective_config_dict.items():
-        if table_name not in existing_tables:
+        if table_name in existing_tables:
+            with _suppress_with_logging(table_name, session):
+                _cleanup_table(
+                    clean_before_timestamp=clean_before_timestamp,
+                    dry_run=dry_run,
+                    verbose=verbose,
+                    **table_config.__dict__,
+                    skip_archive=skip_archive,
+                    session=session,
+                )
+                session.commit()
+        else:
             logger.warning("Table %s not found.  Skipping.", table_name)
-            continue
-        with _suppress_with_logging(table_name, session):
-            _cleanup_table(
-                clean_before_timestamp=clean_before_timestamp,
-                dry_run=dry_run,
-                verbose=verbose,
-                **table_config.__dict__,
-                skip_archive=skip_archive,
-                session=session,
-            )
-            session.commit()
 
 
 @provide_session
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 2957e5e1d3..659c3bdb0f 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -271,18 +271,17 @@ def send_mime_email(
             try:
                 smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl)
             except smtplib.SMTPServerDisconnected:
-                if attempt < smtp_retry_limit:
-                    continue
-                raise
-
-            if smtp_starttls:
-                smtp_conn.starttls()
-            if smtp_user and smtp_password:
-                smtp_conn.login(smtp_user, smtp_password)
-            log.info("Sent an alert email to %s", e_to)
-            smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
-            smtp_conn.quit()
-            break
+                if attempt == smtp_retry_limit:
+                    raise
+            else:
+                if smtp_starttls:
+                    smtp_conn.starttls()
+                if smtp_user and smtp_password:
+                    smtp_conn.login(smtp_user, smtp_password)
+                log.info("Sent an alert email to %s", e_to)
+                smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
+                smtp_conn.quit()
+                break
 
 
 def get_email_address_list(addresses: str | Iterable[str]) -> list[str]:
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 9178fa4af5..f885e57b37 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -244,12 +244,10 @@ def _find_path_from_directory(
             patterns_by_dir.update({dirpath: patterns.copy()})
 
         for file in files:
-            if file == ignore_file_name:
-                continue
-            abs_file_path = Path(root) / file
-            if ignore_rule_type.match(abs_file_path, patterns):
-                continue
-            yield str(abs_file_path)
+            if file != ignore_file_name:
+                abs_file_path = Path(root) / file
+                if not ignore_rule_type.match(abs_file_path, patterns):
+                    yield str(abs_file_path)
 
 
 def find_path_from_directory(
@@ -310,16 +308,11 @@ def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> l
     file_paths = []
 
     for file_path in find_path_from_directory(directory, ".airflowignore"):
+        path = Path(file_path)
         try:
-            if not os.path.isfile(file_path):
-                continue
-            _, file_ext = os.path.splitext(os.path.split(file_path)[-1])
-            if file_ext != ".py" and not zipfile.is_zipfile(file_path):
-                continue
-            if not might_contain_dag(file_path, safe_mode):
-                continue
-
-            file_paths.append(file_path)
+            if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
+                if might_contain_dag(file_path, safe_mode):
+                    file_paths.append(file_path)
         except Exception:
             log.exception("Error while examining %s", file_path)
 
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2530b8fdb5..25b664074e 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -109,14 +109,13 @@ def _parse_timestamps_in_log_file(lines: Iterable[str]):
     timestamp = None
     next_timestamp = None
     for idx, line in enumerate(lines):
-        if not line:
-            continue
-        with suppress(Exception):
-            # next_timestamp unchanged if line can't be parsed
-            next_timestamp = _parse_timestamp(line)
-        if next_timestamp:
-            timestamp = next_timestamp
-        yield timestamp, idx, line
+        if line:
+            with suppress(Exception):
+                # next_timestamp unchanged if line can't be parsed
+                next_timestamp = _parse_timestamp(line)
+            if next_timestamp:
+                timestamp = next_timestamp
+            yield timestamp, idx, line
 
 
 def _interleave_logs(*logs):
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index 735d6e03a7..246377c169 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -206,9 +206,8 @@ class SecretsMasker(logging.Filter):
 
         if self.replacer:
             for k, v in record.__dict__.items():
-                if k in self._record_attrs_to_ignore:
-                    continue
-                record.__dict__[k] = self.redact(v)
+                if k not in self._record_attrs_to_ignore:
+                    record.__dict__[k] = self.redact(v)
             if record.exc_info and record.exc_info[1] is not None:
                 exc = record.exc_info[1]
                 self._redact_exception_with_context(exc)

Reply via email to