This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9d0006fb9e D205 Support - Utils (#32591)
9d0006fb9e is described below
commit 9d0006fb9ee9c1b4742613fa3c1e82064eda6ba8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Jul 17 16:06:56 2023 -0700
D205 Support - Utils (#32591)
Co-author: shubham22
Co-authored-by: Niko Oliveira <[email protected]>
---
airflow/utils/cli.py | 18 +++++-------
airflow/utils/cli_action_loggers.py | 6 ++--
airflow/utils/configuration.py | 3 +-
airflow/utils/dag_cycle_tester.py | 5 ++--
airflow/utils/db.py | 24 +++++++--------
airflow/utils/db_cleanup.py | 5 +++-
airflow/utils/dot_renderer.py | 1 +
airflow/utils/edgemodifier.py | 21 ++++++--------
airflow/utils/email.py | 7 +++--
airflow/utils/file.py | 22 +++++++-------
airflow/utils/hashlib_wrapper.py | 6 ++--
airflow/utils/helpers.py | 16 +++-------
airflow/utils/json.py | 6 +---
airflow/utils/log/colored_log.py | 7 +++--
airflow/utils/log/file_processor_handler.py | 12 ++++----
airflow/utils/log/file_task_handler.py | 16 +++++-----
airflow/utils/log/logging_mixin.py | 25 ++++++++--------
airflow/utils/log/non_caching_file_handler.py | 26 +++++++++--------
airflow/utils/log/trigger_handler.py | 3 +-
airflow/utils/mixins.py | 5 ++--
airflow/utils/module_loading.py | 5 ++--
airflow/utils/net.py | 9 +++---
airflow/utils/operator_helpers.py | 10 ++++---
airflow/utils/operator_resources.py | 10 +++----
airflow/utils/platform.py | 8 ++---
airflow/utils/process_utils.py | 1 +
airflow/utils/session.py | 1 +
airflow/utils/sqlalchemy.py | 38 ++++++++++++------------
airflow/utils/state.py | 5 +---
airflow/utils/task_group.py | 42 +++++++++++----------------
airflow/utils/timezone.py | 1 +
airflow/utils/types.py | 5 +---
32 files changed, 176 insertions(+), 193 deletions(-)
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index c800ed71fc..354c2b236c 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -61,9 +61,10 @@ def _check_cli_args(args):
def action_cli(func=None, check_db=True):
def action_logging(f: T) -> T:
"""
- Decorates function to execute function at the same time submitting action_logging
- but in CLI context. It will call action logger callbacks twice,
- one for pre-execution and the other one for post-execution.
+ Decorates function to execute function at the same time submitting action_logging but in CLI context.
+
+ It will call action logger callbacks twice, one for
+ pre-execution and the other one for post-execution.
Action logger will be called with below keyword parameters:
sub_command : name of sub-command
@@ -84,8 +85,7 @@ def action_cli(func=None, check_db=True):
@functools.wraps(f)
def wrapper(*args, **kwargs):
"""
- An wrapper for cli functions. It assumes to have Namespace instance
- at 1st positional argument.
+ A wrapper for cli functions; assumes Namespace instance as first positional argument.
:param args: Positional argument. It assumes to have Namespace instance
at 1st positional argument
@@ -126,7 +126,8 @@ def action_cli(func=None, check_db=True):
def _build_metrics(func_name, namespace):
"""
- Builds metrics dict from function args
+ Builds metrics dict from function args.
+
It assumes that function arguments is from airflow.bin.cli module's function
and has Namespace instance where it optionally contains "dag_id", "task_id",
and "execution_date".
@@ -359,10 +360,7 @@ def should_ignore_depends_on_past(args) -> bool:
def suppress_logs_and_warning(f: T) -> T:
- """
- Decorator to suppress logging and warning messages
- in cli functions.
- """
+ """Decorator to suppress logging and warning messages in cli functions."""
@functools.wraps(f)
def _wrapper(*args, **kwargs):
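The pre/post callback behavior that action_logging describes above is a standard decorator pattern. A minimal standalone sketch, using an illustrative callback list rather than the module's real registry:

    import functools

    _callbacks = []  # hypothetical registry; Airflow keeps its own in cli_action_loggers

    def action_logging(f):
        """Run every registered callback before and after f executes."""
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            metrics = {"sub_command": f.__name__}
            for cb in _callbacks:
                cb(stage="pre", **metrics)   # pre-execution hooks
            try:
                return f(*args, **kwargs)
            finally:
                for cb in _callbacks:
                    cb(stage="post", **metrics)  # post-execution hooks
        return wrapper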
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index 02c311d40a..575f4bfb08 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -16,8 +16,10 @@
# specific language governing permissions and limitations
# under the License.
"""
-An Action Logger module. Singleton pattern has been applied into this module
-so that registered callbacks can be used all through the same python process.
+An Action Logger module.
+
+Singleton pattern has been applied into this module so that registered
+callbacks can be used all through the same python process.
"""
from __future__ import annotations
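The module-level ("singleton") registration the docstring refers to can be sketched as follows; the function names here are illustrative, not necessarily the module's real API:

    _pre_exec_callbacks: list = []

    def register_pre_exec_callback(cb) -> None:
        """Module-level storage makes cb visible to the whole process."""
        _pre_exec_callbacks.append(cb)

    def on_pre_execution(**kwargs) -> None:
        """Invoke every registered callback, isolating failures."""
        for cb in _pre_exec_callbacks:
            try:
                cb(**kwargs)
            except Exception:
                pass  # a broken logger must not break the CLI command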
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index 98fb6388e6..84c5e946ee 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -27,8 +27,7 @@ from airflow.utils.platform import IS_WINDOWS
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
"""
- Returns a path for a temporary file including a full copy of the configuration
- settings.
+ Returns a path for a temporary file including a full copy of the configuration settings.
:param include_env: Should the value of configuration from ``AIRFLOW__``
environment variables be included or not
diff --git a/airflow/utils/dag_cycle_tester.py b/airflow/utils/dag_cycle_tester.py
index 8f150dc0a3..4cea52200d 100644
--- a/airflow/utils/dag_cycle_tester.py
+++ b/airflow/utils/dag_cycle_tester.py
@@ -33,8 +33,9 @@ CYCLE_DONE = 2
def test_cycle(dag: DAG) -> None:
"""
A wrapper function of `check_cycle` for backward compatibility purpose.
- New code should use `check_cycle` instead since this function name `test_cycle` starts with 'test_' and
- will be considered as a unit test by pytest, resulting in failure.
+
+ New code should use `check_cycle` instead since this function name `test_cycle` starts
+ with 'test_' and will be considered as a unit test by pytest, resulting in failure.
"""
from warnings import warn
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index eb4c863dc3..f4c2ae8b1b 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1006,10 +1006,10 @@ def check_username_duplicates(session: Session) -> Iterable[str]:
def reflect_tables(tables: list[Base | str] | None, session):
"""
- When running checks prior to upgrades, we use reflection to determine current state of the
- database.
- This function gets the current state of each table in the set of models provided and returns
- a SqlAlchemy metadata object containing them.
+ When running checks prior to upgrades, we use reflection to determine current state of the database.
+
+ This function gets the current state of each table in the set of models
+ provided and returns a SqlAlchemy metadata object containing them.
"""
import sqlalchemy.schema
@@ -1180,6 +1180,7 @@ def _create_table_as(
):
"""
Create a new table with rows from query.
+
We have to handle CTAS differently for different dialects.
"""
from sqlalchemy import column, select, table
@@ -1256,10 +1257,7 @@ def _move_dangling_data_to_new_table(
def _dangling_against_dag_run(session, source_table, dag_run):
- """
- Given a source table, we generate a subquery that will return 1 for every row that
- has a dagrun.
- """
+ """Given a source table, we generate a subquery that will return 1 for
every row that has a dagrun."""
source_to_dag_run_join_cond = and_(
source_table.c.dag_id == dag_run.c.dag_id,
source_table.c.execution_date == dag_run.c.execution_date,
@@ -1274,8 +1272,7 @@ def _dangling_against_dag_run(session, source_table, dag_run):
def _dangling_against_task_instance(session, source_table, dag_run, task_instance):
"""
- Given a source table, we generate a subquery that will return 1 for every row that
- has a valid task instance (and associated dagrun).
+ Given a source table, generate a subquery that will return 1 for every row that has a valid task instance.
This is used to identify rows that need to be removed from tables prior to adding a TI fk.
@@ -1367,8 +1364,8 @@ def _move_duplicate_data_to_new_table(
def check_bad_references(session: Session) -> Iterable[str]:
"""
- Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id`
- in many tables.
+ Starting in Airflow 2.2, we began a process of replacing `execution_date` with `run_id` in many tables.
+
Here we go through each table and look for records that can't be mapped to a dag run.
When we find such "dangling" rows we back them up in a special table and delete them
from the main table.
@@ -1383,6 +1380,8 @@ def check_bad_references(session: Session) -> Iterable[str]:
@dataclass
class BadReferenceConfig:
"""
+ Bad reference config class.
+
:param bad_rows_func: function that returns subquery which determines whether bad rows exist
:param join_tables: table objects referenced in subquery
:param ref_table: information-only identifier for categorizing the missing ref
@@ -1552,6 +1551,7 @@ def upgradedb(
session: Session = NEW_SESSION,
):
"""
+ Upgrades the DB.
:param to_revision: Optional Alembic revision ID to upgrade *to*.
If omitted, upgrades to latest revision.
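The reflection that reflect_tables relies on reads the schema that actually exists in the database rather than trusting the ORM models. A minimal sketch against a throwaway in-memory SQLite engine (the engine URL is for illustration only):

    from sqlalchemy import MetaData, create_engine

    engine = create_engine("sqlite://")  # stand-in for the real Airflow engine
    metadata = MetaData()
    metadata.reflect(bind=engine)        # load the *current* schema from the DB
    print(sorted(metadata.tables))       # -> [] on an empty database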
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 22fb0020e7..36bced9473 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
"""
-This module took inspiration from the community maintenance dag
+This module took inspiration from the community maintenance dag.
+
+See:
(https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/4e5c7682a808082561d60cbc9cafaa477b0d8c65/db-cleanup/airflow-db-cleanup.py).
"""
from __future__ import annotations
@@ -341,6 +343,7 @@ def _print_config(*, configs: dict[str, _TableConfig]):
def _suppress_with_logging(table, session):
"""
Suppresses errors but logs them.
+
Also stores the exception instance so it can be referred to after exiting context.
"""
try:
diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py
index d3b329cb54..f168566f7e 100644
--- a/airflow/utils/dot_renderer.py
+++ b/airflow/utils/dot_renderer.py
@@ -38,6 +38,7 @@ from airflow.utils.task_group import TaskGroup
def _refine_color(color: str):
"""
Converts color in #RGB (12 bits) format to #RRGGBB (32 bits), if it possible.
+
Otherwise, it returns the original value. Graphviz does not support colors in #RGB format.
:param color: Text representation of color
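The conversion itself is a small string manipulation: each 4-bit channel is doubled to 8 bits. A self-contained sketch of the idea (not the module's exact code):

    def refine_color(color: str) -> str:
        """Expand #RGB to #RRGGBB; return anything else unchanged."""
        if len(color) == 4 and color.startswith("#"):
            r, g, b = color[1], color[2], color[3]
            return f"#{r}{r}{g}{g}{b}{b}"
        return color

    assert refine_color("#0f0") == "#00ff00"
    assert refine_color("#00ff00") == "#00ff00"  # already expanded, untouched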
diff --git a/airflow/utils/edgemodifier.py b/airflow/utils/edgemodifier.py
index c6d8065c45..14aa3ef25e 100644
--- a/airflow/utils/edgemodifier.py
+++ b/airflow/utils/edgemodifier.py
@@ -24,8 +24,9 @@ from airflow.utils.task_group import TaskGroup
class EdgeModifier(DependencyMixin):
"""
- Class that represents edge information to be added between two
- tasks/operators. Has shorthand factory functions, like Label("hooray").
+ Class that represents edge information to be added between two tasks/operators.
+
+ Has shorthand factory functions, like Label("hooray").
Current implementation supports
t1 >> Label("Success route") >> t2
@@ -77,8 +78,9 @@ class EdgeModifier(DependencyMixin):
def _convert_streams_to_task_groups(self):
"""
- Both self._upstream and self._downstream are required to determine if
- we should convert a node to a TaskGroup or leave it as a DAGNode.
+ Convert a node to a TaskGroup or leave it as a DAGNode.
+
+ Requires both self._upstream and self._downstream.
To do this, we keep a set of group_ids seen among the streams. If we find that
the nodes are from the same TaskGroup, we will leave them as DAGNodes and not
@@ -121,8 +123,7 @@ class EdgeModifier(DependencyMixin):
edge_modifier: EdgeModifier | None = None,
):
"""
- Sets the given task/list onto the upstream attribute, and then checks if
- we have both sides so we can resolve the relationship.
+ Set the given task/list onto the upstream attribute, then attempt to resolve the relationship.
Providing this also provides << via DependencyMixin.
"""
@@ -139,8 +140,7 @@ class EdgeModifier(DependencyMixin):
edge_modifier: EdgeModifier | None = None,
):
"""
- Sets the given task/list onto the downstream attribute, and then checks if
- we have both sides so we can resolve the relationship.
+ Set the given task/list onto the downstream attribute, then attempt to resolve the relationship.
Providing this also provides >> via DependencyMixin.
"""
@@ -154,10 +154,7 @@ class EdgeModifier(DependencyMixin):
def update_relative(
self, other: DependencyMixin, upstream: bool = True, edge_modifier: EdgeModifier | None = None
) -> None:
- """
- Called if we're not the "main" side of a relationship; we still run the
- same logic, though.
- """
+ """Called if we're not the "main" side of a relationship; we still run
the same logic, though."""
if upstream:
self.set_upstream(other)
else:
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index e807b8f75d..37e028fbd9 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -321,9 +321,10 @@ def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) ->
def _get_email_list_from_str(addresses: str) -> list[str]:
"""
- Extract a list of email addresses from a string. The string
- can contain multiple email addresses separated by
- any of the following delimiters: ',' or ';'.
+ Extract a list of email addresses from a string.
+
+ The string can contain multiple email addresses separated
+ by any of the following delimiters: ',' or ';'.
:param addresses: A string containing one or more email addresses.
:return: A list of email addresses.
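Splitting on either delimiter is a one-liner with a regex character class; a hedged sketch of the behavior described (the real helper's pattern may differ):

    import re

    def get_email_list_from_str(addresses: str) -> list[str]:
        """Split on ',' or ';' and drop surrounding whitespace."""
        return [a.strip() for a in re.split(r"[,;]", addresses) if a.strip()]

    assert get_email_list_from_str("[email protected]; [email protected]") == ["[email protected]", "[email protected]"]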
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 08b4048ad5..fb17ee6a8d 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -41,8 +41,9 @@ class _IgnoreRule(Protocol):
@staticmethod
def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
"""
- Build an ignore rule from the supplied pattern where base_dir
- and definition_file should be absolute paths.
+ Build an ignore rule from the supplied pattern.
+
+ ``base_dir`` and ``definition_file`` should be absolute paths.
"""
@staticmethod
@@ -134,8 +135,9 @@ def TemporaryDirectory(*args, **kwargs):
def mkdirs(path, mode):
"""
- Creates the directory specified by path, creating intermediate directories
- as necessary. If directory already exists, this is a no-op.
+ Creates the directory specified by path, creating intermediate directories as necessary.
+
+ If directory already exists, this is a no-op.
:param path: The directory to create
:param mode: The mode to give to the directory e.g. 0o755, ignores umask
@@ -164,10 +166,7 @@ def correct_maybe_zipped(fileloc: str | Path) -> str | Path:
def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
- """
- If the path contains a folder with a .zip suffix, then
- the folder is treated as a zip archive and path to zip is returned.
- """
+ """If the path contains a folder with a .zip suffix, treat it as a zip
archive and return path."""
if not fileloc:
return fileloc
search_ = ZIP_REGEX.search(str(fileloc))
@@ -182,8 +181,10 @@ def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
def open_maybe_zipped(fileloc, mode="r"):
"""
- Opens the given file. If the path contains a folder with a .zip suffix, then
- the folder is treated as a zip archive, opening the file inside the archive.
+ Opens the given file.
+
+ If the path contains a folder with a .zip suffix, then the folder
+ is treated as a zip archive, opening the file inside the archive.
:return: a file object, as in `open`, or as in `ZipFile.open`.
"""
@@ -332,6 +333,7 @@ COMMENT_PATTERN = re2.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
"""
Check whether a Python file contains Airflow DAGs.
+
When safe_mode is off (with False value), this function always returns True.
If might_contain_dag_callable isn't specified, it uses airflow default heuristic
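The ".zip in the middle of the path" convention behind open_maybe_zipped can be sketched with the standard library; the real helper matches the component with a regex, while this version naively splits on the first ".zip/":

    import io
    import zipfile

    def open_maybe_zipped(fileloc: str, mode: str = "r"):
        """Open a plain file, or a member inside /path/archive.zip/member."""
        head, _, tail = fileloc.partition(".zip/")
        if tail:
            archive = zipfile.ZipFile(head + ".zip", mode="r")
            return io.TextIOWrapper(archive.open(tail, mode="r"))
        return open(fileloc, mode)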
diff --git a/airflow/utils/hashlib_wrapper.py b/airflow/utils/hashlib_wrapper.py
index 25f02e5c3a..09850c565c 100644
--- a/airflow/utils/hashlib_wrapper.py
+++ b/airflow/utils/hashlib_wrapper.py
@@ -28,11 +28,9 @@ from airflow import PY39
def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
"""
- Safely allows calling the hashlib.md5 function with the "usedforsecurity" disabled
- when specified in the configuration.
+ Safely allows calling the hashlib.md5 function when "usedforsecurity" is disabled in the configuration.
- :param string: The data to hash.
- Default to empty str byte.
+ :param string: The data to hash. Default to empty str byte.
:return: The hashed value.
"""
if PY39:
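On Python 3.9+ the flag is passed straight to hashlib; on older interpreters it simply is not available, which is what the PY39 check guards. For illustration:

    import hashlib
    import sys

    data = b"example payload"
    if sys.version_info >= (3, 9):
        # Tells FIPS-enabled builds this digest is not used for security.
        digest = hashlib.md5(data, usedforsecurity=False).hexdigest()
    else:
        digest = hashlib.md5(data).hexdigest()
    print(digest)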
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index a29cf07a59..b900ca1033 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -120,10 +120,7 @@ def is_container(obj: Any) -> bool:
def as_tuple(obj: Any) -> tuple:
- """
- If obj is a container, returns obj as a tuple.
- Otherwise, returns a tuple containing obj.
- """
+ """Return obj as a tuple if obj is a container, otherwise return a tuple
containing obj."""
if is_container(obj):
return tuple(obj)
else:
@@ -139,10 +136,7 @@ def chunks(items: list[T], chunk_size: int) -> Generator[list[T], None, None]:
def reduce_in_chunks(fn: Callable[[S, list[T]], S], iterable: list[T], initializer: S, chunk_size: int = 0):
- """
- Reduce the given list of items by splitting it into chunks
- of the given size and passing each chunk through the reducer.
- """
+ """Split the list of items into chunks of a given size and pass each chunk
through the reducer."""
if len(iterable) == 0:
return initializer
if chunk_size == 0:
@@ -172,8 +166,7 @@ def parse_template_string(template_string: str) -> tuple[str | None, jinja2.Temp
def render_log_filename(ti: TaskInstance, try_number, filename_template) -> str:
"""
- Given task instance, try_number, filename_template, return the rendered log
- filename.
+ Given task instance, try_number, filename_template, return the rendered log filename.
:param ti: task instance
:param try_number: try_number of the task
@@ -327,8 +320,7 @@ def at_most_one(*args) -> bool:
def prune_dict(val: Any, mode="strict"):
"""
- Given dict ``val``, returns new dict based on ``val`` with all
- empty elements removed.
+ Given dict ``val``, returns new dict based on ``val`` with all empty elements removed.
What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
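The chunk-then-reduce idea behind chunks/reduce_in_chunks in this file is easy to demonstrate standalone (a sketch, not the module's exact code):

    def chunks(items, chunk_size):
        """Yield successive chunk_size-sized slices of items."""
        for i in range(0, len(items), chunk_size):
            yield items[i : i + chunk_size]

    total = 0
    for chunk in chunks([1, 2, 3, 4, 5, 6, 7], 3):
        total += sum(chunk)  # the reducer step, applied per chunk
    assert total == 28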
diff --git a/airflow/utils/json.py b/airflow/utils/json.py
index 3a0a70fb75..7f05c8778d 100644
--- a/airflow/utils/json.py
+++ b/airflow/utils/json.py
@@ -105,11 +105,7 @@ class XComEncoder(json.JSONEncoder):
class XComDecoder(json.JSONDecoder):
- """
- This decoder deserializes dicts to objects if they contain
- the `__classname__` key otherwise it will return the dict
- as is.
- """
+ """Deserialize dicts to objects if they contain the `__classname__` key,
otherwise return the dict."""
def __init__(self, *args, **kwargs) -> None:
if not kwargs.get("object_hook"):
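A json.JSONDecoder dispatches every decoded dict through object_hook, which is how the `__classname__` check plugs in. A toy sketch (the marker handling here is illustrative; XComDecoder's real hook rebuilds the named object):

    import json

    def object_hook(dct: dict):
        if "__classname__" in dct:
            return ("decoded", dct["__classname__"])  # stand-in for reconstruction
        return dct

    decoder = json.JSONDecoder(object_hook=object_hook)
    print(decoder.decode('{"__classname__": "datetime.datetime"}'))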
diff --git a/airflow/utils/log/colored_log.py b/airflow/utils/log/colored_log.py
index aea7c255e6..3afa39410d 100644
--- a/airflow/utils/log/colored_log.py
+++ b/airflow/utils/log/colored_log.py
@@ -42,9 +42,10 @@ BOLD_OFF = esc("22")
class CustomTTYColoredFormatter(TTYColoredFormatter, TimezoneAware):
"""
- Custom log formatter which extends `colored.TTYColoredFormatter`
- by adding attributes to message arguments and coloring error
- traceback.
+ Custom log formatter.
+
+ Extends `colored.TTYColoredFormatter` by adding attributes
+ to message arguments and coloring error traceback.
"""
def __init__(self, *args, **kwargs):
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index 79efb387a3..cda24cdd06 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -30,9 +30,10 @@ from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
class FileProcessorHandler(logging.Handler):
"""
- FileProcessorHandler is a python log handler that handles
- dag processor logs. It creates and delegates log handling
- to `logging.FileHandler` after receiving dag processor context.
+ FileProcessorHandler is a python log handler that handles dag processor logs.
+
+ It creates and delegates log handling to `logging.FileHandler`
+ after receiving dag processor context.
:param base_log_folder: Base log folder to place logs.
:param filename_template: template filename string
@@ -108,8 +109,9 @@ class FileProcessorHandler(logging.Handler):
def _symlink_latest_log_directory(self):
"""
- Create symbolic link to the current day's log directory to
- allow easy access to the latest scheduler log files.
+ Create symbolic link to the current day's log directory.
+
+ Allows easy access to the latest scheduler log files.
:return: None
"""
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 9184a20420..5d791aaa0c 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -134,10 +134,10 @@ def _interleave_logs(*logs):
class FileTaskHandler(logging.Handler):
"""
- FileTaskHandler is a python log handler that handles and reads
- task instance logs. It creates and delegates log handling
- to `logging.FileHandler` after receiving task instance context.
- It reads logs from task instance's host machine.
+ FileTaskHandler is a python log handler that handles and reads task instance logs.
+
+ It creates and delegates log handling to `logging.FileHandler` after receiving task
+ instance context. It reads logs from task instance's host machine.
:param base_log_folder: Base log folder to place logs.
:param filename_template: template filename string
@@ -278,8 +278,7 @@ class FileTaskHandler(logging.Handler):
metadata: dict[str, Any] | None = None,
):
"""
- Template method that contains custom logic of reading
- logs given the try_number.
+ Template method that contains custom logic of reading logs given the try_number.
:param ti: task instance record
:param try_number: current try_number to read log from
@@ -462,8 +461,9 @@ class FileTaskHandler(logging.Handler):
def _init_file(self, ti):
"""
- Create log directory and give it permissions that are configured. See above _prepare_log_folder
- method for more detailed explanation.
+ Create log directory and give it permissions that are configured.
+
+ See above _prepare_log_folder method for more detailed explanation.
:param ti: task instance object
:return: relative log path of the given task instance
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 646c73f1b4..97e1a29ff6 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -54,10 +54,7 @@ def __getattr__(name):
def remove_escape_codes(text: str) -> str:
- """
- Remove ANSI escapes codes from string. It's used to remove
- "colors" from log messages.
- """
+ """Remove ANSI escapes codes from string; used to remove "colors" from log
messages."""
return ANSI_ESCAPE.sub("", text)
@@ -118,15 +115,15 @@ class ExternalLoggingMixin:
# IO generics (and apparently it has not even been intended)
# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
- """Allows to redirect stdout and stderr to logger."""
+ """
+ Allows to redirect stdout and stderr to logger.
+
+ :param log: The log level method to write to, ie. log.debug, log.warning
+ """
encoding: None = None
def __init__(self, logger, level):
- """
- :param log: The log level method to write to, ie. log.debug, log.warning
- :return:
- """
self.logger = logger
self.level = level
self._buffer = ""
@@ -141,8 +138,9 @@ class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
@property
def closed(self):
"""
- Returns False to indicate that the stream is not closed, as it will be
- open for the duration of Airflow's lifecycle.
+ Return False to indicate that the stream is not closed.
+
+ Streams will be open for the duration of Airflow's lifecycle.
For compatibility with the io.IOBase interface.
"""
@@ -174,6 +172,7 @@ class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
def isatty(self):
"""
Returns False to indicate the fd is not connected to a tty(-like) device.
+
For compatibility reasons.
"""
return False
@@ -181,8 +180,10 @@ class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
class RedirectStdHandler(StreamHandler):
"""
+ Custom StreamHandler that uses current sys.stderr/stdout as the stream for logging.
+
This class is like a StreamHandler using sys.stderr/stdout, but uses
- whatever sys.stderr/stderr is currently set to rather than the value of
+ whatever sys.stderr/stdout is currently set to rather than the value of
sys.stderr/stdout at handler construction time, except when running a
task in a kubernetes executor pod.
"""
diff --git a/airflow/utils/log/non_caching_file_handler.py b/airflow/utils/log/non_caching_file_handler.py
index 46fe69a99b..aa0ca9864e 100644
--- a/airflow/utils/log/non_caching_file_handler.py
+++ b/airflow/utils/log/non_caching_file_handler.py
@@ -36,12 +36,13 @@ def make_file_io_non_caching(io: IO[str]) -> IO[str]:
class NonCachingFileHandler(FileHandler):
"""
- This is an extension of the python FileHandler that advises the Kernel to not cache the file
- in PageCache when it is written. While there is nothing wrong with such cache (it will be cleaned
- when memory is needed), it causes ever-growing memory usage when scheduler is running as it keeps
- on writing new log files and the files are not rotated later on. This might lead to confusion
- for our users, who are monitoring memory usage of Scheduler - without realising that it is
- harmless and expected in this case.
+ An extension of FileHandler, advises the Kernel to not cache the file in PageCache when it is written.
+
+ While there is nothing wrong with such cache (it will be cleaned when memory is needed), it
+ causes ever-growing memory usage when scheduler is running as it keeps on writing new log
+ files and the files are not rotated later on. This might lead to confusion for our users,
+ who are monitoring memory usage of Scheduler - without realising that it is harmless and
+ expected in this case.
See https://github.com/apache/airflow/issues/14924
@@ -54,12 +55,13 @@ class NonCachingFileHandler(FileHandler):
class NonCachingRotatingFileHandler(RotatingFileHandler):
"""
- This is an extension of the python RotatingFileHandler that advises the Kernel to not cache the file
- in PageCache when it is written. While there is nothing wrong with such cache (it will be cleaned
- when memory is needed), it causes ever-growing memory usage when scheduler is running as it keeps
- on writing new log files and the files are not rotated later on. This might lead to confusion
- for our users, who are monitoring memory usage of Scheduler - without realising that it is
- harmless and expected in this case.
+ An extension of RotatingFileHandler, advises the Kernel to not cache the file in PageCache when written.
+
+ While there is nothing wrong with such cache (it will be cleaned when memory is needed), it
+ causes ever-growing memory usage when scheduler is running as it keeps on writing new log
+ files and the files are not rotated later on. This might lead to confusion for our users,
+ who are monitoring memory usage of Scheduler - without realising that it is harmless and
+ expected in this case.
See https://github.com/apache/airflow/issues/27065
diff --git a/airflow/utils/log/trigger_handler.py b/airflow/utils/log/trigger_handler.py
index 50756cb048..54f8ace65e 100644
--- a/airflow/utils/log/trigger_handler.py
+++ b/airflow/utils/log/trigger_handler.py
@@ -67,8 +67,7 @@ class DropTriggerLogsFilter(logging.Filter):
class TriggererHandlerWrapper(logging.Handler):
"""
- Wrap inheritors of FileTaskHandler and direct log messages
- to them based on trigger_id.
+ Wrap inheritors of FileTaskHandler and direct log messages to them based on trigger_id.
:meta private:
"""
diff --git a/airflow/utils/mixins.py b/airflow/utils/mixins.py
index d157c7f078..e3c6a8efec 100644
--- a/airflow/utils/mixins.py
+++ b/airflow/utils/mixins.py
@@ -35,8 +35,9 @@ class MultiprocessingStartMethodMixin:
def _get_multiprocessing_start_method(self) -> str:
"""
- Determine method of creating new processes by checking if the
- mp_start_method is set in configs, else, it uses the OS default.
+ Determine method of creating new processes.
+
+ Checks if the mp_start_method is set in configs, else, it uses the OS default.
"""
if conf.has_option("core", "mp_start_method"):
return conf.get_mandatory_value("core", "mp_start_method")
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
index 8decbc6a61..d81ab65e85 100644
--- a/airflow/utils/module_loading.py
+++ b/airflow/utils/module_loading.py
@@ -25,8 +25,9 @@ from typing import Callable
def import_string(dotted_path: str):
"""
- Import a dotted module path and return the attribute/class designated by the
- last name in the path. Raise ImportError if the import failed.
+ Import a dotted module path and return the attribute/class designated by the last name in the path.
+
+ Raise ImportError if the import failed.
"""
try:
module_path, class_name = dotted_path.rsplit(".", 1)
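The pattern behind import_string is split-once-from-the-right, import the module, then getattr the final name. A self-contained sketch of that logic:

    from importlib import import_module

    def import_string(dotted_path: str):
        """Import 'pkg.mod.attr' and return attr; raise ImportError on failure."""
        try:
            module_path, class_name = dotted_path.rsplit(".", 1)
        except ValueError:
            raise ImportError(f"{dotted_path} doesn't look like a module path")
        module = import_module(module_path)
        try:
            return getattr(module, class_name)
        except AttributeError:
            raise ImportError(f"Module {module_path} has no attribute {class_name}")

    import os
    assert import_string("os.path.join") is os.path.join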
diff --git a/airflow/utils/net.py b/airflow/utils/net.py
index 57bd9008b9..992aee67e8 100644
--- a/airflow/utils/net.py
+++ b/airflow/utils/net.py
@@ -26,7 +26,9 @@ from airflow.configuration import conf
# patched version of socket.getfqdn() - see https://github.com/python/cpython/issues/49254
@lru_cache(maxsize=None)
def getfqdn(name=""):
- """Get fully qualified domain name from name.
+ """
+ Get fully qualified domain name from name.
+
An empty argument is interpreted as meaning the local host.
"""
name = name.strip()
@@ -50,8 +52,5 @@ def get_host_ip_address():
def get_hostname():
- """
- Fetch the hostname using the callable from the config or using
- `airflow.utils.net.getfqdn` as a fallback.
- """
+ """Fetch the hostname using the callable from config or use
`airflow.utils.net.getfqdn` as a fallback."""
return conf.getimport("core", "hostname_callable", fallback="airflow.utils.net.getfqdn")()
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index 0390edc81c..20f272f4f3 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -62,6 +62,8 @@ AIRFLOW_VAR_NAME_FORMAT_MAPPING = {
def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool = False) -> dict[str, str]:
"""
+ Return values used to externally reconstruct relations between dags, dag_runs, tasks and task_instances.
+
Given a context, this function provides a dictionary of values that can be used to
externally reconstruct relations between dags, dag_runs, tasks and task_instances.
Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if
@@ -185,12 +187,10 @@ def determine_kwargs(
kwargs: Mapping[str, Any],
) -> Mapping[str, Any]:
"""
- Inspect the signature of a given callable to determine which arguments in kwargs need
- to be passed to the callable.
+ Inspect the signature of a callable to determine which kwargs need to be passed to the callable.
:param func: The callable that you want to invoke
- :param args: The positional arguments that needs to be passed to the callable, so we
- know how many to skip.
+ :param args: The positional arguments that need to be passed to the callable, so we know how many to skip.
:param kwargs: The keyword arguments that need to be filtered before passing to the callable.
:return: A dictionary which contains the keyword arguments that are compatible with the callable.
"""
@@ -199,6 +199,8 @@ def determine_kwargs(
def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]:
"""
+ Creates a new callable that only forwards necessary arguments from any provided input.
+
Make a new callable that can accept any number of positional or keyword arguments
but only forwards those required by the given callable func.
"""
diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index 638034a81a..cd6a225244 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -70,10 +70,7 @@ class Resource:
@property
def qty(self):
- """
- The number of units of the specified resource that are required for
- execution of the operator.
- """
+ """The number of units of the specified resource that are required for
execution of the operator."""
return self._qty
def to_dict(self):
@@ -114,8 +111,9 @@ class GpuResource(Resource):
class Resources:
"""
- The resources required by an operator. Resources that are not specified will use the
- default values from the airflow config.
+ The resources required by an operator.
+
+ Resources that are not specified will use the default values from the airflow config.
:param cpus: The number of cpu cores that are required
:param ram: The amount of RAM required
diff --git a/airflow/utils/platform.py b/airflow/utils/platform.py
index 691aac033b..72906683f6 100644
--- a/airflow/utils/platform.py
+++ b/airflow/utils/platform.py
@@ -32,10 +32,7 @@ log = logging.getLogger(__name__)
def is_tty():
- """
- Checks if the standard output is connected (is associated with a terminal device) to a tty(-like)
- device.
- """
+ """Check if stdout is connected (is associated with a terminal device) to
a tty(-like) device."""
if not hasattr(sys.stdout, "isatty"):
return False
return sys.stdout.isatty()
@@ -69,8 +66,7 @@ def get_airflow_git_version():
@cache
def getuser() -> str:
"""
- Gets the username associated with the current user, or error with a nice
- error message if there's no current user.
+ Get the username of the current user, or error with a nice error message if there's no current user.
We don't want to fall back to os.getuid() because not having a username
probably means the rest of the user environment is wrong (e.g. no $HOME).
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 663f0f496c..74ad50bf74 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -300,6 +300,7 @@ def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, No
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
"""
Checks if a pidfile already exists and process is still running.
+
If process is dead then pidfile is removed.
:param pid_file: path to the pidfile
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 0f8bdb7103..5c7e9eef50 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -61,6 +61,7 @@ def find_session_idx(func: Callable[PS, RT]) -> int:
def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""
Function decorator that provides a session if it isn't provided.
+
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 04af5bddc3..499aa31c34 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -50,8 +50,8 @@ using_mysql = conf.get_mandatory_value("database", "sql_alchemy_conn").lower().s
class UtcDateTime(TypeDecorator):
"""
- Almost equivalent to :class:`~sqlalchemy.types.TIMESTAMP` with
- ``timezone=True`` option, but it differs from that by:
+ Similar to :class:`~sqlalchemy.types.TIMESTAMP` with ``timezone=True`` option, with some differences.
+
- Never silently take naive :class:`~datetime.datetime`, instead it
always raise :exc:`ValueError` unless time zone aware value.
- :class:`~datetime.datetime` value's :attr:`~datetime.datetime.tzinfo`
@@ -86,11 +86,11 @@ class UtcDateTime(TypeDecorator):
def process_result_value(self, value, dialect):
"""
- Processes DateTimes from the DB making sure it is always
- returning UTC. Not using timezone.convert_to_utc as that
- converts to configured TIMEZONE while the DB might be
- running with some other setting. We assume UTC datetimes
- in the database.
+ Processes DateTimes from the DB making sure it is always returning UTC.
+
+ Not using timezone.convert_to_utc as that converts to configured TIMEZONE
+ while the DB might be running with some other setting. We assume UTC
+ datetimes in the database.
"""
if value is not None:
if value.tzinfo is None:
@@ -110,8 +110,9 @@ class UtcDateTime(TypeDecorator):
class ExtendedJSON(TypeDecorator):
"""
- A version of the JSON column that uses the Airflow extended JSON
- serialization provided by airflow.serialization.
+ A version of the JSON column that uses the Airflow extended JSON serialization.
+
+ See airflow.serialization.
"""
impl = Text
@@ -244,10 +245,11 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
class ExecutorConfigType(PickleType):
"""
- Adds special handling for K8s executor config. If we unpickle a k8s object that was
- pickled under an earlier k8s library version, then the unpickled object may throw an error
- when to_dict is called. To be more tolerant of version changes we convert to JSON using
- Airflow's serializer before pickling.
+ Adds special handling for K8s executor config.
+
+ If we unpickle a k8s object that was pickled under an earlier k8s library version, then
+ the unpickled object may throw an error when to_dict is called. To be more tolerant of
+ version changes we convert to JSON using Airflow's serializer before pickling.
"""
cache_ok = True
@@ -293,11 +295,11 @@ class ExecutorConfigType(PickleType):
def compare_values(self, x, y):
"""
- The TaskInstance.executor_config attribute is a pickled object that may contain
- kubernetes objects. If the installed library version has changed since the
- object was originally pickled, due to the underlying ``__eq__`` method on these
- objects (which converts them to JSON), we may encounter attribute errors. In this
- case we should replace the stored object.
+ The TaskInstance.executor_config attribute is a pickled object that may contain kubernetes objects.
+
+ If the installed library version has changed since the object was originally pickled,
+ due to the underlying ``__eq__`` method on these objects (which converts them to JSON),
+ we may encounter attribute errors. In this case we should replace the stored object.
From https://github.com/apache/airflow/pull/24356 we use our serializer to store
k8s objects, but there could still be raw pickled k8s objects in the database,
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index f18fd48c82..a0c828ee07 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -82,10 +82,7 @@ class DagRunState(str, Enum):
class State:
- """
- Static class with task instance state constants and color methods to
- avoid hardcoding.
- """
+ """Static class with task instance state constants and color methods to
avoid hardcoding."""
# Backwards-compat constants for code that does not yet use the enum
# These first three are shared by DagState and TaskState
diff --git a/airflow/utils/task_group.py b/airflow/utils/task_group.py
index 50fa5ca19a..3c3a01bc7d 100644
--- a/airflow/utils/task_group.py
+++ b/airflow/utils/task_group.py
@@ -15,10 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
-together when the DAG is displayed graphically.
-"""
+"""A collection of closely related tasks on the same DAG that should be
grouped together visually."""
from __future__ import annotations
import copy
@@ -53,8 +50,10 @@ if TYPE_CHECKING:
class TaskGroup(DAGNode):
"""
- A collection of tasks. When set_downstream() or set_upstream() are called on the
- TaskGroup, it is applied across all tasks within the group if necessary.
+ A collection of tasks.
+
+ When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across
+ all tasks within the group if necessary.
:param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
@@ -318,6 +317,7 @@ class TaskGroup(DAGNode):
) -> None:
"""
Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup.
+
Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
"""
if not isinstance(task_or_task_list, Sequence):
@@ -358,10 +358,7 @@ class TaskGroup(DAGNode):
return list(self.get_leaves())
def get_roots(self) -> Generator[BaseOperator, None, None]:
- """
- Returns a generator of tasks that are root tasks, i.e. those with no upstream
- dependencies within the TaskGroup.
- """
+ """Return a generator of tasks with no upstream dependencies within
the TaskGroup."""
tasks = list(self)
ids = {x.task_id for x in tasks}
for task in tasks:
@@ -369,10 +366,7 @@ class TaskGroup(DAGNode):
yield task
def get_leaves(self) -> Generator[BaseOperator, None, None]:
- """
- Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
- dependencies within the TaskGroup.
- """
+ """Return a generator of tasks with no downstream dependencies within
the TaskGroup."""
tasks = list(self)
ids = {x.task_id for x in tasks}
@@ -393,10 +387,7 @@ class TaskGroup(DAGNode):
yield from recurse_for_first_non_setup_teardown(task)
def child_id(self, label):
- """
- Prefix label with group_id if prefix_group_id is True. Otherwise return the label
- as-is.
- """
+ """Prefix label with group_id if prefix_group_id is True. Otherwise
return the label as-is."""
if self.prefix_group_id:
group_id = self.group_id
if group_id:
@@ -407,6 +398,8 @@ class TaskGroup(DAGNode):
@property
def upstream_join_id(self) -> str:
"""
+ Creates a unique ID for upstream dependencies of this TaskGroup.
+
If this TaskGroup has immediate upstream TaskGroups or tasks, a proxy node called
upstream_join_id will be created in Graph view to join the outgoing edges from this
TaskGroup to reduce the total number of edges needed to be displayed.
@@ -416,6 +409,8 @@ class TaskGroup(DAGNode):
@property
def downstream_join_id(self) -> str:
"""
+ Creates a unique ID for downstream dependencies of this TaskGroup.
+
If this TaskGroup has immediate downstream TaskGroups or tasks, a proxy node called
downstream_join_id will be created in Graph view to join the outgoing edges from this
TaskGroup to reduce the total number of edges needed to be displayed.
@@ -450,7 +445,8 @@ class TaskGroup(DAGNode):
def hierarchical_alphabetical_sort(self):
"""
- Sorts children in hierarchical alphabetical order:
+ Sorts children in hierarchical alphabetical order.
+
- groups in alphabetical order first
- tasks in alphabetical order after them.
@@ -462,8 +458,7 @@ class TaskGroup(DAGNode):
def topological_sort(self, _include_subdag_tasks: bool = False):
"""
- Sorts children in topographical order, such that a task comes after any of its
- upstream dependencies.
+ Sorts children in topographical order, such that a task comes after any of its upstream dependencies.
:return: list of tasks in topological order
"""
@@ -680,10 +675,7 @@ class TaskGroupContext:
def task_group_to_dict(task_item_or_group):
- """
- Create a nested dict representation of this TaskGroup and its children used to construct
- the Graph.
- """
+ """Create a nested dict representation of this TaskGroup and its children
used to construct the Graph."""
from airflow.models.abstractoperator import AbstractOperator
if isinstance(task := task_item_or_group, AbstractOperator):
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 2b65055f38..1db15ac612 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -241,6 +241,7 @@ def coerce_datetime(v: dt.datetime | None, tz: dt.tzinfo | None = None) -> DateT
def td_format(td_object: None | dt.timedelta | float | int) -> str | None:
"""
Format a timedelta object or float/int into a readable string for time duration.
+
For example timedelta(seconds=3752) would become `1h:2M:32s`.
If the time is less than a second, the return will be `<1s`.
"""
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index 788b072f39..0eab9b3b87 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -68,9 +68,6 @@ class DagRunType(str, enum.Enum):
class EdgeInfoType(TypedDict):
- """
- Represents extra metadata that the DAG can store about an edge,
- usually generated from an EdgeModifier.
- """
+ """Extra metadata that the DAG can store about an edge, usually generated
from an EdgeModifier."""
label: str | None