This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 70a99805db Completed D400 for multiple folders (#27767)
70a99805db is described below

commit 70a99805db1b411500190627488f024f9ff5ad6f
Author: Dov Benyomin Sohacheski <[email protected]>
AuthorDate: Sun Nov 27 22:18:31 2022 +0200

    Completed D400 for multiple folders (#27767)
---
 airflow/__init__.py                 |  5 ++-
 airflow/__main__.py                 |  4 +-
 airflow/dag_processing/manager.py   | 74 +++++++++++++++------------------
 airflow/dag_processing/processor.py | 21 +++++-----
 airflow/logging_config.py           |  4 +-
 airflow/plugins_manager.py          | 19 +++++----
 airflow/providers_manager.py        | 83 +++++++++++++++++++++++--------------
 airflow/sentry.py                   |  5 ++-
 airflow/settings.py                 | 42 +++++++++----------
 airflow/stats.py                    | 83 +++++++++++++++++++------------------
 airflow/templates.py                |  5 +++
 airflow/typing_compat.py            |  5 +--
 12 files changed, 185 insertions(+), 165 deletions(-)
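
For context: D400 is the pydocstyle rule "First line should end with a period." A minimal sketch of the kind of change this patch applies throughout (the function below is a hypothetical example, not taken from the Airflow code base):

    # Before: pydocstyle reports D400 because the one-line docstring lacks a period.
    def load_config():
        """Load the configuration from airflow.cfg"""

    # After: the summary line ends with a period, so D400 passes.
    def load_config():
        """Load the configuration from airflow.cfg."""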

diff --git a/airflow/__init__.py b/airflow/__init__.py
index 20cc8550b8..38190dc8b8 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -16,10 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 """
+Init setup.
+
 Authentication is implemented using flask_login and different environments can
 implement their own login mechanisms by providing an `airflow_login` module
-in their PYTHONPATH. airflow_login should be based off the
-`airflow.www.login`
+in their PYTHONPATH. airflow_login should be based off the `airflow.www.login`
 
 isort:skip_file
 """
diff --git a/airflow/__main__.py b/airflow/__main__.py
index 6114534e1c..9de8c683b5 100644
--- a/airflow/__main__.py
+++ b/airflow/__main__.py
@@ -17,7 +17,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Main executable module"""
+"""Main executable module."""
 from __future__ import annotations
 
 import os
@@ -29,7 +29,7 @@ from airflow.configuration import conf
 
 
 def main():
-    """Main executable function"""
+    """Main executable function."""
     if conf.get("core", "security") == "kerberos":
         os.environ["KRB5CCNAME"] = conf.get("kerberos", "ccache")
         os.environ["KRB5_KTNAME"] = conf.get("kerberos", "keytab")
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index d1450b7ed9..570e8f3060 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -66,14 +66,14 @@ from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_lock
 
 
 class DagParsingStat(NamedTuple):
-    """Information on processing progress"""
+    """Information on processing progress."""
 
     done: bool
     all_files_processed: bool
 
 
 class DagFileStat(NamedTuple):
-    """Information about single processing of one file"""
+    """Information about single processing of one file."""
 
     num_dags: int
     import_errors: int
@@ -92,10 +92,11 @@ class DagParsingSignal(enum.Enum):
 
 class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
     """
-    Agent for DAG file processing. It is responsible for all DAG parsing
-    related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager
-    in a subprocess, collect DAG parsing results from it and communicate
-    signal/DAG parsing stat with it.
+    Agent for DAG file processing.
+
+    It is responsible for all DAG parsing related jobs in scheduler process.
+    Mainly it can spin up DagFileProcessorManager in a subprocess,
+    collect DAG parsing results from it and communicate signal/DAG parsing stat with it.
 
     This class runs in the main `airflow scheduler` process.
 
@@ -257,7 +258,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         processor_manager.start()
 
     def heartbeat(self) -> None:
-        """Check if the DagFileProcessorManager process is alive, and process any pending messages"""
+        """Check if the DagFileProcessorManager process is alive, and process any pending messages."""
         if not self._parent_signal_conn:
             raise ValueError("Process not started.")
         # Receive any pending messages before checking if the process has exited.
@@ -314,19 +315,16 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
 
     @property
     def done(self) -> bool:
-        """Has DagFileProcessorManager ended?"""
+        """Whether the DagFileProcessorManager finished."""
         return self._done
 
     @property
     def all_files_processed(self):
-        """Have all files been processed at least once?"""
+        """Whether all files been processed at least once."""
         return self._all_files_processed
 
     def terminate(self):
-        """
-        Send termination signal to DAG parsing processor manager
-        and expect it to terminate all DAG file processors.
-        """
+        """Send termination signal to DAG parsing processor manager to terminate all DAG file processors."""
         if self._process and self._process.is_alive():
             self.log.info("Sending termination message to manager.")
             try:
@@ -335,10 +333,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
                 pass
 
     def end(self):
-        """
-        Terminate (and then kill) the manager process launched.
-        :return:
-        """
+        """Terminate (and then kill) the manager process launched."""
         if not self._process:
             self.log.warning("Ending without manager process.")
             return
@@ -352,6 +347,8 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
 
 class DagFileProcessorManager(LoggingMixin):
     """
+    Manage processes responsible for parsing DAGs.
+
     Given a list of DAG definition files, this kicks off several processors
     in parallel to process them and put the results to a multiprocessing.Queue
     for DagFileProcessorAgent to harvest. The parallelism is limited and as the
@@ -454,7 +451,7 @@ class DagFileProcessorManager(LoggingMixin):
         )
 
     def register_exit_signals(self):
-        """Register signals that stop child processes"""
+        """Register signals that stop child processes."""
         signal.signal(signal.SIGINT, self._exit_gracefully)
         signal.signal(signal.SIGTERM, self._exit_gracefully)
         # So that we ignore the debug dump signal, making it easier to send
@@ -471,10 +468,9 @@ class DagFileProcessorManager(LoggingMixin):
 
     def start(self):
         """
-        Use multiple processes to parse and generate tasks for the
-        DAGs in parallel. By processing them in separate processes,
-        we can get parallelism and isolation from potentially harmful
-        user code.
+        Use multiple processes to parse and generate tasks for the DAGs in parallel.
+        By processing them in separate processes, we can get parallelism and isolation
+        from potentially harmful user code.
         """
         self.register_exit_signals()
 
@@ -491,7 +487,7 @@ class DagFileProcessorManager(LoggingMixin):
     @provide_session
     def _deactivate_stale_dags(self, session=None):
         """
-        Detects DAGs which are no longer present in files
+        Detects DAGs which are no longer present in files.
 
         Deactivate them and remove them in the serialized_dag table
         """
@@ -536,7 +532,6 @@ class DagFileProcessorManager(LoggingMixin):
             self.last_deactivate_stale_dags_time = timezone.utcnow()
 
     def _run_parsing_loop(self):
-
         # In sync mode we want timeout=None -- wait forever until a message is received
         if self._async_mode:
             poll_time = 0.0
@@ -687,7 +682,6 @@ class DagFileProcessorManager(LoggingMixin):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-
         # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
         # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
         # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
@@ -766,7 +760,7 @@ class DagFileProcessorManager(LoggingMixin):
         return False
 
     def _print_stat(self):
-        """Occasionally print out stats about how fast the files are getting processed"""
+        """Occasionally print out stats about how fast the files are getting processed."""
         if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
             if self._file_paths:
                 self._log_file_processing_stats(self._file_paths)
@@ -852,9 +846,8 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_pid(self, file_path) -> int | None:
         """
+        Retrieve the PID of the process processing the given file or None if the file is not being processed.
         :param file_path: the path to the file that's being processed
-        :return: the PID of the process processing the given file or None if
-            the specified file is not being processed
         """
         if file_path in self._processors:
             return self._processors[file_path].pid
@@ -870,6 +863,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_last_runtime(self, file_path) -> float | None:
         """
+        Retrieve the last processing time of a specific path.
         :param file_path: the path to the file that was processed
         :return: the runtime (in seconds) of the process of the last run, or
             None if the file was never processed.
@@ -879,6 +873,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_last_dag_count(self, file_path) -> int | None:
         """
+        Retrieve the total DAG count at a specific path.
         :param file_path: the path to the file that was processed
         :return: the number of dags loaded from that file, or None if the file
             was never processed.
@@ -888,6 +883,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_last_error_count(self, file_path) -> int | None:
         """
+        Retrieve the total number of errors from processing a specific path.
         :param file_path: the path to the file that was processed
         :return: the number of import errors from processing, or None if the file
             was never processed.
@@ -897,6 +893,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_last_finish_time(self, file_path) -> datetime | None:
         """
+        Retrieve the last completion time for processing a specific path.
         :param file_path: the path to the file that was processed
         :return: the finish time of the process of the last run, or None if the
             file was never processed.
@@ -906,6 +903,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_start_time(self, file_path) -> datetime | None:
         """
+        Retrieve the last start time for processing a specific path.
         :param file_path: the path to the file that's being processed
         :return: the start time of the process that's processing the
             specified file or None if the file is not currently being processed
@@ -916,8 +914,8 @@ class DagFileProcessorManager(LoggingMixin):
 
     def get_run_count(self, file_path) -> int:
         """
+        The number of times the given file has been parsed.
         :param file_path: the path to the file that's being processed
-        :return: the number of times the given file has been parsed
         """
         stat = self._file_stats.get(file_path)
         return stat.run_count if stat else 0
@@ -984,7 +982,7 @@ class DagFileProcessorManager(LoggingMixin):
         Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration)
 
     def collect_results(self) -> None:
-        """Collect the result from any finished DAG processors"""
+        """Collect the result from any finished DAG processors."""
         ready = multiprocessing.connection.wait(
             self.waitables.keys() - [self._direct_scheduler_conn], timeout=0
         )
@@ -1013,7 +1011,7 @@ class DagFileProcessorManager(LoggingMixin):
         )
 
     def start_new_processes(self):
-        """Start more processors if we have enough slots and files to process"""
+        """Start more processors if we have enough slots and files to process."""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.popleft()
             # Stop creating duplicate processor i.e. processor with the same filepath
@@ -1157,7 +1155,7 @@ class DagFileProcessorManager(LoggingMixin):
             self._processors.pop(proc)
 
     def max_runs_reached(self):
-        """:return: whether all file paths have been processed max_runs times"""
+        """:return: whether all file paths have been processed max_runs times."""
         if self._max_runs == -1:  # Unlimited runs.
             return False
         for stat in self._file_stats.values():
@@ -1168,26 +1166,20 @@ class DagFileProcessorManager(LoggingMixin):
         return True
 
     def terminate(self):
-        """
-        Stops all running processors
-        :return: None
-        """
+        """Stops all running processors."""
         for processor in self._processors.values():
             Stats.decr("dag_processing.processes")
             processor.terminate()
 
     def end(self):
-        """
-        Kill all child processes on exit since we don't want to leave
-        them as orphaned.
-        """
+        """Kill all child processes on exit since we don't want to leave them as orphaned."""
         pids_to_kill = self.get_all_pids()
         if pids_to_kill:
             kill_child_processes_by_pids(pids_to_kill)
 
     def emit_metrics(self):
         """
-        Emit metrics about dag parsing summary
+        Emit metrics about dag parsing summary.
 
         This is called once every time around the parsing "loop" - i.e. after
         all files have been parsed.
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 3580b0429f..323ff94c5e 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -60,7 +60,8 @@ if TYPE_CHECKING:
 
 
 class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
-    """Runs DAG processing in a separate process using DagFileProcessor
+    """
+    Runs DAG processing in a separate process using DagFileProcessor.
 
     :param file_path: a Python file containing Airflow DAG definitions
     :param pickle_dags: whether to serialize the DAG objects to the DB
@@ -266,7 +267,7 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     @property
     def exit_code(self) -> int | None:
         """
-        After the process is finished, this can be called to get the return code
+        After the process is finished, this can be called to get the return code.
 
         :return: the exit code of the process
         """
@@ -367,8 +368,9 @@ class DagFileProcessor(LoggingMixin):
     @provide_session
     def manage_slas(self, dag: DAG, session: Session = None) -> None:
         """
-        Finding all tasks that have SLAs defined, and sending alert emails
-        where needed. New SLA misses are also recorded in the database.
+        Finding all tasks that have SLAs defined, and sending alert emails when needed.
+
+        New SLA misses are also recorded in the database.
 
         We are assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
@@ -524,6 +526,7 @@ class DagFileProcessor(LoggingMixin):
     @staticmethod
     def update_import_errors(session: Session, dagbag: DagBag) -> None:
         """
+        Update any import errors to be displayed in the UI.
         For the DAGs in the given DagBag, record any associated import errors and clears
         errors for files that no longer have them. These are usually displayed through the
         Airflow UI so that users know that there are issues parsing DAGs.
@@ -564,10 +567,7 @@ class DagFileProcessor(LoggingMixin):
 
     @provide_session
     def _validate_task_pools(self, *, dagbag: DagBag, session: Session = NEW_SESSION):
-        """
-        Validates and raise exception if any task in a dag is using a non-existent pool
-        :meta private:
-        """
+        """Validates and raise exception if any task in a dag is using a non-existent pool."""
         from airflow.models.pool import Pool
 
         def check_pools(dag):
@@ -590,6 +590,7 @@ class DagFileProcessor(LoggingMixin):
 
     def update_dag_warnings(self, *, session: Session, dagbag: DagBag) -> None:
         """
+        Update any import warnings to be displayed in the UI.
         For the DAGs in the given DagBag, record any associated configuration warnings and clear
         warnings for files that no longer have them. These are usually displayed through the
         Airflow UI so that users know that there are issues parsing DAGs.
@@ -616,8 +617,8 @@ class DagFileProcessor(LoggingMixin):
         self, dagbag: DagBag, callback_requests: list[CallbackRequest], session: Session = NEW_SESSION
     ) -> None:
         """
-        Execute on failure callbacks. These objects can come from SchedulerJob or from
-        DagFileProcessorManager.
+        Execute on failure callbacks.
+        These objects can come from SchedulerJob or from DagFileProcessorManager.
 
         :param dagbag: Dag Bag of dags
         :param callback_requests: failure callbacks to execute
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index d78f84cb6a..f3791caeba 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -29,7 +29,7 @@ log = logging.getLogger(__name__)
 
 
 def configure_logging():
-    """Configure & Validate Airflow Logging"""
+    """Configure & Validate Airflow Logging."""
     logging_class_path = ""
     try:
         logging_class_path = conf.get("logging", "logging_config_class")
@@ -79,7 +79,7 @@ def configure_logging():
 
 
 def validate_logging_config(logging_config):
-    """Validate the provided Logging Config"""
+    """Validate the provided Logging Config."""
     # Now lets validate the other logging-related settings
     task_log_reader = conf.get("logging", "task_log_reader")
 
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 51c74a37c3..1d88711020 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -67,7 +67,8 @@ operator_extra_links: list[Any] | None = None
 registered_operator_link_classes: dict[str, type] | None = None
 registered_ti_dep_classes: dict[str, type] | None = None
 timetable_classes: dict[str, type[Timetable]] | None = None
-"""Mapping of class names to class of OperatorLinks registered by plugins.
+"""
+Mapping of class names to class of OperatorLinks registered by plugins.
 
 Used by the DAG serialization code to only allow specific classes to be created
 during deserialization
@@ -184,8 +185,7 @@ class AirflowPlugin:
 
 def is_valid_plugin(plugin_obj):
     """
-    Check whether a potential object is a subclass of
-    the AirflowPlugin class.
+    Check whether a potential object is a subclass of the AirflowPlugin class.
 
     :param plugin_obj: potential subclass of AirflowPlugin
     :return: Whether or not the obj is a valid subclass of
@@ -205,7 +205,7 @@ def is_valid_plugin(plugin_obj):
 
 def register_plugin(plugin_instance):
     """
-    Start plugin load and register it after success initialization
+    Start plugin load and register it after success initialization.
 
     :param plugin_instance: subclass of AirflowPlugin
     """
@@ -239,7 +239,7 @@ def load_entrypoint_plugins():
 
 
 def load_plugins_from_plugin_directory():
-    """Load and register Airflow Plugins from plugins directory"""
+    """Load and register Airflow Plugins from plugins directory."""
     global import_errors
     log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
 
@@ -317,7 +317,7 @@ def ensure_plugins_loaded():
 
 
 def initialize_web_ui_plugins():
-    """Collect extension points for WEB UI"""
+    """Collect extension points for WEB UI."""
     global plugins
     global flask_blueprints
     global flask_appbuilder_views
@@ -357,7 +357,7 @@ def initialize_web_ui_plugins():
 
 
 def initialize_ti_deps_plugins():
-    """Creates modules for loaded extension from custom task instance dependency rule plugins"""
+    """Create modules for loaded extension from custom task instance dependency rule plugins."""
     global registered_ti_dep_classes
     if registered_ti_dep_classes is not None:
         return
@@ -378,7 +378,7 @@ def initialize_ti_deps_plugins():
 
 
 def initialize_extra_operators_links_plugins():
-    """Creates modules for loaded extension from extra operators links plugins"""
+    """Create modules for loaded extension from extra operators links plugins."""
     global global_operator_extra_links
     global operator_extra_links
     global registered_operator_link_classes
@@ -492,6 +492,7 @@ def integrate_macros_plugins() -> None:
 
 
 def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
+    """Add listeners from plugins."""
     global plugins
 
     ensure_plugins_loaded()
@@ -507,7 +508,7 @@ def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
 
 def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
     """
-    Dump plugins attributes
+    Dump plugins attributes.
 
     :param attrs_to_dump: A list of plugin attributes to dump
     """
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 51c5a632c5..5b6c2b60b0 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -52,7 +52,9 @@ MIN_PROVIDER_VERSIONS = {
 
 def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
     """
-    If the given field_behaviors dict contains a placeholders node, and there
+    Verify the correct placeholder prefix.
+
+    If the given field_behaviors dict contains a placeholder's node, and there
     are placeholders for extra fields (i.e. anything other than the built-in conn
     attrs), and if those extra fields are unprefixed, then add the prefix.
 
@@ -83,6 +85,8 @@ if TYPE_CHECKING:
 
 class LazyDictWithCache(MutableMapping):
     """
+    Lazy-loaded cached dictionary.
+
     Dictionary, which in case you set callable, executes the passed callable with `key` attribute
     at first use - and returns and caches the result.
     """
@@ -125,7 +129,7 @@ class LazyDictWithCache(MutableMapping):
 
 
 def _create_provider_info_schema_validator():
-    """Creates JSON schema validator from the provider_info.schema.json"""
+    """Creates JSON schema validator from the provider_info.schema.json."""
     import jsonschema
 
     with resource_files("airflow").joinpath("provider_info.schema.json").open("rb") as f:
@@ -136,7 +140,7 @@ def _create_provider_info_schema_validator():
 
 
 def _create_customized_form_field_behaviours_schema_validator():
-    """Creates JSON schema validator from the customized_form_field_behaviours.schema.json"""
+    """Creates JSON schema validator from the customized_form_field_behaviours.schema.json."""
     import jsonschema
 
     with resource_files("airflow").joinpath("customized_form_field_behaviours.schema.json").open("rb") as f:
@@ -163,7 +167,7 @@ def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bo
 @dataclass
 class ProviderInfo:
     """
-    Provider information
+    Provider information.
 
     :param version: version string
     :param data: dictionary with information about the provider
@@ -185,14 +189,14 @@ class ProviderInfo:
 
 
 class HookClassProvider(NamedTuple):
-    """Hook class and Provider it comes from"""
+    """Hook class and Provider it comes from."""
 
     hook_class_name: str
     package_name: str
 
 
 class HookInfo(NamedTuple):
-    """Hook information"""
+    """Hook information."""
 
     hook_class_name: str
     connection_id_attribute_name: str
@@ -203,7 +207,7 @@ class HookInfo(NamedTuple):
 
 
 class ConnectionFormWidgetInfo(NamedTuple):
-    """Connection Form Widget information"""
+    """Connection Form Widget information."""
 
     hook_class_name: str
     package_name: str
@@ -217,6 +221,7 @@ logger = logging.getLogger(__name__)
 
 
 def log_debug_import_from_sources(class_name, e, provider_package):
+    """Log debug imports from sources."""
     log.debug(
         "Optional feature disabled on exception when importing '%s' from '%s' package",
         class_name,
@@ -226,6 +231,7 @@ def log_debug_import_from_sources(class_name, e, provider_package):
 
 
 def log_optional_feature_disabled(class_name, e, provider_package):
+    """Log optional feature disabled."""
     log.debug(
         "Optional feature disabled on exception when importing '%s' from '%s' package",
         class_name,
@@ -240,6 +246,7 @@ def log_optional_feature_disabled(class_name, e, provider_package):
 
 
 def log_import_warning(class_name, e, provider_package):
+    """Log import warning."""
     log.warning(
         "Exception when importing '%s' from '%s' package",
         class_name,
@@ -312,6 +319,8 @@ def _sanity_check(
 # So we add our own decorator
 def provider_info_cache(cache_name: str) -> Callable[[T], T]:
     """
+    Decorate and cache provider info.
+
     Decorator factory that create decorator that caches initialization of provider's parameters
     :param cache_name: Name of the cache
     """
@@ -339,7 +348,9 @@ def provider_info_cache(cache_name: str) -> Callable[[T], T]:
 
 class ProvidersManager(LoggingMixin):
     """
-    Manages all provider packages. This is a Singleton class. The first time it is
+    Manages all provider packages.
+
+    This is a Singleton class. The first time it is
     instantiated, it discovers all available providers in installed packages and
     local source folders (if airflow is run from sources).
     """
@@ -446,11 +457,13 @@ class ProvidersManager(LoggingMixin):
 
     def _discover_all_providers_from_packages(self) -> None:
         """
-        Discovers all providers by scanning packages installed. The list of providers should be returned
-        via the 'apache_airflow_provider' entrypoint as a dictionary conforming to the
-        'airflow/provider_info.schema.json' schema. Note that the schema is different at runtime
-        than provider.yaml.schema.json. The development version of provider schema is more strict and changes
-        together with the code. The runtime version is more relaxed (allows for additional properties)
+        Discover all providers by scanning packages installed.
+
+        The list of providers should be returned via the 'apache_airflow_provider'
+        entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json'
+        schema. Note that the schema is different at runtime than provider.yaml.schema.json.
+        The development version of provider schema is more strict and changes together with
+        the code. The runtime version is more relaxed (allows for additional properties)
         and verifies only the subset of fields that are needed at runtime.
         """
         for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
@@ -542,9 +555,11 @@ class ProvidersManager(LoggingMixin):
         provider: ProviderInfo,
     ):
         """
-        Discover  hooks from the "connection-types" property. This is new, better method that replaces
-        discovery from hook-class-names as it allows to lazy import individual Hook classes when they
-        are accessed. The "connection-types" keeps information about both - connection type and class
+        Discover hooks from the "connection-types" property.
+
+        This is new, better method that replaces discovery from hook-class-names as it
+        allows to lazy import individual Hook classes when they are accessed.
+        The "connection-types" keeps information about both - connection type and class
         name so we can discover all connection-types without importing the classes.
         :param hook_class_names_registered: set of registered hook class names for this provider
         :param already_registered_warning_connection_types: set of connections for which warning should be
@@ -595,9 +610,11 @@ class ProvidersManager(LoggingMixin):
         provider_uses_connection_types: bool,
     ):
         """
-        Discovers hooks from "hook-class-names' property. This property is deprecated but we should
-        support it in Airflow 2. The hook-class-names array contained just Hook names without connection
-        type, therefore we need to import all those classes immediately to know which connection types
+        Discover hooks from "hook-class-names' property.
+
+        This property is deprecated but we should support it in Airflow 2.
+        The hook-class-names array contained just Hook names without connection type,
+        therefore we need to import all those classes immediately to know which connection types
         are supported. This makes it impossible to selectively only import those hooks that are used.
         :param already_registered_warning_connection_types: list of connection hooks that we should warn
             about when finished discovery
@@ -661,7 +678,7 @@ class ProvidersManager(LoggingMixin):
             )
 
     def _discover_hooks(self) -> None:
-        """Retrieves all connections defined in the providers via Hooks"""
+        """Retrieve all connections defined in the providers via Hooks."""
         for package_name, provider in self._provider_dict.items():
             duplicated_connection_types: set[str] = set()
             hook_class_names_registered: set[str] = set()
@@ -679,7 +696,7 @@ class ProvidersManager(LoggingMixin):
 
     @provider_info_cache("import_all_hooks")
     def _import_info_from_all_hooks(self):
-        """Force-import all hooks and initialize the connections/fields"""
+        """Force-import all hooks and initialize the connections/fields."""
         # Retrieve all hooks to make sure that all of them are imported
         _ = list(self._hooks_lazy_dict.values())
         self._field_behaviours = OrderedDict(sorted(self._field_behaviours.items()))
@@ -721,7 +738,7 @@ class ProvidersManager(LoggingMixin):
 
     @staticmethod
     def _get_attr(obj: Any, attr_name: str):
-        """Retrieves attributes of an object, or warns if not found"""
+        """Retrieve attributes of an object, or warn if not found."""
         if not hasattr(obj, attr_name):
             log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name)
             return None
@@ -735,10 +752,11 @@ class ProvidersManager(LoggingMixin):
         package_name: str | None = None,
     ) -> HookInfo | None:
         """
-        Imports hook and retrieves hook information. Either connection_type (for lazy loading)
-        or hook_class_name must be set - but not both). Only needs package_name if hook_class_name is
-        passed (for lazy loading, package_name is retrieved from _connection_type_class_provider_dict
-        together with hook_class_name).
+        Import hook and retrieve hook information.
+
+        Either connection_type (for lazy loading) or hook_class_name must be set - but not both).
+        Only needs package_name if hook_class_name is passed (for lazy loading, package_name
+        is retrieved from _connection_type_class_provider_dict together with hook_class_name).
 
         :param connection_type: type of the connection
         :param hook_class_name: name of the hook class
@@ -883,7 +901,7 @@ class ProvidersManager(LoggingMixin):
             )
 
     def _discover_extra_links(self) -> None:
-        """Retrieves all extra links defined in the providers"""
+        """Retrieves all extra links defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("extra-links"):
                 for extra_link_class_name in provider.data["extra-links"]:
@@ -891,7 +909,7 @@ class ProvidersManager(LoggingMixin):
                         self._extra_link_class_name_set.add(extra_link_class_name)
 
     def _discover_logging(self) -> None:
-        """Retrieves all logging defined in the providers"""
+        """Retrieve all logging defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("logging"):
                 for logging_class_name in provider.data["logging"]:
@@ -899,7 +917,7 @@ class ProvidersManager(LoggingMixin):
                         self._logging_class_name_set.add(logging_class_name)
 
     def _discover_secrets_backends(self) -> None:
-        """Retrieves all secrets backends defined in the providers"""
+        """Retrieve all secrets backends defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("secrets-backends"):
                 for secrets_backends_class_name in provider.data["secrets-backends"]:
@@ -907,7 +925,7 @@ class ProvidersManager(LoggingMixin):
                         self._secrets_backend_class_name_set.add(secrets_backends_class_name)
 
     def _discover_auth_backends(self) -> None:
-        """Retrieves all API auth backends defined in the providers"""
+        """Retrieve all API auth backends defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
             if provider.data.get("auth-backends"):
                 for auth_backend_module_name in provider.data["auth-backends"]:
@@ -923,8 +941,9 @@ class ProvidersManager(LoggingMixin):
     @property
     def hooks(self) -> MutableMapping[str, HookInfo | None]:
         """
-        Returns dictionary of connection_type-to-hook mapping. Note that the dict can contain
-        None values if a hook discovered cannot be imported!
+        Return dictionary of connection_type-to-hook mapping.
+
+        Note that the dict can contain None values if a hook discovered cannot be imported!
         """
         self.initialize_providers_hooks()
         # When we return hooks here it will only be used to retrieve hook information
diff --git a/airflow/sentry.py b/airflow/sentry.py
index a5d1ff5c97..ed223fcccf 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Sentry Integration"""
+"""Sentry Integration."""
 from __future__ import annotations
 
 import logging
@@ -150,8 +150,9 @@ if conf.getboolean("sentry", "sentry_on", fallback=False):
 
         def enrich_errors(self, func):
             """
+            Decorate errors.
             Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
-             to support task specific tags and breadcrumbs.
+            to support task specific tags and breadcrumbs.
             """
             session_args_idx = find_session_idx(func)
 
diff --git a/airflow/settings.py b/airflow/settings.py
index 537c49141a..09d657a6b2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -116,7 +116,7 @@ def _get_rich_console(file):
 
 
 def custom_show_warning(message, category, filename, lineno, file=None, line=None):
-    """Custom function to print rich and visible warnings"""
+    """Custom function to print rich and visible warnings."""
     # Delay imports until we need it
     from rich.markup import escape
 
@@ -144,8 +144,9 @@ atexit.register(functools.partial(replace_showwarning, original_show_warning))
 
 def task_policy(task) -> None:
     """
-    This policy setting allows altering tasks after they are loaded in
-    the DagBag. It allows administrator to rewire some task's parameters.
+    This policy setting allows altering tasks after they are loaded in the DagBag.
+
+    It allows administrator to rewire some task's parameters.
     Alternatively you can raise ``AirflowClusterPolicyViolation`` exception
     to stop DAG from being executed.
 
@@ -166,8 +167,9 @@ def task_policy(task) -> None:
 
 def dag_policy(dag) -> None:
     """
-    This policy setting allows altering DAGs after they are loaded in
-    the DagBag. It allows administrator to rewire some DAG's parameters.
+    This policy setting allows altering DAGs after they are loaded in the DagBag.
+
+    It allows administrator to rewire some DAG's parameters.
     Alternatively you can raise ``AirflowClusterPolicyViolation`` exception
     to stop DAG from being executed.
 
@@ -185,8 +187,7 @@ def dag_policy(dag) -> None:
 
 def task_instance_mutation_hook(task_instance):
     """
-    This setting allows altering task instances before they are queued by
-    the Airflow scheduler.
+    This setting allows altering task instances before being queued by the Airflow scheduler.
 
     To define task_instance_mutation_hook, add a ``airflow_local_settings`` module
     to your PYTHONPATH that defines this ``task_instance_mutation_hook`` function.
@@ -202,6 +203,8 @@ task_instance_mutation_hook.is_noop = True  # type: ignore
 
 def pod_mutation_hook(pod):
     """
+    Mutate pod before scheduling.
+
     This setting allows altering ``kubernetes.client.models.V1Pod`` object
     before they are passed to the Kubernetes client for scheduling.
 
@@ -241,7 +244,7 @@ def get_dagbag_import_timeout(dag_file_path: str) -> int | float:
 
 
 def configure_vars():
-    """Configure Global Variables from airflow.cfg"""
+    """Configure Global Variables from airflow.cfg."""
     global SQL_ALCHEMY_CONN
     global DAGS_FOLDER
     global PLUGINS_FOLDER
@@ -260,7 +263,7 @@ def configure_vars():
 
 
 def configure_orm(disable_connection_pool=False, pool_class=None):
-    """Configure ORM using SQLAlchemy"""
+    """Configure ORM using SQLAlchemy."""
     from airflow.utils.log.secrets_masker import mask_secret
 
     log.debug("Setting up DB connection pool (PID %s)", os.getpid())
@@ -320,7 +323,7 @@ DEFAULT_ENGINE_ARGS = {
 
 
 def prepare_engine_args(disable_connection_pool=False, pool_class=None):
-    """Prepare SQLAlchemy engine args"""
+    """Prepare SQLAlchemy engine args."""
     default_args = {}
     for dialect, default in DEFAULT_ENGINE_ARGS.items():
         if SQL_ALCHEMY_CONN.startswith(dialect):
@@ -403,7 +406,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
 
 
 def dispose_orm():
-    """Properly close pooled database connections"""
+    """Properly close pooled database connections."""
     log.debug("Disposing DB connection pool (PID %s)", os.getpid())
     global engine
     global Session
@@ -417,13 +420,13 @@ def dispose_orm():
 
 
 def reconfigure_orm(disable_connection_pool=False, pool_class=None):
-    """Properly close database connections and re-configure ORM"""
+    """Properly close database connections and re-configure ORM."""
     dispose_orm()
     configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)
 
 
 def configure_adapters():
-    """Register Adapters and DB Converters"""
+    """Register Adapters and DB Converters."""
     from pendulum import DateTime as Pendulum
 
     if SQL_ALCHEMY_CONN.startswith("sqlite"):
@@ -447,7 +450,7 @@ def configure_adapters():
 
 
 def validate_session():
-    """Validate ORM Session"""
+    """Validate ORM Session."""
     global engine
 
     worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
@@ -467,14 +470,11 @@ def validate_session():
 
 
 def configure_action_logging() -> None:
-    """
-    Any additional configuration (register callback) for airflow.utils.action_loggers
-    module
-    """
+    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""
 
 
 def prepare_syspath():
-    """Ensures that certain subfolders of AIRFLOW_HOME are on the classpath"""
+    """Ensure certain subfolders of AIRFLOW_HOME are on the classpath."""
     if DAGS_FOLDER not in sys.path:
         sys.path.append(DAGS_FOLDER)
 
@@ -519,7 +519,7 @@ def get_session_lifetime_config():
 
 
 def import_local_settings():
-    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file"""
+    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
     try:
         import airflow_local_settings
 
@@ -561,7 +561,7 @@ def import_local_settings():
 
 
 def initialize():
-    """Initialize Airflow with all the settings from this file"""
+    """Initialize Airflow with all the settings from this file."""
     configure_vars()
     prepare_syspath()
     import_local_settings()
diff --git a/airflow/stats.py b/airflow/stats.py
index d231964fd6..a20d8cfa48 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -33,7 +33,7 @@ log = logging.getLogger(__name__)
 
 
 class TimerProtocol(Protocol):
-    """Type protocol for StatsLogger.timer"""
+    """Type protocol for StatsLogger.timer."""
 
     def __enter__(self):
         ...
@@ -42,36 +42,36 @@ class TimerProtocol(Protocol):
         ...
 
     def start(self):
-        """Start the timer"""
+        """Start the timer."""
         ...
 
     def stop(self, send=True):
-        """Stop, and (by default) submit the timer to StatsD"""
+        """Stop, and (by default) submit the timer to StatsD."""
         ...
 
 
 class StatsLogger(Protocol):
-    """This class is only used for TypeChecking (for IDEs, mypy, etc)"""
+    """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
 
     @classmethod
     def incr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
-        """Increment stat"""
+        """Increment stat."""
 
     @classmethod
     def decr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
-        """Decrement stat"""
+        """Decrement stat."""
 
     @classmethod
     def gauge(cls, stat: str, value: float, rate: int = 1, delta: bool = False) -> None:
-        """Gauge stat"""
+        """Gauge stat."""
 
     @classmethod
     def timing(cls, stat: str, dt: float | datetime.timedelta) -> None:
-        """Stats timing"""
+        """Stats timing."""
 
     @classmethod
     def timer(cls, *args, **kwargs) -> TimerProtocol:
-        """Timer metric that can be cancelled"""
+        """Timer metric that can be cancelled."""
 
 
 class Timer:
@@ -139,41 +139,41 @@ class Timer:
         self.stop()
 
     def start(self):
-        """Start the timer"""
+        """Start the timer."""
         if self.real_timer:
             self.real_timer.start()
         self._start_time = time.perf_counter()
         return self
 
     def stop(self, send=True):
-        """Stop the timer, and optionally send it to stats backend"""
+        """Stop the timer, and optionally send it to stats backend."""
         self.duration = time.perf_counter() - self._start_time
         if send and self.real_timer:
             self.real_timer.stop()
 
 
 class DummyStatsLogger:
-    """If no StatsLogger is configured, DummyStatsLogger is used as a fallback"""
+    """If no StatsLogger is configured, DummyStatsLogger is used as a fallback."""
 
     @classmethod
     def incr(cls, stat, count=1, rate=1):
-        """Increment stat"""
+        """Increment stat."""
 
     @classmethod
     def decr(cls, stat, count=1, rate=1):
-        """Decrement stat"""
+        """Decrement stat."""
 
     @classmethod
     def gauge(cls, stat, value, rate=1, delta=False):
-        """Gauge stat"""
+        """Gauge stat."""
 
     @classmethod
     def timing(cls, stat, dt):
-        """Stats timing"""
+        """Stats timing."""
 
     @classmethod
     def timer(cls, *args, **kwargs):
-        """Timer metric that can be cancelled"""
+        """Timer metric that can be cancelled."""
         return Timer()
 
 
@@ -183,8 +183,10 @@ ALLOWED_CHARACTERS = set(string.ascii_letters + string.digits + "_.-")
 
 
 def stat_name_default_handler(stat_name, max_length=250) -> str:
-    """A function that validate the StatsD stat name, apply changes to the stat name
-    if necessary and return the transformed stat name.
+    """
+    Validate the StatsD stat name.
+
+    Apply changes when necessary and return the transformed stat name.
     """
     if not isinstance(stat_name, str):
         raise InvalidStatsNameException("The stat_name has to be a string")
@@ -200,7 +202,7 @@ def stat_name_default_handler(stat_name, max_length=250) -> str:
 
 
 def get_current_handler_stat_name_func() -> Callable[[str], str]:
-    """Get Stat Name Handler from airflow.cfg"""
+    """Get Stat Name Handler from airflow.cfg."""
     return conf.getimport("metrics", "stat_name_handler") or stat_name_default_handler
 
 
@@ -208,8 +210,9 @@ T = TypeVar("T", bound=Callable)
 
 
 def validate_stat(fn: T) -> T:
-    """Check if stat name contains invalid characters.
-    Log and not emit stats if name is invalid
+    """
+    Check if stat name contains invalid characters.
+    Log and not emit stats if name is invalid.
     """
 
     @wraps(fn)
@@ -227,7 +230,7 @@ def validate_stat(fn: T) -> T:
 
 
 class AllowListValidator:
-    """Class to filter unwanted stats"""
+    """Class to filter unwanted stats."""
 
     def __init__(self, allow_list=None):
         if allow_list:
@@ -237,7 +240,7 @@ class AllowListValidator:
             self.allow_list = None
 
     def test(self, stat):
-        """Test if stat is in the Allow List"""
+        """Test if stat is in the Allow List."""
         if self.allow_list is not None:
             return stat.strip().lower().startswith(self.allow_list)
         else:
@@ -245,7 +248,7 @@ class AllowListValidator:
 
 
 class SafeStatsdLogger:
-    """StatsD Logger"""
+    """StatsD Logger."""
 
     def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
         self.statsd = statsd_client
@@ -253,42 +256,42 @@ class SafeStatsdLogger:
 
     @validate_stat
     def incr(self, stat, count=1, rate=1):
-        """Increment stat"""
+        """Increment stat."""
         if self.allow_list_validator.test(stat):
             return self.statsd.incr(stat, count, rate)
         return None
 
     @validate_stat
     def decr(self, stat, count=1, rate=1):
-        """Decrement stat"""
+        """Decrement stat."""
         if self.allow_list_validator.test(stat):
             return self.statsd.decr(stat, count, rate)
         return None
 
     @validate_stat
     def gauge(self, stat, value, rate=1, delta=False):
-        """Gauge stat"""
+        """Gauge stat."""
         if self.allow_list_validator.test(stat):
             return self.statsd.gauge(stat, value, rate, delta)
         return None
 
     @validate_stat
     def timing(self, stat, dt):
-        """Stats timing"""
+        """Stats timing."""
         if self.allow_list_validator.test(stat):
             return self.statsd.timing(stat, dt)
         return None
 
     @validate_stat
     def timer(self, stat=None, *args, **kwargs):
-        """Timer metric that can be cancelled"""
+        """Timer metric that can be cancelled."""
         if stat and self.allow_list_validator.test(stat):
             return Timer(self.statsd.timer(stat, *args, **kwargs))
         return Timer()
 
 
 class SafeDogStatsdLogger:
-    """DogStatsd Logger"""
+    """DogStatsd Logger."""
 
     def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()):
         self.dogstatsd = dogstatsd_client
@@ -296,7 +299,7 @@ class SafeDogStatsdLogger:
 
     @validate_stat
     def incr(self, stat, count=1, rate=1, tags=None):
-        """Increment stat"""
+        """Increment stat."""
         if self.allow_list_validator.test(stat):
             tags = tags or []
             return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)
@@ -304,7 +307,7 @@ class SafeDogStatsdLogger:
 
     @validate_stat
     def decr(self, stat, count=1, rate=1, tags=None):
-        """Decrement stat"""
+        """Decrement stat."""
         if self.allow_list_validator.test(stat):
             tags = tags or []
             return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate)
@@ -312,7 +315,7 @@ class SafeDogStatsdLogger:
 
     @validate_stat
     def gauge(self, stat, value, rate=1, delta=False, tags=None):
-        """Gauge stat"""
+        """Gauge stat."""
         if self.allow_list_validator.test(stat):
             tags = tags or []
             return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate)
@@ -320,7 +323,7 @@ class SafeDogStatsdLogger:
 
     @validate_stat
     def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None):
-        """Stats timing"""
+        """Stats timing."""
         if self.allow_list_validator.test(stat):
             tags = tags or []
             if isinstance(dt, datetime.timedelta):
@@ -330,7 +333,7 @@ class SafeDogStatsdLogger:
 
     @validate_stat
     def timer(self, stat=None, *args, tags=None, **kwargs):
-        """Timer metric that can be cancelled"""
+        """Timer metric that can be cancelled."""
         if stat and self.allow_list_validator.test(stat):
             tags = tags or []
             return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs))
@@ -363,7 +366,7 @@ class _Stats(type):
 
     @classmethod
     def get_statsd_logger(cls):
-        """Returns logger for StatsD"""
+        """Returns logger for StatsD."""
         # no need to check for the scheduler/statsd_on -> this method is only called when it is set
         # and previously it would crash with None is callable if it was called without it.
         from statsd import StatsClient
@@ -392,7 +395,7 @@ class _Stats(type):
 
     @classmethod
     def get_dogstatsd_logger(cls):
-        """Get DataDog StatsD logger"""
+        """Get DataDog StatsD logger."""
         from datadog import DogStatsd
 
         dogstatsd = DogStatsd(
@@ -407,7 +410,7 @@ class _Stats(type):
 
     @classmethod
     def get_constant_tags(cls):
-        """Get constant DataDog tags to add to all stats"""
+        """Get constant DataDog tags to add to all stats."""
         tags = []
         tags_in_string = conf.get("metrics", "statsd_datadog_tags", fallback=None)
         if tags_in_string is None or tags_in_string == "":
@@ -423,4 +426,4 @@ if TYPE_CHECKING:
 else:
 
     class Stats(metaclass=_Stats):
-        """Empty class for Stats - we use metaclass to inject the right one"""
+        """Empty class for Stats - we use metaclass to inject the right one."""
diff --git a/airflow/templates.py b/airflow/templates.py
index 2b466a4952..8cd113054d 100644
--- a/airflow/templates.py
+++ b/airflow/templates.py
@@ -48,30 +48,35 @@ class SandboxedEnvironment(_AirflowEnvironmentMixin, jinja2.sandbox.SandboxedEnv
 
 
 def ds_filter(value: datetime.date | datetime.time | None) -> str | None:
+    """Date filter."""
     if value is None:
         return None
     return value.strftime("%Y-%m-%d")
 
 
 def ds_nodash_filter(value: datetime.date | datetime.time | None) -> str | None:
+    """Date filter without dashes."""
     if value is None:
         return None
     return value.strftime("%Y%m%d")
 
 
 def ts_filter(value: datetime.date | datetime.time | None) -> str | None:
+    """Timestamp filter."""
     if value is None:
         return None
     return value.isoformat()
 
 
 def ts_nodash_filter(value: datetime.date | datetime.time | None) -> str | None:
+    """Timestamp filter without dashes."""
     if value is None:
         return None
     return value.strftime("%Y%m%dT%H%M%S")
 
 
 def ts_nodash_with_tz_filter(value: datetime.date | datetime.time | None) -> str | None:
+    """Timestamp filter with timezone."""
     if value is None:
         return None
     return value.isoformat().replace("-", "").replace(":", "")
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 5f2321962a..e7bc6de304 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -15,10 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module provides helper code to make type annotation within Airflow
-codebase easier.
-"""
+"""This module provides helper code to make type annotation within Airflow codebase easier."""
 from __future__ import annotations
 
 __all__ = [
