This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d0006fb9e D205 Support - Utils (#32591)
9d0006fb9e is described below

commit 9d0006fb9ee9c1b4742613fa3c1e82064eda6ba8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Jul 17 16:06:56 2023 -0700

    D205 Support - Utils (#32591)
    
    Co-author: shubham22
    
    Co-authored-by: Niko Oliveira <[email protected]>
---
 airflow/utils/cli.py                          | 18 +++++-------
 airflow/utils/cli_action_loggers.py           |  6 ++--
 airflow/utils/configuration.py                |  3 +-
 airflow/utils/dag_cycle_tester.py             |  5 ++--
 airflow/utils/db.py                           | 24 +++++++--------
 airflow/utils/db_cleanup.py                   |  5 +++-
 airflow/utils/dot_renderer.py                 |  1 +
 airflow/utils/edgemodifier.py                 | 21 ++++++--------
 airflow/utils/email.py                        |  7 +++--
 airflow/utils/file.py                         | 22 +++++++-------
 airflow/utils/hashlib_wrapper.py              |  6 ++--
 airflow/utils/helpers.py                      | 16 +++-------
 airflow/utils/json.py                         |  6 +---
 airflow/utils/log/colored_log.py              |  7 +++--
 airflow/utils/log/file_processor_handler.py   | 12 ++++----
 airflow/utils/log/file_task_handler.py        | 16 +++++-----
 airflow/utils/log/logging_mixin.py            | 25 ++++++++--------
 airflow/utils/log/non_caching_file_handler.py | 26 +++++++++--------
 airflow/utils/log/trigger_handler.py          |  3 +-
 airflow/utils/mixins.py                       |  5 ++--
 airflow/utils/module_loading.py               |  5 ++--
 airflow/utils/net.py                          |  9 +++---
 airflow/utils/operator_helpers.py             | 10 ++++---
 airflow/utils/operator_resources.py           | 10 +++----
 airflow/utils/platform.py                     |  8 ++---
 airflow/utils/process_utils.py                |  1 +
 airflow/utils/session.py                      |  1 +
 airflow/utils/sqlalchemy.py                   | 38 ++++++++++++------------
 airflow/utils/state.py                        |  5 +---
 airflow/utils/task_group.py                   | 42 +++++++++++----------------
 airflow/utils/timezone.py                     |  1 +
 airflow/utils/types.py                        |  5 +---
 32 files changed, 176 insertions(+), 193 deletions(-)

diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index c800ed71fc..354c2b236c 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -61,9 +61,10 @@ def _check_cli_args(args):
 def action_cli(func=None, check_db=True):
     def action_logging(f: T) -> T:
         """
-        Decorates function to execute function at the same time submitting 
action_logging
-        but in CLI context. It will call action logger callbacks twice,
-        one for pre-execution and the other one for post-execution.
+        Decorates function to execute function at the same time submitting 
action_logging but in CLI context.
+
+        It will call action logger callbacks twice, one for
+        pre-execution and the other one for post-execution.
 
         Action logger will be called with below keyword parameters:
             sub_command : name of sub-command
@@ -84,8 +85,7 @@ def action_cli(func=None, check_db=True):
         @functools.wraps(f)
         def wrapper(*args, **kwargs):
             """
-            An wrapper for cli functions. It assumes to have Namespace instance
-            at 1st positional argument.
+            A wrapper for cli functions; assumes Namespace instance as first 
positional argument.
 
             :param args: Positional argument. It assumes to have Namespace 
instance
                 at 1st positional argument
@@ -126,7 +126,8 @@ def action_cli(func=None, check_db=True):
 
 def _build_metrics(func_name, namespace):
     """
-    Builds metrics dict from function args
+    Builds metrics dict from function args.
+
     It assumes that function arguments is from airflow.bin.cli module's 
function
     and has Namespace instance where it optionally contains "dag_id", 
"task_id",
     and "execution_date".
@@ -359,10 +360,7 @@ def should_ignore_depends_on_past(args) -> bool:
 
 
 def suppress_logs_and_warning(f: T) -> T:
-    """
-    Decorator to suppress logging and warning messages
-    in cli functions.
-    """
+    """Decorator to suppress logging and warning messages in cli functions."""
 
     @functools.wraps(f)
     def _wrapper(*args, **kwargs):
diff --git a/airflow/utils/cli_action_loggers.py 
b/airflow/utils/cli_action_loggers.py
index 02c311d40a..575f4bfb08 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -16,8 +16,10 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-An Action Logger module. Singleton pattern has been applied into this module
-so that registered callbacks can be used all through the same python process.
+An Action Logger module.
+
+Singleton pattern has been applied into this module so that registered
+callbacks can be used all through the same python process.
 """
 from __future__ import annotations
 
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index 98fb6388e6..84c5e946ee 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -27,8 +27,7 @@ from airflow.utils.platform import IS_WINDOWS
 
 def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
     """
-    Returns a path for a temporary file including a full copy of the 
configuration
-    settings.
+    Returns a path for a temporary file including a full copy of the 
configuration settings.
 
     :param include_env: Should the value of configuration from ``AIRFLOW__``
         environment variables be included or not
diff --git a/airflow/utils/dag_cycle_tester.py 
b/airflow/utils/dag_cycle_tester.py
index 8f150dc0a3..4cea52200d 100644
--- a/airflow/utils/dag_cycle_tester.py
+++ b/airflow/utils/dag_cycle_tester.py
@@ -33,8 +33,9 @@ CYCLE_DONE = 2
 def test_cycle(dag: DAG) -> None:
     """
     A wrapper function of `check_cycle` for backward compatibility purpose.
-    New code should use `check_cycle` instead since this function name 
`test_cycle` starts with 'test_' and
-    will be considered as a unit test by pytest, resulting in failure.
+
+    New code should use `check_cycle` instead since this function name 
`test_cycle` starts
+    with 'test_' and will be considered as a unit test by pytest, resulting in 
failure.
     """
     from warnings import warn
 
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index eb4c863dc3..f4c2ae8b1b 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1006,10 +1006,10 @@ def check_username_duplicates(session: Session) -> 
Iterable[str]:
 
 def reflect_tables(tables: list[Base | str] | None, session):
     """
-    When running checks prior to upgrades, we use reflection to determine 
current state of the
-    database.
-    This function gets the current state of each table in the set of models 
provided and returns
-    a SqlAlchemy metadata object containing them.
+    When running checks prior to upgrades, we use reflection to determine 
current state of the database.
+
+    This function gets the current state of each table in the set of models
+    provided and returns a SqlAlchemy metadata object containing them.
     """
     import sqlalchemy.schema
 
@@ -1180,6 +1180,7 @@ def _create_table_as(
 ):
     """
     Create a new table with rows from query.
+
     We have to handle CTAS differently for different dialects.
     """
     from sqlalchemy import column, select, table
@@ -1256,10 +1257,7 @@ def _move_dangling_data_to_new_table(
 
 
 def _dangling_against_dag_run(session, source_table, dag_run):
-    """
-    Given a source table, we generate a subquery that will return 1 for every 
row that
-    has a dagrun.
-    """
+    """Given a source table, we generate a subquery that will return 1 for 
every row that has a dagrun."""
     source_to_dag_run_join_cond = and_(
         source_table.c.dag_id == dag_run.c.dag_id,
         source_table.c.execution_date == dag_run.c.execution_date,
@@ -1274,8 +1272,7 @@ def _dangling_against_dag_run(session, source_table, 
dag_run):
 
 def _dangling_against_task_instance(session, source_table, dag_run, 
task_instance):
     """
-    Given a source table, we generate a subquery that will return 1 for every 
row that
-    has a valid task instance (and associated dagrun).
+    Given a source table, generate a subquery that will return 1 for every row 
that has a valid task instance.
 
     This is used to identify rows that need to be removed from tables prior to 
adding a TI fk.
 
@@ -1367,8 +1364,8 @@ def _move_duplicate_data_to_new_table(
 
 def check_bad_references(session: Session) -> Iterable[str]:
     """
-    Starting in Airflow 2.2, we began a process of replacing `execution_date` 
with `run_id`
-    in many tables.
+    Starting in Airflow 2.2, we began a process of replacing `execution_date` 
with `run_id` in many tables.
+
     Here we go through each table and look for records that can't be mapped to 
a dag run.
     When we find such "dangling" rows we back them up in a special table and 
delete them
     from the main table.
@@ -1383,6 +1380,8 @@ def check_bad_references(session: Session) -> 
Iterable[str]:
     @dataclass
     class BadReferenceConfig:
         """
+        Bad reference config class.
+
         :param bad_rows_func: function that returns subquery which determines 
whether bad rows exist
         :param join_tables: table objects referenced in subquery
         :param ref_table: information-only identifier for categorizing the 
missing ref
@@ -1552,6 +1551,7 @@ def upgradedb(
     session: Session = NEW_SESSION,
 ):
     """
+    Upgrades the DB.
 
     :param to_revision: Optional Alembic revision ID to upgrade *to*.
         If omitted, upgrades to latest revision.
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 22fb0020e7..36bced9473 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This module took inspiration from the community maintenance dag
+This module took inspiration from the community maintenance dag.
+
+See:
 
(https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/4e5c7682a808082561d60cbc9cafaa477b0d8c65/db-cleanup/airflow-db-cleanup.py).
 """
 from __future__ import annotations
@@ -341,6 +343,7 @@ def _print_config(*, configs: dict[str, _TableConfig]):
 def _suppress_with_logging(table, session):
     """
     Suppresses errors but logs them.
+
     Also stores the exception instance so it can be referred to after exiting 
context.
     """
     try:
diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py
index d3b329cb54..f168566f7e 100644
--- a/airflow/utils/dot_renderer.py
+++ b/airflow/utils/dot_renderer.py
@@ -38,6 +38,7 @@ from airflow.utils.task_group import TaskGroup
 def _refine_color(color: str):
     """
     Converts color in #RGB (12 bits) format to #RRGGBB (32 bits), if it 
possible.
+
     Otherwise, it returns the original value. Graphviz does not support colors 
in #RGB format.
 
     :param color: Text representation of color
diff --git a/airflow/utils/edgemodifier.py b/airflow/utils/edgemodifier.py
index c6d8065c45..14aa3ef25e 100644
--- a/airflow/utils/edgemodifier.py
+++ b/airflow/utils/edgemodifier.py
@@ -24,8 +24,9 @@ from airflow.utils.task_group import TaskGroup
 
 class EdgeModifier(DependencyMixin):
     """
-    Class that represents edge information to be added between two
-    tasks/operators. Has shorthand factory functions, like Label("hooray").
+    Class that represents edge information to be added between two 
tasks/operators.
+
+    Has shorthand factory functions, like Label("hooray").
 
     Current implementation supports
         t1 >> Label("Success route") >> t2
@@ -77,8 +78,9 @@ class EdgeModifier(DependencyMixin):
 
     def _convert_streams_to_task_groups(self):
         """
-        Both self._upstream and self._downstream are required to determine if
-        we should convert a node to a TaskGroup or leave it as a DAGNode.
+        Convert a node to a TaskGroup or leave it as a DAGNode.
+
+        Requires both self._upstream and self._downstream.
 
         To do this, we keep a set of group_ids seen among the streams. If we 
find that
         the nodes are from the same TaskGroup, we will leave them as DAGNodes 
and not
@@ -121,8 +123,7 @@ class EdgeModifier(DependencyMixin):
         edge_modifier: EdgeModifier | None = None,
     ):
         """
-        Sets the given task/list onto the upstream attribute, and then checks 
if
-        we have both sides so we can resolve the relationship.
+        Set the given task/list onto the upstream attribute, then attempt to 
resolve the relationship.
 
         Providing this also provides << via DependencyMixin.
         """
@@ -139,8 +140,7 @@ class EdgeModifier(DependencyMixin):
         edge_modifier: EdgeModifier | None = None,
     ):
         """
-        Sets the given task/list onto the downstream attribute, and then 
checks if
-        we have both sides so we can resolve the relationship.
+        Set the given task/list onto the downstream attribute, then attempt to 
resolve the relationship.
 
         Providing this also provides >> via DependencyMixin.
         """
@@ -154,10 +154,7 @@ class EdgeModifier(DependencyMixin):
     def update_relative(
         self, other: DependencyMixin, upstream: bool = True, edge_modifier: 
EdgeModifier | None = None
     ) -> None:
-        """
-        Called if we're not the "main" side of a relationship; we still run the
-        same logic, though.
-        """
+        """Called if we're not the "main" side of a relationship; we still run 
the same logic, though."""
         if upstream:
             self.set_upstream(other)
         else:
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index e807b8f75d..37e028fbd9 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -321,9 +321,10 @@ def _get_smtp_connection(host: str, port: int, timeout: 
int, with_ssl: bool) ->
 
 def _get_email_list_from_str(addresses: str) -> list[str]:
     """
-    Extract a list of email addresses from a string. The string
-    can contain multiple email addresses separated by
-    any of the following delimiters: ',' or ';'.
+    Extract a list of email addresses from a string.
+
+    The string can contain multiple email addresses separated
+    by any of the following delimiters: ',' or ';'.
 
     :param addresses: A string containing one or more email addresses.
     :return: A list of email addresses.
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 08b4048ad5..fb17ee6a8d 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -41,8 +41,9 @@ class _IgnoreRule(Protocol):
     @staticmethod
     def compile(pattern: str, base_dir: Path, definition_file: Path) -> 
_IgnoreRule | None:
         """
-        Build an ignore rule from the supplied pattern where base_dir
-        and definition_file should be absolute paths.
+        Build an ignore rule from the supplied pattern.
+
+        ``base_dir`` and ``definition_file`` should be absolute paths.
         """
 
     @staticmethod
@@ -134,8 +135,9 @@ def TemporaryDirectory(*args, **kwargs):
 
 def mkdirs(path, mode):
     """
-    Creates the directory specified by path, creating intermediate directories
-    as necessary. If directory already exists, this is a no-op.
+    Creates the directory specified by path, creating intermediate directories 
as necessary.
+
+    If directory already exists, this is a no-op.
 
     :param path: The directory to create
     :param mode: The mode to give to the directory e.g. 0o755, ignores umask
@@ -164,10 +166,7 @@ def correct_maybe_zipped(fileloc: str | Path) -> str | 
Path:
 
 
 def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
-    """
-    If the path contains a folder with a .zip suffix, then
-    the folder is treated as a zip archive and path to zip is returned.
-    """
+    """If the path contains a folder with a .zip suffix, treat it as a zip 
archive and return path."""
     if not fileloc:
         return fileloc
     search_ = ZIP_REGEX.search(str(fileloc))
@@ -182,8 +181,10 @@ def correct_maybe_zipped(fileloc: None | str | Path) -> 
None | str | Path:
 
 def open_maybe_zipped(fileloc, mode="r"):
     """
-    Opens the given file. If the path contains a folder with a .zip suffix, 
then
-    the folder is treated as a zip archive, opening the file inside the 
archive.
+    Opens the given file.
+
+    If the path contains a folder with a .zip suffix, then the folder
+    is treated as a zip archive, opening the file inside the archive.
 
     :return: a file object, as in `open`, or as in `ZipFile.open`.
     """
@@ -332,6 +333,7 @@ COMMENT_PATTERN = re2.compile(r"\s*#.*")
 def might_contain_dag(file_path: str, safe_mode: bool, zip_file: 
zipfile.ZipFile | None = None) -> bool:
     """
     Check whether a Python file contains Airflow DAGs.
+
     When safe_mode is off (with False value), this function always returns 
True.
 
     If might_contain_dag_callable isn't specified, it uses airflow default 
heuristic
diff --git a/airflow/utils/hashlib_wrapper.py b/airflow/utils/hashlib_wrapper.py
index 25f02e5c3a..09850c565c 100644
--- a/airflow/utils/hashlib_wrapper.py
+++ b/airflow/utils/hashlib_wrapper.py
@@ -28,11 +28,9 @@ from airflow import PY39
 
 def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
     """
-    Safely allows calling the hashlib.md5 function with the "usedforsecurity" 
disabled
-    when specified in the configuration.
+    Safely allows calling the hashlib.md5 function when "usedforsecurity" is 
disabled in the configuration.
 
-    :param string: The data to hash.
-        Default to empty str byte.
+    :param string: The data to hash. Default to empty str byte.
     :return: The hashed value.
     """
     if PY39:
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index a29cf07a59..b900ca1033 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -120,10 +120,7 @@ def is_container(obj: Any) -> bool:
 
 
 def as_tuple(obj: Any) -> tuple:
-    """
-    If obj is a container, returns obj as a tuple.
-    Otherwise, returns a tuple containing obj.
-    """
+    """Return obj as a tuple if obj is a container, otherwise return a tuple 
containing obj."""
     if is_container(obj):
         return tuple(obj)
     else:
@@ -139,10 +136,7 @@ def chunks(items: list[T], chunk_size: int) -> 
Generator[list[T], None, None]:
 
 
 def reduce_in_chunks(fn: Callable[[S, list[T]], S], iterable: list[T], 
initializer: S, chunk_size: int = 0):
-    """
-    Reduce the given list of items by splitting it into chunks
-    of the given size and passing each chunk through the reducer.
-    """
+    """Split the list of items into chunks of a given size and pass each chunk 
through the reducer."""
     if len(iterable) == 0:
         return initializer
     if chunk_size == 0:
@@ -172,8 +166,7 @@ def parse_template_string(template_string: str) -> 
tuple[str | None, jinja2.Temp
 
 def render_log_filename(ti: TaskInstance, try_number, filename_template) -> 
str:
     """
-    Given task instance, try_number, filename_template, return the rendered log
-    filename.
+    Given task instance, try_number, filename_template, return the rendered 
log filename.
 
     :param ti: task instance
     :param try_number: try_number of the task
@@ -327,8 +320,7 @@ def at_most_one(*args) -> bool:
 
 def prune_dict(val: Any, mode="strict"):
     """
-    Given dict ``val``, returns new dict based on ``val`` with all
-    empty elements removed.
+    Given dict ``val``, returns new dict based on ``val`` with all empty 
elements removed.
 
     What constitutes "empty" is controlled by the ``mode`` parameter.  If mode 
is 'strict'
     then only ``None`` elements will be removed.  If mode is ``truthy``, then 
element ``x``
diff --git a/airflow/utils/json.py b/airflow/utils/json.py
index 3a0a70fb75..7f05c8778d 100644
--- a/airflow/utils/json.py
+++ b/airflow/utils/json.py
@@ -105,11 +105,7 @@ class XComEncoder(json.JSONEncoder):
 
 
 class XComDecoder(json.JSONDecoder):
-    """
-    This decoder deserializes dicts to objects if they contain
-    the `__classname__` key otherwise it will return the dict
-    as is.
-    """
+    """Deserialize dicts to objects if they contain the `__classname__` key, 
otherwise return the dict."""
 
     def __init__(self, *args, **kwargs) -> None:
         if not kwargs.get("object_hook"):
diff --git a/airflow/utils/log/colored_log.py b/airflow/utils/log/colored_log.py
index aea7c255e6..3afa39410d 100644
--- a/airflow/utils/log/colored_log.py
+++ b/airflow/utils/log/colored_log.py
@@ -42,9 +42,10 @@ BOLD_OFF = esc("22")
 
 class CustomTTYColoredFormatter(TTYColoredFormatter, TimezoneAware):
     """
-    Custom log formatter which extends `colored.TTYColoredFormatter`
-    by adding attributes to message arguments and coloring error
-    traceback.
+    Custom log formatter.
+
+    Extends `colored.TTYColoredFormatter` by adding attributes
+    to message arguments and coloring error traceback.
     """
 
     def __init__(self, *args, **kwargs):
diff --git a/airflow/utils/log/file_processor_handler.py 
b/airflow/utils/log/file_processor_handler.py
index 79efb387a3..cda24cdd06 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -30,9 +30,10 @@ from airflow.utils.log.non_caching_file_handler import 
NonCachingFileHandler
 
 class FileProcessorHandler(logging.Handler):
     """
-    FileProcessorHandler is a python log handler that handles
-    dag processor logs. It creates and delegates log handling
-    to `logging.FileHandler` after receiving dag processor context.
+    FileProcessorHandler is a python log handler that handles dag processor 
logs.
+
+    It creates and delegates log handling to `logging.FileHandler`
+    after receiving dag processor context.
 
     :param base_log_folder: Base log folder to place logs.
     :param filename_template: template filename string
@@ -108,8 +109,9 @@ class FileProcessorHandler(logging.Handler):
 
     def _symlink_latest_log_directory(self):
         """
-        Create symbolic link to the current day's log directory to
-        allow easy access to the latest scheduler log files.
+        Create symbolic link to the current day's log directory.
+
+        Allows easy access to the latest scheduler log files.
 
         :return: None
         """
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 9184a20420..5d791aaa0c 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -134,10 +134,10 @@ def _interleave_logs(*logs):
 
 class FileTaskHandler(logging.Handler):
     """
-    FileTaskHandler is a python log handler that handles and reads
-    task instance logs. It creates and delegates log handling
-    to `logging.FileHandler` after receiving task instance context.
-    It reads logs from task instance's host machine.
+    FileTaskHandler is a python log handler that handles and reads task 
instance logs.
+
+    It creates and delegates log handling to `logging.FileHandler` after 
receiving task
+    instance context.  It reads logs from task instance's host machine.
 
     :param base_log_folder: Base log folder to place logs.
     :param filename_template: template filename string
@@ -278,8 +278,7 @@ class FileTaskHandler(logging.Handler):
         metadata: dict[str, Any] | None = None,
     ):
         """
-        Template method that contains custom logic of reading
-        logs given the try_number.
+        Template method that contains custom logic of reading logs given the 
try_number.
 
         :param ti: task instance record
         :param try_number: current try_number to read log from
@@ -462,8 +461,9 @@ class FileTaskHandler(logging.Handler):
 
     def _init_file(self, ti):
         """
-        Create log directory and give it permissions that are configured. See 
above _prepare_log_folder
-        method for more detailed explanation.
+        Create log directory and give it permissions that are configured.
+
+        See above _prepare_log_folder method for more detailed explanation.
 
         :param ti: task instance object
         :return: relative log path of the given task instance
diff --git a/airflow/utils/log/logging_mixin.py 
b/airflow/utils/log/logging_mixin.py
index 646c73f1b4..97e1a29ff6 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -54,10 +54,7 @@ def __getattr__(name):
 
 
 def remove_escape_codes(text: str) -> str:
-    """
-    Remove ANSI escapes codes from string. It's used to remove
-    "colors" from log messages.
-    """
+    """Remove ANSI escapes codes from string; used to remove "colors" from log 
messages."""
     return ANSI_ESCAPE.sub("", text)
 
 
@@ -118,15 +115,15 @@ class ExternalLoggingMixin:
 # IO generics (and apparently it has not even been intended)
 # See more: https://giters.com/python/typeshed/issues/6077
 class StreamLogWriter(IOBase, IO[str]):  # type: ignore[misc]
-    """Allows to redirect stdout and stderr to logger."""
+    """
+    Allows to redirect stdout and stderr to logger.
+
+    :param log: The log level method to write to, ie. log.debug, log.warning
+    """
 
     encoding: None = None
 
     def __init__(self, logger, level):
-        """
-        :param log: The log level method to write to, ie. log.debug, 
log.warning
-        :return:
-        """
         self.logger = logger
         self.level = level
         self._buffer = ""
@@ -141,8 +138,9 @@ class StreamLogWriter(IOBase, IO[str]):  # type: 
ignore[misc]
     @property
     def closed(self):
         """
-        Returns False to indicate that the stream is not closed, as it will be
-        open for the duration of Airflow's lifecycle.
+        Return False to indicate that the stream is not closed.
+
+        Streams will be open for the duration of Airflow's lifecycle.
 
         For compatibility with the io.IOBase interface.
         """
@@ -174,6 +172,7 @@ class StreamLogWriter(IOBase, IO[str]):  # type: 
ignore[misc]
     def isatty(self):
         """
         Returns False to indicate the fd is not connected to a tty(-like) 
device.
+
         For compatibility reasons.
         """
         return False
@@ -181,8 +180,10 @@ class StreamLogWriter(IOBase, IO[str]):  # type: 
ignore[misc]
 
 class RedirectStdHandler(StreamHandler):
     """
+    Custom StreamHandler that uses current sys.stderr/stdout as the stream for 
logging.
+
     This class is like a StreamHandler using sys.stderr/stdout, but uses
-    whatever sys.stderr/stderr is currently set to rather than the value of
+    whatever sys.stderr/stdout is currently set to rather than the value of
     sys.stderr/stdout at handler construction time, except when running a
     task in a kubernetes executor pod.
     """
diff --git a/airflow/utils/log/non_caching_file_handler.py 
b/airflow/utils/log/non_caching_file_handler.py
index 46fe69a99b..aa0ca9864e 100644
--- a/airflow/utils/log/non_caching_file_handler.py
+++ b/airflow/utils/log/non_caching_file_handler.py
@@ -36,12 +36,13 @@ def make_file_io_non_caching(io: IO[str]) -> IO[str]:
 
 class NonCachingFileHandler(FileHandler):
     """
-    This is an extension of the python FileHandler that advises the Kernel to 
not cache the file
-    in PageCache when it is written. While there is nothing wrong with such 
cache (it will be cleaned
-    when memory is needed), it causes ever-growing memory usage when scheduler 
is running as it keeps
-    on writing new log files and the files are not rotated later on. This 
might lead to confusion
-    for our users, who are monitoring memory usage of Scheduler - without 
realising that it is
-    harmless and expected in this case.
+    An extension of FileHandler, advises the Kernel to not cache the file in 
PageCache when it is written.
+
+    While there is nothing wrong with such cache (it will be cleaned when 
memory is needed), it
+    causes ever-growing memory usage when scheduler is running as it keeps on 
writing new log
+    files and the files are not rotated later on. This might lead to confusion 
for our users,
+    who are monitoring memory usage of Scheduler - without realising that it 
is harmless and
+    expected in this case.
 
     See https://github.com/apache/airflow/issues/14924
 
@@ -54,12 +55,13 @@ class NonCachingFileHandler(FileHandler):
 
 class NonCachingRotatingFileHandler(RotatingFileHandler):
     """
-    This is an extension of the python RotatingFileHandler that advises the 
Kernel to not cache the file
-    in PageCache when it is written. While there is nothing wrong with such 
cache (it will be cleaned
-    when memory is needed), it causes ever-growing memory usage when scheduler 
is running as it keeps
-    on writing new log files and the files are not rotated later on. This 
might lead to confusion
-    for our users, who are monitoring memory usage of Scheduler - without 
realising that it is
-    harmless and expected in this case.
+    An extension of RotatingFileHandler, advises the Kernel to not cache the 
file in PageCache when written.
+
+    While there is nothing wrong with such cache (it will be cleaned when 
memory is needed), it
+    causes ever-growing memory usage when scheduler is running as it keeps on 
writing new log
+    files and the files are not rotated later on. This might lead to confusion 
for our users,
+    who are monitoring memory usage of Scheduler - without realising that it 
is harmless and
+    expected in this case.
 
     See https://github.com/apache/airflow/issues/27065
 
diff --git a/airflow/utils/log/trigger_handler.py 
b/airflow/utils/log/trigger_handler.py
index 50756cb048..54f8ace65e 100644
--- a/airflow/utils/log/trigger_handler.py
+++ b/airflow/utils/log/trigger_handler.py
@@ -67,8 +67,7 @@ class DropTriggerLogsFilter(logging.Filter):
 
 class TriggererHandlerWrapper(logging.Handler):
     """
-    Wrap inheritors of FileTaskHandler and direct log messages
-    to them based on trigger_id.
+    Wrap inheritors of FileTaskHandler and direct log messages to them based 
on trigger_id.
 
     :meta private:
     """
diff --git a/airflow/utils/mixins.py b/airflow/utils/mixins.py
index d157c7f078..e3c6a8efec 100644
--- a/airflow/utils/mixins.py
+++ b/airflow/utils/mixins.py
@@ -35,8 +35,9 @@ class MultiprocessingStartMethodMixin:
 
     def _get_multiprocessing_start_method(self) -> str:
         """
-        Determine method of creating new processes by checking if the
-        mp_start_method is set in configs, else, it uses the OS default.
+        Determine method of creating new processes.
+
+        Checks if the mp_start_method is set in configs, else, it uses the OS 
default.
         """
         if conf.has_option("core", "mp_start_method"):
             return conf.get_mandatory_value("core", "mp_start_method")
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
index 8decbc6a61..d81ab65e85 100644
--- a/airflow/utils/module_loading.py
+++ b/airflow/utils/module_loading.py
@@ -25,8 +25,9 @@ from typing import Callable
 
 def import_string(dotted_path: str):
     """
-    Import a dotted module path and return the attribute/class designated by 
the
-    last name in the path. Raise ImportError if the import failed.
+    Import a dotted module path and return the attribute/class designated by 
the last name in the path.
+
+    Raise ImportError if the import failed.
     """
     try:
         module_path, class_name = dotted_path.rsplit(".", 1)
diff --git a/airflow/utils/net.py b/airflow/utils/net.py
index 57bd9008b9..992aee67e8 100644
--- a/airflow/utils/net.py
+++ b/airflow/utils/net.py
@@ -26,7 +26,9 @@ from airflow.configuration import conf
 # patched version of socket.getfqdn() - see 
https://github.com/python/cpython/issues/49254
 @lru_cache(maxsize=None)
 def getfqdn(name=""):
-    """Get fully qualified domain name from name.
+    """
+    Get fully qualified domain name from name.
+
     An empty argument is interpreted as meaning the local host.
     """
     name = name.strip()
@@ -50,8 +52,5 @@ def get_host_ip_address():
 
 
 def get_hostname():
-    """
-    Fetch the hostname using the callable from the config or using
-    `airflow.utils.net.getfqdn` as a fallback.
-    """
+    """Fetch the hostname using the callable from config or use 
`airflow.utils.net.getfqdn` as a fallback."""
     return conf.getimport("core", "hostname_callable", 
fallback="airflow.utils.net.getfqdn")()
diff --git a/airflow/utils/operator_helpers.py 
b/airflow/utils/operator_helpers.py
index 0390edc81c..20f272f4f3 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -62,6 +62,8 @@ AIRFLOW_VAR_NAME_FORMAT_MAPPING = {
 
 def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: 
bool = False) -> dict[str, str]:
     """
+    Return values used to externally reconstruct relations between dags, 
dag_runs, tasks and task_instances.
+
     Given a context, this function provides a dictionary of values that can be 
used to
     externally reconstruct relations between dags, dag_runs, tasks and 
task_instances.
     Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if
@@ -185,12 +187,10 @@ def determine_kwargs(
     kwargs: Mapping[str, Any],
 ) -> Mapping[str, Any]:
     """
-    Inspect the signature of a given callable to determine which arguments in 
kwargs need
-    to be passed to the callable.
+    Inspect the signature of a callable to determine which kwargs need to be 
passed to the callable.
 
     :param func: The callable that you want to invoke
-    :param args: The positional arguments that needs to be passed to the 
callable, so we
-        know how many to skip.
+    :param args: The positional arguments that need to be passed to the 
callable, so we know how many to skip.
     :param kwargs: The keyword arguments that need to be filtered before 
passing to the callable.
     :return: A dictionary which contains the keyword arguments that are 
compatible with the callable.
     """
@@ -199,6 +199,8 @@ def determine_kwargs(
 
 def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]:
     """
+    Create a new callable that forwards only the necessary arguments from any
provided input.
+
     Make a new callable that can accept any number of positional or keyword 
arguments
     but only forwards those required by the given callable func.
     """
diff --git a/airflow/utils/operator_resources.py 
b/airflow/utils/operator_resources.py
index 638034a81a..cd6a225244 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -70,10 +70,7 @@ class Resource:
 
     @property
     def qty(self):
-        """
-        The number of units of the specified resource that are required for
-        execution of the operator.
-        """
+        """The number of units of the specified resource that are required for 
execution of the operator."""
         return self._qty
 
     def to_dict(self):
@@ -114,8 +111,9 @@ class GpuResource(Resource):
 
 class Resources:
     """
-    The resources required by an operator. Resources that are not specified 
will use the
-    default values from the airflow config.
+    The resources required by an operator.
+
+    Resources that are not specified will use the default values from the 
airflow config.
 
     :param cpus: The number of cpu cores that are required
     :param ram: The amount of RAM required
diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py
index 691aac033b..72906683f6 100644
--- a/airflow/utils/platform.py
+++ b/airflow/utils/platform.py
@@ -32,10 +32,7 @@ log = logging.getLogger(__name__)
 
 
 def is_tty():
-    """
-    Checks if the standard output is connected (is associated with a terminal 
device) to a tty(-like)
-    device.
-    """
+    """Check if stdout is connected to a tty(-like) device, i.e. is associated
with a terminal device."""
     if not hasattr(sys.stdout, "isatty"):
         return False
     return sys.stdout.isatty()
@@ -69,8 +66,7 @@ def get_airflow_git_version():
 @cache
 def getuser() -> str:
     """
-    Gets the username associated with the current user, or error with a nice
-    error message if there's no current user.
+    Get the username of the current user, or error with a nice error message 
if there's no current user.
 
     We don't want to fall back to os.getuid() because not having a username
     probably means the rest of the user environment is wrong (e.g. no $HOME).
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 663f0f496c..74ad50bf74 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -300,6 +300,7 @@ def patch_environ(new_env_variables: dict[str, str]) -> 
Generator[None, None, No
 def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
     """
     Checks if a pidfile already exists and the process is still running.
+
     If process is dead then pidfile is removed.
 
     :param pid_file: path to the pidfile
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 0f8bdb7103..5c7e9eef50 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -61,6 +61,7 @@ def find_session_idx(func: Callable[PS, RT]) -> int:
 def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]:
     """
     Function decorator that provides a session if it isn't provided.
+
     If you want to reuse a session or run the function as part of a
     database transaction, you pass it to the function, if not this wrapper
     will create one and close it for you.
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 04af5bddc3..499aa31c34 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -50,8 +50,8 @@ using_mysql = conf.get_mandatory_value("database", 
"sql_alchemy_conn").lower().s
 
 class UtcDateTime(TypeDecorator):
     """
-    Almost equivalent to :class:`~sqlalchemy.types.TIMESTAMP` with
-    ``timezone=True`` option, but it differs from that by:
+    Similar to :class:`~sqlalchemy.types.TIMESTAMP` with ``timezone=True`` 
option, with some differences.
+
     - Never silently take naive :class:`~datetime.datetime`, instead it
       always raise :exc:`ValueError` unless time zone aware value.
     - :class:`~datetime.datetime` value's :attr:`~datetime.datetime.tzinfo`
@@ -86,11 +86,11 @@ class UtcDateTime(TypeDecorator):
 
     def process_result_value(self, value, dialect):
         """
-        Processes DateTimes from the DB making sure it is always
-        returning UTC. Not using timezone.convert_to_utc as that
-        converts to configured TIMEZONE while the DB might be
-        running with some other setting. We assume UTC datetimes
-        in the database.
+        Processes DateTimes from the DB, making sure they are always returned in UTC.
+
+        Not using timezone.convert_to_utc as that converts to configured 
TIMEZONE
+        while the DB might be running with some other setting. We assume UTC
+        datetimes in the database.
         """
         if value is not None:
             if value.tzinfo is None:
@@ -110,8 +110,9 @@ class UtcDateTime(TypeDecorator):
 
 class ExtendedJSON(TypeDecorator):
     """
-    A version of the JSON column that uses the Airflow extended JSON
-    serialization provided by airflow.serialization.
+    A version of the JSON column that uses the Airflow extended JSON 
serialization.
+
+    See airflow.serialization.
     """
 
     impl = Text
@@ -244,10 +245,11 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> 
V1Pod | None:
 
 class ExecutorConfigType(PickleType):
     """
-    Adds special handling for K8s executor config. If we unpickle a k8s object 
that was
-    pickled under an earlier k8s library version, then the unpickled object 
may throw an error
-    when to_dict is called.  To be more tolerant of version changes we convert 
to JSON using
-    Airflow's serializer before pickling.
+    Adds special handling for K8s executor config.
+
+    If we unpickle a k8s object that was pickled under an earlier k8s library 
version, then
+    the unpickled object may throw an error when to_dict is called.  To be 
more tolerant of
+    version changes we convert to JSON using Airflow's serializer before 
pickling.
     """
 
     cache_ok = True
@@ -293,11 +295,11 @@ class ExecutorConfigType(PickleType):
 
     def compare_values(self, x, y):
         """
-        The TaskInstance.executor_config attribute is a pickled object that 
may contain
-        kubernetes objects.  If the installed library version has changed 
since the
-        object was originally pickled, due to the underlying ``__eq__`` method 
on these
-        objects (which converts them to JSON), we may encounter attribute 
errors. In this
-        case we should replace the stored object.
+        The TaskInstance.executor_config attribute is a pickled object that 
may contain kubernetes objects.
+
+        If the installed library version has changed since the object was 
originally pickled,
+        due to the underlying ``__eq__`` method on these objects (which 
converts them to JSON),
+        we may encounter attribute errors. In this case we should replace the 
stored object.
 
         From https://github.com/apache/airflow/pull/24356 we use our 
serializer to store
         k8s objects, but there could still be raw pickled k8s objects in the 
database,
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index f18fd48c82..a0c828ee07 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -82,10 +82,7 @@ class DagRunState(str, Enum):
 
 
 class State:
-    """
-    Static class with task instance state constants and color methods to
-    avoid hardcoding.
-    """
+    """Static class with task instance state constants and color methods to 
avoid hardcoding."""
 
     # Backwards-compat constants for code that does not yet use the enum
     # These first three are shared by DagState and TaskState
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 50fa5ca19a..3c3a01bc7d 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -15,10 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-A TaskGroup is a collection of closely related tasks on the same DAG that 
should be grouped
-together when the DAG is displayed graphically.
-"""
+"""A collection of closely related tasks on the same DAG that should be 
grouped together visually."""
 from __future__ import annotations
 
 import copy
@@ -53,8 +50,10 @@ if TYPE_CHECKING:
 
 class TaskGroup(DAGNode):
     """
-    A collection of tasks. When set_downstream() or set_upstream() are called 
on the
-    TaskGroup, it is applied across all tasks within the group if necessary.
+    A collection of tasks.
+
+    When set_downstream() or set_upstream() are called on the TaskGroup, it is 
applied across
+    all tasks within the group if necessary.
 
     :param group_id: a unique, meaningful id for the TaskGroup. group_id must 
not conflict
         with group_id of TaskGroup or task_id of tasks in the DAG. Root 
TaskGroup has group_id
@@ -318,6 +317,7 @@ class TaskGroup(DAGNode):
     ) -> None:
         """
         Call set_upstream/set_downstream for all root/leaf tasks within this 
TaskGroup.
+
         Update 
upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
         """
         if not isinstance(task_or_task_list, Sequence):
@@ -358,10 +358,7 @@ class TaskGroup(DAGNode):
         return list(self.get_leaves())
 
     def get_roots(self) -> Generator[BaseOperator, None, None]:
-        """
-        Returns a generator of tasks that are root tasks, i.e. those with no 
upstream
-        dependencies within the TaskGroup.
-        """
+        """Return a generator of tasks with no upstream dependencies within 
the TaskGroup."""
         tasks = list(self)
         ids = {x.task_id for x in tasks}
         for task in tasks:
@@ -369,10 +366,7 @@ class TaskGroup(DAGNode):
                 yield task
 
     def get_leaves(self) -> Generator[BaseOperator, None, None]:
-        """
-        Returns a generator of tasks that are leaf tasks, i.e. those with no 
downstream
-        dependencies within the TaskGroup.
-        """
+        """Return a generator of tasks with no downstream dependencies within 
the TaskGroup."""
         tasks = list(self)
         ids = {x.task_id for x in tasks}
 
@@ -393,10 +387,7 @@ class TaskGroup(DAGNode):
                     yield from recurse_for_first_non_setup_teardown(task)
 
     def child_id(self, label):
-        """
-        Prefix label with group_id if prefix_group_id is True. Otherwise 
return the label
-        as-is.
-        """
+        """Prefix label with group_id if prefix_group_id is True. Otherwise 
return the label as-is."""
         if self.prefix_group_id:
             group_id = self.group_id
             if group_id:
@@ -407,6 +398,8 @@ class TaskGroup(DAGNode):
     @property
     def upstream_join_id(self) -> str:
         """
+        Create a unique ID for upstream dependencies of this TaskGroup.
+
         If this TaskGroup has immediate upstream TaskGroups or tasks, a proxy 
node called
         upstream_join_id will be created in Graph view to join the outgoing 
edges from this
         TaskGroup to reduce the total number of edges needed to be displayed.
@@ -416,6 +409,8 @@ class TaskGroup(DAGNode):
     @property
     def downstream_join_id(self) -> str:
         """
+        Create a unique ID for downstream dependencies of this TaskGroup.
+
         If this TaskGroup has immediate downstream TaskGroups or tasks, a 
proxy node called
         downstream_join_id will be created in Graph view to join the outgoing 
edges from this
         TaskGroup to reduce the total number of edges needed to be displayed.
@@ -450,7 +445,8 @@ class TaskGroup(DAGNode):
 
     def hierarchical_alphabetical_sort(self):
         """
-        Sorts children in hierarchical alphabetical order:
+        Sorts children in hierarchical alphabetical order.
+
         - groups in alphabetical order first
         - tasks in alphabetical order after them.
 
@@ -462,8 +458,7 @@ class TaskGroup(DAGNode):
 
     def topological_sort(self, _include_subdag_tasks: bool = False):
         """
-        Sorts children in topographical order, such that a task comes after 
any of its
-        upstream dependencies.
+        Sorts children in topological order, such that a task comes after
any of its upstream dependencies.
 
         :return: list of tasks in topological order
         """
@@ -680,10 +675,7 @@ class TaskGroupContext:
 
 
 def task_group_to_dict(task_item_or_group):
-    """
-    Create a nested dict representation of this TaskGroup and its children 
used to construct
-    the Graph.
-    """
+    """Create a nested dict representation of this TaskGroup and its children 
used to construct the Graph."""
     from airflow.models.abstractoperator import AbstractOperator
 
     if isinstance(task := task_item_or_group, AbstractOperator):
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 2b65055f38..1db15ac612 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -241,6 +241,7 @@ def coerce_datetime(v: dt.datetime | None, tz: dt.tzinfo | 
None = None) -> DateT
 def td_format(td_object: None | dt.timedelta | float | int) -> str | None:
     """
     Format a timedelta object or float/int into a readable string for time 
duration.
+
     For example timedelta(seconds=3752) would become `1h:2M:32s`.
     If the time is less than a second, the return will be `<1s`.
     """
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index 788b072f39..0eab9b3b87 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -68,9 +68,6 @@ class DagRunType(str, enum.Enum):
 
 
 class EdgeInfoType(TypedDict):
-    """
-    Represents extra metadata that the DAG can store about an edge,
-    usually generated from an EdgeModifier.
-    """
+    """Extra metadata that the DAG can store about an edge, usually generated 
from an EdgeModifier."""
 
     label: str | None


Reply via email to