This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 54f1fb0574 Docstring improvements (#31375)
54f1fb0574 is described below
commit 54f1fb0574a6ecf8f415bbf6da1aaf6f1999bb29
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri May 19 12:55:34 2023 +0800
Docstring improvements (#31375)
---
airflow/api/common/mark_tasks.py | 1 -
airflow/models/dag.py | 44 +++---
airflow/models/dagrun.py | 7 +-
airflow/models/trigger.py | 8 +-
airflow/models/variable.py | 27 ++--
airflow/operators/datetime.py | 4 +-
airflow/operators/python.py | 12 +-
airflow/operators/subdag.py | 15 +-
airflow/operators/weekday.py | 50 +++----
.../backcompat/backwards_compat_converters.py | 28 +---
airflow/providers/google/go_module_utils.py | 7 +-
airflow/security/utils.py | 8 +-
airflow/serialization/helpers.py | 8 +-
airflow/serialization/pydantic/dag_run.py | 2 +-
airflow/serialization/pydantic/dataset.py | 8 +-
airflow/serialization/pydantic/job.py | 2 +-
airflow/serialization/pydantic/taskinstance.py | 2 +-
airflow/serialization/serializers/kubernetes.py | 6 +-
airflow/utils/cli_action_loggers.py | 23 ++-
airflow/utils/dates.py | 38 ++---
airflow/utils/log/timezone_aware.py | 16 +--
airflow/utils/process_utils.py | 8 +-
airflow/utils/sqlalchemy.py | 9 +-
airflow/utils/timezone.py | 42 +++---
airflow/www/extensions/init_appbuilder.py | 41 +++---
airflow/www/extensions/init_manifest_files.py | 15 +-
airflow/www/fab_security/manager.py | 36 ++---
dev/breeze/src/airflow_breeze/utils/confirm.py | 4 +-
.../airflow_breeze/utils/docker_command_utils.py | 30 ++--
dev/provider_packages/prepare_provider_packages.py | 159 +++++++++++----------
.../logging-monitoring/logging-tasks.rst | 2 +-
docs/exts/docs_build/code_utils.py | 5 +-
scripts/in_container/verify_providers.py | 88 +++++-------
tests/conftest.py | 64 ++++-----
tests/dags/test_logging_in_dag.py | 5 +-
tests/utils/test_cli_util.py | 5 +-
36 files changed, 384 insertions(+), 445 deletions(-)
diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 83030efb48..df6b87d5c2 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -256,7 +256,6 @@ def verify_dagruns(
:param state: state of the dag_run to set if commit is True
:param session: session to use
:param current_task: current task
- :return:
"""
for dag_run in dag_runs:
dag_run.dag = current_task.subdag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b536510f7c..4009893fee 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2649,13 +2649,14 @@ class DAG(LoggingMixin):
"""
def add_logger_if_needed(ti: TaskInstance):
- """
- Add a formatted logger to the taskinstance so all logs are
surfaced to the command line instead
- of into a task file. Since this is a local test run, it is much
better for the user to see logs
- in the command line, rather than needing to search for a log file.
- Args:
- ti: The taskinstance that will receive a logger.
+ """Add a formatted logger to the task instance.
+
+ This allows all logs to surface to the command line, instead of
into
+ a task file. Since this is a local test run, it is much better for
+ the user to see logs in the command line, rather than needing to
+ search for a log file.
+ :param ti: The task instance that will receive a logger.
"""
format = logging.Formatter("[%(asctime)s]
{%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
handler = logging.StreamHandler(sys.stdout)
@@ -3256,9 +3257,10 @@ class DagTag(Base):
class DagOwnerAttributes(Base):
- """
- Table defining different owner attributes. For example, a link for an
owner that will be passed as
- a hyperlink to the DAGs view.
+ """Table defining different owner attributes.
+
+ For example, a link for an owner that will be passed as a hyperlink to the
+ "DAGs" view.
"""
__tablename__ = "dag_owner_attributes"
@@ -3835,17 +3837,17 @@ def _get_or_create_dagrun(
run_id: str,
session: Session,
) -> DagRun:
- """
- Create a DAGRun, but only after clearing the previous instance of said
dagrun to prevent collisions.
- This function is only meant for the `dag.test` function as a helper
function.
+ """Create a DAG run, replacing an existing instance if needed to prevent
collisions.
+
+ This function is only meant to be used by :meth:`DAG.test` as a helper
function.
+
+ :param dag: DAG to be used to find run.
+ :param conf: Configuration to pass to newly created run.
+ :param start_date: Start date of new run.
+ :param execution_date: Logical date for finding an existing run.
+ :param run_id: Run ID for the new DAG run.
- :param dag: Dag to be used to find dagrun
- :param conf: configuration to pass to newly created dagrun
- :param start_date: start date of new dagrun, defaults to execution_date
- :param execution_date: execution_date for finding the dagrun
- :param run_id: run_id to pass to new dagrun
- :param session: sqlalchemy session
- :return:
+ :return: The newly created DAG run.
"""
log.info("dagrun id: %s", dag.dag_id)
dr: DagRun = (
@@ -3862,7 +3864,7 @@ def _get_or_create_dagrun(
run_id=run_id,
start_date=start_date or execution_date,
session=session,
- conf=conf, # type: ignore
+ conf=conf,
)
- log.info("created dagrun " + str(dr))
+ log.info("created dagrun %s", dr)
return dr
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 5647b3410d..42845b34bd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -97,9 +97,10 @@ def _creator_note(val):
class DagRun(Base, LoggingMixin):
- """
- DagRun describes an instance of a Dag. It can be created
- by the scheduler (for regular runs) or by an external trigger.
+ """Invocation instance of a DAG.
+
+ A DAG run can be created by the scheduler (i.e. scheduled runs), or by an
+ external trigger (i.e. manual runs).
"""
__tablename__ = "dag_run"
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index a2791f0c27..8a02d05357 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -112,9 +112,11 @@ class Trigger(Base):
@internal_api_call
@provide_session
def clean_unused(cls, session: Session = NEW_SESSION) -> None:
- """
- Deletes all triggers that have no tasks/DAGs dependent on them
- (triggers have a one-to-many relationship to both).
+ """Deletes all triggers that have no tasks dependent on them.
+
+ Triggers have a one-to-many relationship to task instances, so we need
+ to clean those up first. Afterwards we can drop the triggers not
+ referenced by anyone.
"""
# Update all task instances with trigger IDs that are not DEFERRED to
remove them
for attempt in run_with_db_retries():
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 2659592557..843f60f903 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -127,8 +127,7 @@ class Variable(Base, LoggingMixin):
default_var: Any = __NO_DEFAULT_SENTINEL,
deserialize_json: bool = False,
) -> Any:
- """
- Gets a value for an Airflow Variable Key.
+ """Gets a value for an Airflow Variable Key.
:param key: Variable Key
:param default_var: Default value of the Variable if the Variable
doesn't exist
@@ -158,16 +157,15 @@ class Variable(Base, LoggingMixin):
description: str | None = None,
serialize_json: bool = False,
session: Session = None,
- ):
- """
- Sets a value for an Airflow Variable with a given Key.
- This operation will overwrite an existing variable.
+ ) -> None:
+ """Sets a value for an Airflow Variable with a given Key.
+
+ This operation overwrites an existing variable.
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
:param serialize_json: Serialize the value to a JSON string
- :param session: SQL Alchemy Sessions
"""
# check if the secret exists in the custom secrets' backend.
Variable.check_for_write_conflict(key)
@@ -188,14 +186,12 @@ class Variable(Base, LoggingMixin):
value: Any,
serialize_json: bool = False,
session: Session = None,
- ):
- """
- Updates a given Airflow Variable with the Provided value.
+ ) -> None:
+ """Updates a given Airflow Variable with the Provided value.
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
- :param session: SQL Alchemy Session
"""
Variable.check_for_write_conflict(key)
@@ -212,11 +208,9 @@ class Variable(Base, LoggingMixin):
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
- """
- Delete an Airflow Variable for a given key.
+ """Delete an Airflow Variable for a given key.
- :param key: Variable Key
- :param session: SQL Alchemy Sessions
+ :param key: Variable Key
"""
return session.query(Variable).filter(Variable.key == key).delete()
@@ -228,8 +222,7 @@ class Variable(Base, LoggingMixin):
@staticmethod
def check_for_write_conflict(key: str) -> None:
- """
- Logs a warning if a variable exists outside of the metastore.
+ """Logs a warning if a variable exists outside of the metastore.
If we try to write a variable to the metastore while the same key
exists in an environment variable or custom secrets backend, then
diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index f56fc2d3d3..7c3c648130 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -27,8 +27,8 @@ from airflow.utils.context import Context
class BranchDateTimeOperator(BaseBranchOperator):
- """
- Branches into one of two lists of tasks depending on the current datetime.
+ """Branches into one of two lists of tasks depending on the current
datetime.
+
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BranchDateTimeOperator`.
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 7a632114c9..9300d6ca90 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -51,15 +51,10 @@ from airflow.utils.python_virtualenv import
prepare_virtualenv, write_python_scr
def task(python_callable: Callable | None = None, multiple_outputs: bool |
None = None, **kwargs):
- """
- Deprecated function.
- Calls @task.python and allows users to turn a python function into
- an Airflow task. Please use the following instead.
-
- from airflow.decorators import task
+ """Deprecated. Use :func:`airflow.decorators.task` instead.
- @task
- def my_task()
+ Calls ``@task.python`` and allows users to turn a Python function into
+ an Airflow task.
:param python_callable: A reference to an object that is callable
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
@@ -69,7 +64,6 @@ def task(python_callable: Callable | None = None,
multiple_outputs: bool | None
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with
keys as keys.
Defaults to False.
- :return:
"""
# To maintain backwards compatibility, we import the task object into this
file
# This prevents breakages in dags that use `from airflow.operators.python
import task`
diff --git a/airflow/operators/subdag.py b/airflow/operators/subdag.py
index 52f4b27403..9ac595afab 100644
--- a/airflow/operators/subdag.py
+++ b/airflow/operators/subdag.py
@@ -127,15 +127,14 @@ class SubDagOperator(BaseSensorOperator):
)
return dag_runs[0] if dag_runs else None
- def _reset_dag_run_and_task_instances(self, dag_run, execution_date):
- """
- Set task instance states to allow for execution.
- Set the DagRun state to RUNNING and set the failed TaskInstances to
None state
- for scheduler to pick up.
+ def _reset_dag_run_and_task_instances(self, dag_run: DagRun,
execution_date: datetime) -> None:
+ """Set task instance states to allow for execution.
+
+ The state of the DAG run will be set to RUNNING, and failed task
+ instances to ``None`` for scheduler to pick up.
- :param dag_run: DAG run
- :param execution_date: Execution date
- :return: None
+ :param dag_run: DAG run to reset.
+ :param execution_date: Execution date to select task instances.
"""
with create_session() as session:
dag_run.state = State.RUNNING
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index d35204d31c..ea57ac65bd 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -28,56 +28,58 @@ from airflow.utils.weekday import WeekDay
class BranchDayOfWeekOperator(BaseBranchOperator):
- """
- Branches into one of two lists of tasks depending on the current day.
- For more information on how to use this operator, take a look at the guide.
+ """Branches into one of two lists of tasks depending on the current day.
+ For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BranchDayOfWeekOperator`
- **Example** (with single day): ::
+ **Example** (with single day):
+
+ .. code-block:: python
from airflow.operators.empty import EmptyOperator
- monday = EmptyOperator(task_id='monday')
- other_day = EmptyOperator(task_id='other_day')
+ monday = EmptyOperator(task_id="monday")
+ other_day = EmptyOperator(task_id="other_day")
monday_check = DayOfWeekSensor(
- task_id='monday_check',
- week_day='Monday',
+ task_id="monday_check",
+ week_day="Monday",
use_task_logical_date=True,
- follow_task_ids_if_true='monday',
- follow_task_ids_if_false='other_day',
- dag=dag)
+ follow_task_ids_if_true="monday",
+ follow_task_ids_if_false="other_day",
+ )
monday_check >> [monday, other_day]
- **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
+ **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum):
+
+ .. code-block:: python
# import WeekDay Enum
from airflow.utils.weekday import WeekDay
from airflow.operators.empty import EmptyOperator
- workday = EmptyOperator(task_id='workday')
- weekend = EmptyOperator(task_id='weekend')
+ workday = EmptyOperator(task_id="workday")
+ weekend = EmptyOperator(task_id="weekend")
weekend_check = BranchDayOfWeekOperator(
- task_id='weekend_check',
+ task_id="weekend_check",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
use_task_logical_date=True,
- follow_task_ids_if_true='weekend',
- follow_task_ids_if_false='workday',
- dag=dag)
+ follow_task_ids_if_true="weekend",
+ follow_task_ids_if_false="workday",
+ )
# add downstream dependencies as you would do with any branch operator
weekend_check >> [workday, weekend]
:param follow_task_ids_if_true: task id or task ids to follow if criteria
met
:param follow_task_ids_if_false: task id or task ids to follow if criteria
is not met
:param week_day: Day of the week to check (full name). Optionally, a set
- of days can also be provided using a set.
- Example values:
+ of days can also be provided using a set. Example values:
- * ``"MONDAY"``,
- * ``{"Saturday", "Sunday"}``
- * ``{WeekDay.TUESDAY}``
- * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+ * ``"MONDAY"``,
+ * ``{"Saturday", "Sunday"}``
+ * ``{WeekDay.TUESDAY}``
+ * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
To use `WeekDay` enum, import it from `airflow.utils.weekday`
diff --git
a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index 9d37da983e..91257ced2d 100644
---
a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++
b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -43,41 +43,33 @@ def _convert_from_dict(obj, new_class):
def convert_volume(volume) -> k8s.V1Volume:
- """
- Converts an airflow Volume object into a k8s.V1Volume
+ """Converts an airflow Volume object into a k8s.V1Volume
:param volume:
- :return: k8s.V1Volume
"""
return _convert_kube_model_object(volume, k8s.V1Volume)
def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
- """
- Converts an airflow VolumeMount object into a k8s.V1VolumeMount
+ """Converts an airflow VolumeMount object into a k8s.V1VolumeMount
:param volume_mount:
- :return: k8s.V1VolumeMount
"""
return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
def convert_port(port) -> k8s.V1ContainerPort:
- """
- Converts an airflow Port object into a k8s.V1ContainerPort
+ """Converts an airflow Port object into a k8s.V1ContainerPort
:param port:
- :return: k8s.V1ContainerPort
"""
return _convert_kube_model_object(port, k8s.V1ContainerPort)
def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:
- """
- Converts a dictionary into a list of env_vars
+ """Converts a dictionary into a list of env_vars
:param env_vars:
- :return:
"""
if isinstance(env_vars, dict):
res = []
@@ -91,21 +83,17 @@ def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:
def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
- """
- Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar
+ """Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar
:param pod_runtime_info_envs:
- :return:
"""
return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)
def convert_image_pull_secrets(image_pull_secrets) ->
list[k8s.V1LocalObjectReference]:
- """
- Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar
+ """Converts image pull secrets into a list of k8s.V1LocalObjectReference
:param image_pull_secrets:
- :return:
"""
if isinstance(image_pull_secrets, str):
secrets = image_pull_secrets.split(",")
@@ -115,11 +103,9 @@ def convert_image_pull_secrets(image_pull_secrets) ->
list[k8s.V1LocalObjectRefe
def convert_configmap(configmaps) -> k8s.V1EnvFromSource:
- """
- Converts a str into an k8s.V1EnvFromSource
+ """Converts a str into an k8s.V1EnvFromSource
:param configmaps:
- :return:
"""
return
k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmaps))
diff --git a/airflow/providers/google/go_module_utils.py
b/airflow/providers/google/go_module_utils.py
index a05590dd0a..8e1f608191 100644
--- a/airflow/providers/google/go_module_utils.py
+++ b/airflow/providers/google/go_module_utils.py
@@ -24,12 +24,12 @@ from airflow.utils.process_utils import
execute_in_subprocess
def init_module(go_module_name: str, go_module_path: str) -> None:
- """Initialize a Go module. If a ``go.mod`` file already exists, this
function
- will do nothing.
+ """Initialize a Go module.
+
+ If a ``go.mod`` file already exists, this function will do nothing.
:param go_module_name: The name of the Go module to initialize.
:param go_module_path: The path to the directory containing the Go module.
- :return:
"""
if os.path.isfile(os.path.join(go_module_path, "go.mod")):
return
@@ -41,7 +41,6 @@ def install_dependencies(go_module_path: str) -> None:
"""Install dependencies for a Go module.
:param go_module_path: The path to the directory containing the Go module.
- :return:
"""
go_mod_tidy = ["go", "mod", "tidy"]
execute_in_subprocess(go_mod_tidy, cwd=go_module_path)
diff --git a/airflow/security/utils.py b/airflow/security/utils.py
index 46c31c2d74..1ffe0e0d29 100644
--- a/airflow/security/utils.py
+++ b/airflow/security/utils.py
@@ -41,11 +41,11 @@ from airflow.utils.net import get_hostname
def get_components(principal) -> list[str] | None:
- """
- Returns components retrieved from the kerberos principal.
- -> (short name, instance (FQDN), realm).
+ """Split the kerberos principal string into parts.
- ``principal`` .
+ :return: *None* if the principal is empty. Otherwise split the value into
+ parts. Assuming the principal string is valid, the return value should
+ contain three components: short name, instance (FQDN), and realm.
"""
if not principal:
return None
diff --git a/airflow/serialization/helpers.py b/airflow/serialization/helpers.py
index 07481c58dc..d8e6afdc0a 100644
--- a/airflow/serialization/helpers.py
+++ b/airflow/serialization/helpers.py
@@ -23,12 +23,10 @@ from airflow.settings import json
def serialize_template_field(template_field: Any) -> str | dict | list | int |
float:
- """
- Return a serializable representation of the templated_field.
- If a templated_field contains a Class or Instance for recursive
templating, store them
- as strings. If the templated_field is not recursive return the field.
+ """Return a serializable representation of the templated field.
- :param template_field: Task's Templated Field
+ If ``templated_field`` contains a class or instance that requires recursive
+ templating, store them as strings. Otherwise simply return the field as-is.
"""
def is_jsonable(x):
diff --git a/airflow/serialization/pydantic/dag_run.py
b/airflow/serialization/pydantic/dag_run.py
index b7ebceb826..a5ae6b4a22 100644
--- a/airflow/serialization/pydantic/dag_run.py
+++ b/airflow/serialization/pydantic/dag_run.py
@@ -46,6 +46,6 @@ class DagRunPydantic(BaseModelPydantic):
consumed_dataset_events: List[DatasetEventPydantic]
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
diff --git a/airflow/serialization/pydantic/dataset.py
b/airflow/serialization/pydantic/dataset.py
index f3752f4ade..4ffcb6dc02 100644
--- a/airflow/serialization/pydantic/dataset.py
+++ b/airflow/serialization/pydantic/dataset.py
@@ -32,7 +32,7 @@ class DagScheduleDatasetReferencePydantic(BaseModelPydantic):
updated_at: datetime
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
@@ -50,7 +50,7 @@ class TaskOutletDatasetReferencePydantic(BaseModelPydantic):
updated_at = datetime
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
@@ -69,7 +69,7 @@ class DatasetPydantic(BaseModelPydantic):
producing_tasks: List[TaskOutletDatasetReferencePydantic]
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
@@ -87,6 +87,6 @@ class DatasetEventPydantic(BaseModelPydantic):
dataset: DatasetPydantic
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
diff --git a/airflow/serialization/pydantic/job.py
b/airflow/serialization/pydantic/job.py
index 99253e3b7a..b36a9826eb 100644
--- a/airflow/serialization/pydantic/job.py
+++ b/airflow/serialization/pydantic/job.py
@@ -47,6 +47,6 @@ class JobPydantic(BaseModelPydantic):
max_tis_per_query: Optional[int]
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
diff --git a/airflow/serialization/pydantic/taskinstance.py
b/airflow/serialization/pydantic/taskinstance.py
index 92a282b6bd..236c42c260 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -58,7 +58,7 @@ class TaskInstancePydantic(BaseModelPydantic):
run_as_user: Optional[str]
class Config:
- """Make sure it deals automatically with ORM classes of SQL Alchemy."""
+ """Make sure it deals automatically with SQLAlchemy ORM classes."""
orm_mode = True
diff --git a/airflow/serialization/serializers/kubernetes.py
b/airflow/serialization/serializers/kubernetes.py
index 900d219da5..d38836affc 100644
--- a/airflow/serialization/serializers/kubernetes.py
+++ b/airflow/serialization/serializers/kubernetes.py
@@ -46,11 +46,9 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)):
from airflow.kubernetes.pod_generator import PodGenerator
+ # We're running this in an except block, so we don't want it to fail
+ # under any circumstances, e.g. accessing a non-existing attribute.
def safe_get_name(pod):
- """
- We're running this in an except block, so we don't want it to
- fail under any circumstances, e.g. by accessing an attribute that
isn't there.
- """
try:
return pod.metadata.name
except Exception:
diff --git a/airflow/utils/cli_action_loggers.py
b/airflow/utils/cli_action_loggers.py
index 8962e35221..02c311d40a 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -27,8 +27,7 @@ from typing import Callable
def register_pre_exec_callback(action_logger):
- """
- Registers more action_logger function callback for pre-execution.
+ """Registers more action_logger function callback for pre-execution.
This function callback is expected to be called with keyword args.
For more about the arguments that is being passed to the callback,
@@ -42,8 +41,7 @@ def register_pre_exec_callback(action_logger):
def register_post_exec_callback(action_logger):
- """
- Registers more action_logger function callback for post-execution.
+ """Registers more action_logger function callback for post-execution.
This function callback is expected to be called with keyword args.
For more about the arguments that is being passed to the callback,
@@ -57,8 +55,8 @@ def register_post_exec_callback(action_logger):
def on_pre_execution(**kwargs):
- """
- Calls callbacks before execution.
+ """Calls callbacks before execution.
+
Note that any exception from callback will be logged but won't be
propagated.
:param kwargs:
@@ -73,8 +71,8 @@ def on_pre_execution(**kwargs):
def on_post_execution(**kwargs):
- """
- Calls callbacks after execution.
+ """Calls callbacks after execution.
+
As it's being called after execution, it can capture status of execution,
duration, etc. Note that any exception from callback will be logged but
won't be propagated.
@@ -91,13 +89,10 @@ def on_post_execution(**kwargs):
def default_action_log(sub_command, user, task_id, dag_id, execution_date,
host_name, full_command, **_):
- """
- A default action logger callback that behave same as
www.utils.action_logging
- which uses global session and pushes log ORM object.
+ """Default action logger callback that behaves similar to
``action_logging``.
- :param log: An log ORM instance
- :param **_: other keyword arguments that is not being used by this function
- :return: None
+ The difference is this function uses the global ORM session, and pushes a
+ ``Log`` row into the database instead of actually logging.
"""
from sqlalchemy.exc import OperationalError, ProgrammingError
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 820ace78b2..a4f70c8dfe 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -19,11 +19,13 @@ from __future__ import annotations
import warnings
from datetime import datetime, timedelta
+from typing import Collection
from croniter import croniter
from dateutil.relativedelta import relativedelta # for doctest
from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.typing_compat import Literal
from airflow.utils import timezone
cron_presets: dict[str, str] = {
@@ -42,10 +44,7 @@ def date_range(
num: int | None = None,
delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
- """
- Get a set of dates as a list based on a start, end and delta, delta
- can be something that can be added to `datetime.datetime`
- or a cron expression as a `str`.
+ """Get a list of dates in the specified range, separated by delta.
.. code-block:: pycon
>>> from airflow.utils.dates import date_range
@@ -136,11 +135,12 @@ def date_range(
return sorted(dates)
-def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
- """
- Returns the datetime of the form start_date + i * delta
- which is closest to dt for any non-negative integer i.
- Note that delta may be a datetime.timedelta or a dateutil.relativedelta.
+def round_time(
+ dt: datetime,
+ delta: str | timedelta | relativedelta,
+ start_date: datetime = timezone.make_aware(datetime.min),
+):
+ """Returns ``start_date + i * delta`` for given ``i`` where the result is
closest to ``dt``.
.. code-block:: pycon
@@ -219,11 +219,13 @@ def round_time(dt, delta,
start_date=timezone.make_aware(datetime.min)):
# and this function returns start_date.
-def infer_time_unit(time_seconds_arr):
- """
- Determine the most appropriate time unit for an array of time durations
- specified in seconds.
- e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'.
+TimeUnit = Literal["days", "hours", "minutes", "seconds"]
+
+
+def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
+ """Determine the most appropriate time unit for given durations (in
seconds).
+
+ e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
"""
if len(time_seconds_arr) == 0:
return "hours"
@@ -238,7 +240,7 @@ def infer_time_unit(time_seconds_arr):
return "days"
-def scale_time_units(time_seconds_arr, unit):
+def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) ->
Collection[float]:
"""Convert an array of time durations in seconds to the specified time
unit."""
if unit == "minutes":
return list(map(lambda x: x / 60, time_seconds_arr))
@@ -250,9 +252,9 @@ def scale_time_units(time_seconds_arr, unit):
def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
- """
- Get a datetime object representing `n` days ago. By default the time is
- set to midnight.
+ """Get a datetime object representing *n* days ago.
+
+ By default the time is set to midnight.
"""
warnings.warn(
"Function `days_ago` is deprecated and will be removed in Airflow 3.0.
"
diff --git a/airflow/utils/log/timezone_aware.py
b/airflow/utils/log/timezone_aware.py
index ac2d88f020..999ccda5a7 100644
--- a/airflow/utils/log/timezone_aware.py
+++ b/airflow/utils/log/timezone_aware.py
@@ -22,12 +22,11 @@ import pendulum
class TimezoneAware(logging.Formatter):
- """
- Override `default_time_format`, `default_msec_format` and `formatTime` to
specify utc offset.
- utc offset is the matter, without it, time conversion could be wrong.
- With this Formatter, `%(asctime)s` will be formatted containing utc
offset. (ISO 8601).
+ """Override time-formatting methods to include UTC offset.
- e.g. 2022-06-12T13:00:00.123+0000
+ Since Airflow parses the logs to perform time conversion, UTC offset is
+ critical information. This formatter ensures ``%(asctime)s`` is formatted
+ containing the offset in ISO 8601, e.g. ``2022-06-12T13:00:00.123+0000``.
"""
default_time_format = "%Y-%m-%dT%H:%M:%S"
@@ -35,9 +34,10 @@ class TimezoneAware(logging.Formatter):
default_tz_format = "%z"
def formatTime(self, record, datefmt=None):
- """
- Returns the creation time of the specified LogRecord in ISO 8601 date
and time format
- in the local time zone.
+ """Format time in record.
+
+ This returns the creation time of the specified LogRecord in ISO 8601
+ date and time format in the local time zone.
"""
dt = pendulum.from_timestamp(record.created,
tz=pendulum.local_timezone())
if datefmt:
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 50dc669efe..663f0f496c 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -324,11 +324,13 @@ def check_if_pidfile_process_is_running(pid_file: str,
process_name: str):
def set_new_process_group() -> None:
- """
- Try to set current process to a new process group.
+ """Try to set current process to a new process group.
+
That makes it easy to kill all sub-process of this at the OS-level,
rather than having to iterate the child processes.
- If current process spawn by system call ``exec()`` than keep current
process group.
+
+ If current process was spawned by system call ``exec()``, the current
+ process group is kept.
"""
if os.getpid() == os.getsid(0):
# If PID = SID than process a session leader, and it is not possible
to change process group
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 2e47290f6e..f12f6f44c8 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -396,10 +396,11 @@ def nowait(session: Session) -> dict[str, Any]:
def nulls_first(col, session: Session) -> dict[str, Any]:
- """
- Adds a nullsfirst construct to the column ordering. Currently only
Postgres supports it.
- In MySQL & Sqlite NULL values are considered lower than any non-NULL
value, therefore, NULL values
- appear first when the order is ASC (ascending).
+ """Specify *NULLS FIRST* to the column ordering.
+
+ This is only done to Postgres, currently the only backend that supports it.
+ Other databases do not need it since NULL values are considered lower than
+ any other values, and appear first when the order is ASC (ascending).
"""
if session.bind.dialect.name == "postgresql":
return nullsfirst(col)
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 5d134697f9..2b65055f38 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -29,33 +29,31 @@ utc = pendulum.tz.timezone("UTC")
def is_localized(value):
- """
- Determine if a given datetime.datetime is aware.
- The concept is defined in Python's docs:
- http://docs.python.org/library/datetime.html#datetime.tzinfo
- Assuming value.tzinfo is either None or a proper datetime.tzinfo,
- value.utcoffset() implements the appropriate logic.
+ """Determine if a given datetime.datetime is aware.
+
+ The concept is defined in Python documentation. Assuming the tzinfo is
+ either None or a proper ``datetime.tzinfo`` instance, ``value.utcoffset()``
+ implements the appropriate logic.
+
+ .. seealso:: http://docs.python.org/library/datetime.html#datetime.tzinfo
"""
return value.utcoffset() is not None
def is_naive(value):
- """
- Determine if a given datetime.datetime is naive.
- The concept is defined in Python's docs:
- http://docs.python.org/library/datetime.html#datetime.tzinfo
- Assuming value.tzinfo is either None or a proper datetime.tzinfo,
- value.utcoffset() implements the appropriate logic.
+ """Determine if a given datetime.datetime is naive.
+
+ The concept is defined in Python documentation. Assuming the tzinfo is
+ either None or a proper ``datetime.tzinfo`` instance, ``value.utcoffset()``
+ implements the appropriate logic.
+
+ .. seealso:: http://docs.python.org/library/datetime.html#datetime.tzinfo
"""
return value.utcoffset() is None
def utcnow() -> dt.datetime:
- """
- Get the current date and time in UTC.
-
- :return:
- """
+ """Get the current date and time in UTC."""
# pendulum utcnow() is not used as that sets a TimezoneInfo object
# instead of a Timezone. This is not picklable and also creates issues
# when using replace()
@@ -66,11 +64,7 @@ def utcnow() -> dt.datetime:
def utc_epoch() -> dt.datetime:
- """
- Gets the epoch in the users timezone.
-
- :return:
- """
+ """Gets the epoch in the user's timezone."""
# pendulum utcnow() is not used as that sets a TimezoneInfo object
# instead of a Timezone. This is not picklable and also creates issues
# when using replace()
@@ -91,9 +85,7 @@ def convert_to_utc(value: dt.datetime) -> DateTime:
def convert_to_utc(value: dt.datetime | None) -> DateTime | None:
- """
- Returns the datetime with the default timezone added if timezone
- information was not associated.
+ """Creates a datetime with the default timezone added if none is
associated.
:param value: datetime
:return: datetime with tzinfo
diff --git a/airflow/www/extensions/init_appbuilder.py
b/airflow/www/extensions/init_appbuilder.py
index b1c70daf47..ac9d2c9107 100644
--- a/airflow/www/extensions/init_appbuilder.py
+++ b/airflow/www/extensions/init_appbuilder.py
@@ -408,7 +408,9 @@ class AirflowAppBuilder:
then this link will be a part of the menu. Otherwise, it
will not be included in the menu items. Defaults to
:code:`None`, meaning the item will always be present.
+
Examples::
+
appbuilder = AppBuilder(app, db)
# Register a view, rendering a top menu without icon.
appbuilder.add_view(MyModelView(), "My View")
@@ -478,8 +480,8 @@ class AirflowAppBuilder:
baseview=None,
cond=None,
):
- """
- Add your own links to menu using this method
+ """Add your own links to menu using this method.
+
:param name:
The string name that identifies the menu.
:param href:
@@ -524,8 +526,8 @@ class AirflowAppBuilder:
self._add_permissions_menu(category)
def add_separator(self, category, cond=None):
- """
- Add a separator to the menu, you will sequentially create the menu
+ """Add a separator to the menu, you will sequentially create the menu.
+
:param category:
The menu category where the separator will be included.
:param cond:
@@ -558,28 +560,29 @@ class AirflowAppBuilder:
return baseview
def security_cleanup(self):
- """
- This method is useful if you have changed
- the name of your menus or classes,
- changing them will leave behind permissions
- that are not associated with anything.
- You can use it always or just sometimes to
- perform a security cleanup. Warning this will delete any permission
- that is no longer part of any registered view or menu.
- Remember invoke ONLY AFTER YOU HAVE REGISTERED ALL VIEWS.
+ """Clean up security.
+
+ This method is useful if you have changed the name of your menus or
+ classes. Changing them leaves behind permissions that are not
associated
+ with anything. You can use it always or just sometimes to perform a
+ security cleanup.
+
+ .. warning::
+
+ This deletes any permission that is no longer part of any
registered
+ view or menu. Only invoke AFTER YOU HAVE REGISTERED ALL VIEWS.
"""
self.sm.security_cleanup(self.baseviews, self.menu)
def security_converge(self, dry=False) -> dict:
- """
- Migrates all permissions to the new names on all the Roles.
+ """Migrates all permissions to the new names on all the Roles.
This method is useful when you use:
- - `class_permission_name`
- - `previous_class_permission_name`
- - `method_permission_name`
- - `previous_method_permission_name`
+ - ``class_permission_name``
+ - ``previous_class_permission_name``
+ - ``method_permission_name``
+ - ``previous_method_permission_name``
:param dry: If True will not change DB
:return: Dict with all computed necessary operations
diff --git a/airflow/www/extensions/init_manifest_files.py
b/airflow/www/extensions/init_manifest_files.py
index 79b244e3b6..2ce60194a6 100644
--- a/airflow/www/extensions/init_manifest_files.py
+++ b/airflow/www/extensions/init_manifest_files.py
@@ -24,11 +24,9 @@ from flask import url_for
def configure_manifest_files(app):
- """
- Loads the manifest file and register the `url_for_asset_` template tag.
+ """Loads the manifest file and registers the `url_for_asset_` template tag.
:param app:
- :return:
"""
manifest = {}
@@ -52,11 +50,10 @@ def configure_manifest_files(app):
@app.context_processor
def get_url_for_asset():
- """
- Template tag to return the asset URL.
- WebPack renders the assets after minification and modification
- under the static/dist folder.
- This template tag reads the asset name in manifest.json and returns
- the appropriate file.
+ """Template tag to return the asset URL.
+
+ WebPack renders the assets after minification and modification under
the
+ static/dist folder. This template tag reads the asset name in
+ ``manifest.json`` and returns the appropriate file.
"""
return dict(url_for_asset=get_asset_url)
diff --git a/airflow/www/fab_security/manager.py
b/airflow/www/fab_security/manager.py
index 380e3ef9e3..a26b5977fb 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -146,6 +146,7 @@ class BaseSecurityManager:
@staticmethod
def oauth_tokengetter(token=None):
"""Authentication (OAuth) token getter function.
+
Override to implement your own token getter method.
"""
return _oauth_tokengetter(token)
@@ -585,9 +586,10 @@ class BaseSecurityManager:
return _provider.get("token_key", "oauth_token")
def get_oauth_token_secret_name(self, provider):
- """
- Returns the token_secret name for the oauth provider if none is
configured defaults to oauth_secret.
- This is configured using OAUTH_PROVIDERS and token_secret.
+ """Get the ``token_secret`` name for the oauth provider.
+
+ If none is configured, defaults to ``oauth_secret``. This is configured
+ using ``OAUTH_PROVIDERS`` and ``token_secret``.
"""
for _provider in self.oauth_providers:
if _provider["name"] == provider:
@@ -606,9 +608,9 @@ class BaseSecurityManager:
session["oauth_provider"] = provider
def get_oauth_user_info(self, provider, resp):
- """
- Since there are different OAuth API's with different ways to
- retrieve user info.
+ """Get the OAuth user information from different OAuth APIs.
+
+ All providers have different ways to retrieve user info.
"""
# for GITHUB
if provider == "github" or provider == "githublocal":
@@ -866,9 +868,10 @@ class BaseSecurityManager:
self.update_user(user)
def update_user_auth_stat(self, user, success=True):
- """
- Update user authentication stats upon successful/unsuccessful
- authentication attempts.
+ """Update user authentication stats.
+
+ This is done upon successful/unsuccessful authentication attempts.
+
:param user:
The identified (but possibly not successfully authenticated) user
model
@@ -890,9 +893,10 @@ class BaseSecurityManager:
self.update_user(user)
def _rotate_session_id(self):
- """
- Upon successful authentication when using the database session backend,
- we need to rotate the session id.
+ """Rotate the session ID.
+
+ We need to do this upon successful authentication when using the
+ database session backend.
"""
if conf.get("webserver", "SESSION_BACKEND") == "database":
session.sid = str(uuid4())
@@ -1379,10 +1383,10 @@ class BaseSecurityManager:
def _get_user_permission_resources(
self, user: User | None, action_name: str, resource_names: list[str] |
None = None
) -> set[str]:
- """
- Return a set of resource names with a certain action name that a user
has access to.
- Mainly used to fetch all menu permissions on a single db call, will
also check public permissions
- and builtin roles.
+ """Get resource names with a certain action name that a user has
access to.
+
+ Mainly used to fetch all menu permissions on a single db call, will
also
+ check public permissions and builtin roles.
"""
if not resource_names:
resource_names = []
diff --git a/dev/breeze/src/airflow_breeze/utils/confirm.py
b/dev/breeze/src/airflow_breeze/utils/confirm.py
index 0b64697c19..225a04bce2 100644
--- a/dev/breeze/src/airflow_breeze/utils/confirm.py
+++ b/dev/breeze/src/airflow_breeze/utils/confirm.py
@@ -37,14 +37,12 @@ def user_confirm(
default_answer: Answer | None = Answer.NO,
quit_allowed: bool = True,
) -> Answer:
- """
- Ask the user for confirmation.
+ """Ask the user for confirmation.
:param message: message to display to the user (should end with the
question mark)
:param timeout: time given user to answer
:param default_answer: default value returned on timeout. If no default -
is set, the timeout is ignored.
:param quit_allowed: whether quit answer is allowed
- :return:
"""
from inputimeout import TimeoutOccurred, inputimeout
diff --git a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
index 2218a266b2..6bcb0bdb93 100644
--- a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
+++ b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py
@@ -262,11 +262,11 @@ Please upgrade to at least {MIN_DOCKER_VERSION}[/]
def check_remote_ghcr_io_commands():
- """
- Checks if you have permissions to pull an empty image from ghcr.io.
Unfortunately, GitHub packages
- treat expired login as "no-access" even on public repos. We need to detect
that situation and suggest
- user to log-out or if they are in CI environment to re-push their PR/close
or reopen the PR.
- :return:
+ """Checks if you have permissions to pull an empty image from ghcr.io.
+
+ Unfortunately, GitHub packages treat expired login as "no-access" even on
+ public repos. We need to detect that situation and suggest user to log-out
+ or if they are in CI environment to re-push their PR/close or reopen the
PR.
"""
response = run_command(
["docker", "pull", "ghcr.io/apache/airflow-hello-world"],
@@ -305,12 +305,12 @@ DOCKER_COMPOSE_COMMAND = ["docker-compose"]
def check_docker_compose_version():
- """
- Checks if the docker compose version is as expected, including some
specific modifications done by
- some vendors such as Microsoft. They might have modified version of
docker-compose/docker in their
- cloud. In case docker compose version is wrong we continue but print
warning for the user.
-
+ """Checks if the docker compose version is as expected.
+ This includes specific modifications done by some vendors such as
Microsoft.
+ They might have modified version of docker-compose/docker in their cloud.
In
+ the case the docker compose version is wrong, we continue but print a
+ warning for the user.
"""
version_pattern = re.compile(r"(\d+)\.(\d+)\.(\d+)")
docker_compose_version_command = ["docker-compose", "--version"]
@@ -363,10 +363,7 @@ Make sure docker-compose you install is first on the PATH
variable of yours.
def check_docker_context():
- """
- Checks whether Docker is using the expected context
-
- """
+ """Checks whether Docker is using the expected context."""
expected_docker_context = "default"
response = run_command(
["docker", "info", "--format", "{{json .ClientInfo.Context}}"],
@@ -576,12 +573,11 @@ def make_sure_builder_configured(params:
CommonBuildParams):
def set_value_to_default_if_not_set(env: dict[str, str], name: str, default:
str):
- """
- Set value of name parameter to default (indexed by name) if not set.
+ """Set value of name parameter to default (indexed by name) if not set.
+
:param env: dictionary where to set the parameter
:param name: name of parameter
:param default: default value
- :return:
"""
if env.get(name) is None:
env[name] = os.environ.get(name, default)
diff --git a/dev/provider_packages/prepare_provider_packages.py
b/dev/provider_packages/prepare_provider_packages.py
index 5f994e9432..a2b7787f49 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -623,13 +623,15 @@ def strip_leading_zeros(version: str) -> str:
def get_previous_release_info(
previous_release_version: str | None, past_releases: list[ReleaseInfo],
current_release_version: str
) -> str | None:
- """
- Find previous release. In case we are re-running current release we assume
that last release was
- the previous one. This is needed so that we can generate list of changes
since the previous release.
+ """Find previous release.
+
+ In case we are re-running current release, we assume that last release was
+ the previous one. This is needed so that we can generate list of changes
+ since the previous release.
+
:param previous_release_version: known last release version
:param past_releases: list of past releases
:param current_release_version: release that we are working on currently
- :return:
"""
previous_release = None
if previous_release_version == current_release_version:
@@ -645,8 +647,8 @@ def check_if_release_version_ok(
past_releases: list[ReleaseInfo],
current_release_version: str,
) -> tuple[str, str | None]:
- """
- Check if the release version passed is not later than the last release
version
+ """Check if the release version passed is not later than the last release
version.
+
:param past_releases: all past releases (if there are any)
:param current_release_version: release version to check
:return: Tuple of current/previous_release (previous might be None if
there are no releases)
@@ -668,8 +670,8 @@ def check_if_release_version_ok(
def get_cross_provider_dependent_packages(provider_package_id: str) ->
list[str]:
- """
- Returns cross-provider dependencies for the package.
+ """Returns cross-provider dependencies for the package.
+
:param provider_package_id: package id
:return: list of cross-provider dependencies
"""
@@ -677,18 +679,17 @@ def
get_cross_provider_dependent_packages(provider_package_id: str) -> list[str]
def make_current_directory_safe(verbose: bool):
- """
- Makes current directory safe for Git.
+ """Makes current directory safe for Git.
- New git checks if git ownership for the folder is not manipulated with. We
are running this command
- only inside the container where the directory is mounted from "regular"
user to "root" user which is
- used inside the container, so this is quite ok to assume the directory it
is used is safe.
+ New git checks if git ownership for the folder is not manipulated with. We
+ are running this command only inside the container where the directory is
+ mounted from "regular" user to "root" user which is used inside the
+ container, so this is quite ok to assume the directory it is used is safe.
- It's also ok to leave it as safe - it is a global option inside the
container so it will disappear
- when we exit.
+ It's also ok to leave it as safe - it is a global option inside the
+ container so it will disappear when we exit.
:param verbose: whether to print commands being executed
- :return:
"""
safe_dir_remove_command = ["git", "config", "--global", "--unset-all",
"safe.directory"]
if verbose:
@@ -702,17 +703,17 @@ def make_current_directory_safe(verbose: bool):
def make_sure_remote_apache_exists_and_fetch(git_update: bool, verbose: bool):
- """
- Make sure that apache remote exist in git. We need to take a log from the
apache
- repository - not locally.
+ """Make sure that apache remote exists in git.
- Also, the local repo might be shallow, so we need to un-shallow it.
+ We need to take a log from the apache repository - not locally. Also, the
+ local repo might be shallow, so we need to un-shallow it.
This will:
* mark current directory as safe for ownership (it is run in the container)
* check if the remote exists and add if it does not
* check if the local repo is shallow, mark it to un-shallow in this case
- * fetch from the remote including all tags and overriding local tags in
case they are set differently
+ * fetch from the remote including all tags and overriding local tags in
case
+ they are set differently
:param git_update: If the git remote already exists, should we try to
update it
:param verbose: print verbose messages while fetching
@@ -782,8 +783,10 @@ def make_sure_remote_apache_exists_and_fetch(git_update:
bool, verbose: bool):
def get_git_log_command(
verbose: bool, from_commit: str | None = None, to_commit: str | None = None
) -> list[str]:
- """
- Get git command to run for the current repo from the current folder (which
is the package folder).
+ """Get git command to run for the current repo from the current folder.
+
+ The current directory should always be the package folder.
+
:param verbose: whether to print verbose info while getting the command
:param from_commit: if present - base commit from which to start the log
from
:param to_commit: if present - final commit which should be the start of
the log
@@ -806,8 +809,8 @@ def get_git_log_command(
def get_git_tag_check_command(tag: str) -> list[str]:
- """
- Get git command to check if tag exits.
+ """Get git command to check if tag exists.
+
:param tag: Tag to check
:return: git command to run
"""
@@ -819,8 +822,8 @@ def get_git_tag_check_command(tag: str) -> list[str]:
def get_source_package_path(provider_package_id: str) -> str:
- """
- Retrieves source package path from package id.
+ """Retrieves source package path from package id.
+
:param provider_package_id: id of the package
:return: path of the providers folder
"""
@@ -828,8 +831,8 @@ def get_source_package_path(provider_package_id: str) ->
str:
def get_documentation_package_path(provider_package_id: str) -> str:
- """
- Retrieves documentation package path from package id.
+ """Retrieves documentation package path from package id.
+
:param provider_package_id: id of the package
:return: path of the documentation folder
"""
@@ -839,8 +842,8 @@ def get_documentation_package_path(provider_package_id:
str) -> str:
def get_generated_package_path(provider_package_id: str) -> str:
- """
- Retrieves generated package path from package id.
+ """Retrieves generated package path from package id.
+
:param provider_package_id: id of the package
:return: path of the providers folder
"""
@@ -849,8 +852,7 @@ def get_generated_package_path(provider_package_id: str) ->
str:
def get_additional_package_info(provider_package_path: str) -> str:
- """
- Returns additional info for the package.
+ """Returns additional info for the package.
:param provider_package_path: path for the package
:return: additional information for the path (empty string if missing)
@@ -878,10 +880,10 @@ def get_package_pip_name(provider_package_id: str):
def validate_provider_info_with_runtime_schema(provider_info: dict[str, Any])
-> None:
- """
- Validates provider info against the runtime schema. This way we check if
the provider info in the
- packages is future-compatible. The Runtime Schema should only change when
there is a major version
- change.
+ """Validates provider info against the runtime schema.
+
+ This way we check if the provider info in the packages is
future-compatible.
+ The Runtime Schema should only change when there is a major version change.
:param provider_info: provider info to validate
"""
@@ -900,10 +902,13 @@ def
validate_provider_info_with_runtime_schema(provider_info: dict[str, Any]) ->
def get_provider_yaml(provider_package_id: str) -> dict[str, Any]:
- """
- Retrieves provider info from the provider yaml file. The provider yaml
file contains more information
- than provider_info that is used at runtime. This method converts the full
provider yaml file into
- stripped-down provider info and validates it against deprecated 2.0.0
schema and runtime schema.
+ """Retrieves provider info from the provider YAML file.
+
+ The provider yaml file contains more information than provider_info that is
+ used at runtime. This method converts the full provider yaml file into
+ stripped-down provider info and validates it against deprecated 2.0.0
schema
+ and runtime schema.
+
:param provider_package_id: package id to retrieve provider.yaml from
:return: provider_info dictionary
"""
@@ -916,8 +921,8 @@ def get_provider_yaml(provider_package_id: str) ->
dict[str, Any]:
def get_provider_info_from_provider_yaml(provider_package_id: str) ->
dict[str, Any]:
- """
- Retrieves provider info from the provider yaml file.
+ """Retrieves provider info from the provider yaml file.
+
:param provider_package_id: package id to retrieve provider.yaml from
:return: provider_info dictionary
"""
@@ -942,12 +947,11 @@ def get_all_changes_for_package(
verbose: bool,
base_branch: str,
) -> tuple[bool, list[list[Change]] | Change | None, str]:
- """
- Retrieves all changes for the package.
+ """Retrieves all changes for the package.
+
:param provider_package_id: provider package id
:param base_branch: base branch to check changes in apache remote for
changes
:param verbose: whether to print verbose messages
-
"""
provider_details = get_provider_details(provider_package_id)
current_version = provider_details.versions[0]
@@ -1171,11 +1175,12 @@ def prepare_readme_file(context):
def confirm(message: str, answer: str | None = None) -> bool:
- """
- Ask user to confirm (case-insensitive).
+ """Ask user to confirm (case-insensitive).
+
:param message: message to display
:param answer: force answer if set
- :return: True if the answer is any form of y/yes. Exits with 65 exit code
if any form of q/quit is chosen.
+ :return: True if the answer is any form of y/yes. Exits with 65 exit code
if
+ any form of q/quit is chosen.
"""
given_answer = answer.lower() if answer is not None else ""
while given_answer not in ["y", "n", "q", "yes", "no", "quit"]:
@@ -1199,8 +1204,8 @@ class TypeOfChange(Enum):
def get_type_of_changes(answer: str | None) -> TypeOfChange:
- """
- Ask user to specify type of changes (case-insensitive).
+ """Ask user to specify type of changes (case-insensitive).
+
:return: Type of change.
"""
given_answer = ""
@@ -1271,8 +1276,9 @@ def update_release_notes(
answer: str | None,
base_branch: str,
) -> bool:
- """
- Updates generated files (readme, changes and/or
setup.cfg/setup.py/manifest.in/provider_info)
+ """Updates generated files.
+
+ This includes the readme, changes, and/or
setup.cfg/setup.py/manifest.in/provider_info.
:param provider_package_id: id of the package
:param version_suffix: version suffix corresponding to the version in the
code
@@ -1352,8 +1358,7 @@ def update_setup_files(
provider_package_id: str,
version_suffix: str,
):
- """
- Updates generated setup.cfg/setup.py/manifest.in/provider_info for packages
+ """Updates generated setup.cfg/setup.py/manifest.in/provider_info for
packages.
:param provider_package_id: id of the package
:param version_suffix: version suffix corresponding to the version in the
code
@@ -1545,18 +1550,17 @@ def prepare_manifest_in_file(context):
def get_all_providers() -> list[str]:
- """
- Returns all providers for regular packages.
+ """Returns all providers for regular packages.
+
:return: list of providers that are considered for provider packages
"""
return list(ALL_PROVIDERS)
def verify_provider_package(provider_package_id: str) -> None:
- """
- Verifies if the provider package is good.
+ """Verifies if the provider package is good.
+
:param provider_package_id: package id to verify
- :return: None
"""
if provider_package_id not in get_all_providers():
console.print(f"[red]Wrong package name: {provider_package_id}[/]")
@@ -1618,10 +1622,9 @@ def update_package_documentation(
verbose: bool,
base_branch: str,
):
- """
- Updates package documentation.
+ """Updates package documentation.
- See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+ See `list-providers-packages` subcommand for the possible PACKAGE_ID
values.
"""
provider_package_id = package_id
verify_provider_package(provider_package_id)
@@ -1670,10 +1673,9 @@ def tag_exists_for_version(provider_package_id: str,
current_tag: str, verbose:
def generate_setup_files(
version_suffix: str, git_update: bool, package_id: str, verbose: bool,
skip_tag_check: bool
):
- """
- Generates setup files for the package.
+ """Generates setup files for the package.
- See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+ See `list-providers-packages` subcommand for the possible PACKAGE_ID
values.
"""
provider_package_id = package_id
with with_group(f"Generate setup files for '{provider_package_id}'"):
@@ -1740,10 +1742,9 @@ def build_provider_packages(
verbose: bool,
skip_tag_check: bool,
):
- """
- Builds provider package.
+ """Builds provider package.
- See `list-providers-packages` subcommand for the possible PACKAGE_ID values
+ See `list-providers-packages` subcommand for the possible PACKAGE_ID
values.
"""
import tempfile
@@ -1792,13 +1793,14 @@ def build_provider_packages(
def find_insertion_index_for_version(content: list[str], version: str) ->
tuple[int, bool]:
- """
- Finds insertion index for the specified version from the .rst changelog
content.
+ """Finds insertion index for the specified version from the .rst changelog
content.
:param content: changelog split into separate lines
:param version: version to look for
- :return: Tuple : insertion_index, append (whether to append or insert the
changelog)
+ :return: A 2-tuple. The first item indicates the insertion index, while the
+ second is a boolean indicating whether to append (True) or insert
(False)
+ to the changelog.
"""
changelog_found = False
skip_next_line = False
@@ -1824,10 +1826,11 @@ class ClassifiedChanges(NamedTuple):
def get_changes_classified(changes: list[Change]) -> ClassifiedChanges:
- """
- Pre-classifies changes based on commit message, it's wildly guessing now,
- but if we switch to semantic commits, it could be automated. This list is
supposed to be manually
- reviewed and re-classified by release manager anyway.
+ """Pre-classifies changes based on commit message; it's wildly guessing
now.
+
+ However, if we switch to semantic commits, it could be automated. This list
+ is supposed to be manually reviewed and re-classified by release manager
+ anyway.
:param changes: list of changes
:return: list of changes classified semi-automatically to the
fix/feature/breaking/other buckets
@@ -1856,8 +1859,8 @@ def update_changelog(package_id: str, base_branch: str,
verbose: bool):
def _update_changelog(package_id: str, base_branch: str, verbose: bool) ->
bool:
- """
- Internal update changelog method
+ """Internal update changelog method.
+
:param package_id: package id
:param base_branch: base branch to check changes in apache remote for
changes
:param verbose: verbose flag
diff --git
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
index 45fcb1eaca..0b7ef9a472 100644
---
a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
+++
b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
@@ -101,7 +101,7 @@ the example below.
.. code-block:: bash
$ airflow info
- ...
+
Apache Airflow
version | 2.7.0.dev0
executor | LocalExecutor
diff --git a/docs/exts/docs_build/code_utils.py
b/docs/exts/docs_build/code_utils.py
index a7e3a5a666..1f520553b8 100644
--- a/docs/exts/docs_build/code_utils.py
+++ b/docs/exts/docs_build/code_utils.py
@@ -36,12 +36,11 @@ CONSOLE_WIDTH = 180
def prepare_code_snippet(file_path: str, line_no: int, context_lines_count:
int = 5) -> str:
- """
- Prepares code snippet.
+ """Prepares code snippet.
+
:param file_path: file path
:param line_no: line number
:param context_lines_count: number of lines of context.
- :return:
"""
def guess_lexer_for_filename(filename):
diff --git a/scripts/in_container/verify_providers.py
b/scripts/in_container/verify_providers.py
index ab0a977aa6..5a5838351b 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -208,8 +208,8 @@ def filter_known_common_deprecated_messages(warn:
warnings.WarningMessage) -> bo
def get_all_providers() -> list[str]:
- """
- Returns all providers for regular packages.
+ """Returns all providers for regular packages.
+
:return: list of providers that are considered for provider packages
"""
from setup import ALL_PROVIDERS
@@ -224,13 +224,13 @@ def import_all_classes(
print_imports: bool = False,
print_skips: bool = False,
) -> tuple[list[str], list[WarningMessage], list[str]]:
- """
- Imports all classes in providers packages. This method loads and imports
- all the classes found in providers, so that we can find all the subclasses
- of operators/sensors etc.
+ """Imports all classes in providers packages.
+
+ This method loads and imports all the classes found in providers, so that
we
+ can find all the subclasses of operators/sensors etc.
- :param walkable_paths_and_prefixes: dict of paths with accompanying
prefixes to look the provider
- packages in
+ :param walkable_paths_and_prefixes: dict of paths with accompanying
prefixes
+ to look the provider packages in
:param prefix: prefix to add
:param provider_ids - provider ids that should be loaded.
:param print_imports - if imported class should also be printed in output
@@ -330,8 +330,7 @@ def import_all_classes(
def is_imported_from_same_module(the_class: str, imported_name: str) -> bool:
- """
- Is the class imported from another module?
+ """Is the class imported from another module?
:param the_class: the class object itself
:param imported_name: name of the imported class
@@ -341,8 +340,7 @@ def is_imported_from_same_module(the_class: str,
imported_name: str) -> bool:
def is_example_dag(imported_name: str) -> bool:
- """
- Is the class an example_dag class?
+ """Is the class an example_dag class?
:param imported_name: name where the class is imported from
:return: true if it is an example_dags class
@@ -351,18 +349,17 @@ def is_example_dag(imported_name: str) -> bool:
def is_from_the_expected_base_package(the_class: type, expected_package: str)
-> bool:
- """
- Returns true if the class is from the package expected.
+ """Returns true if the class is from the package expected.
+
:param the_class: the class object
:param expected_package: package expected for the class
- :return:
"""
return the_class.__module__.startswith(expected_package)
def inherits_from(the_class: type, expected_ancestor: type | None = None) ->
bool:
- """
- Returns true if the class inherits (directly or indirectly) from the class
specified.
+ """Returns true if the class inherits (directly or indirectly) from the
class specified.
+
:param the_class: The class to check
:param expected_ancestor: expected class to inherit from
:return: true is the class inherits from the class expected
@@ -376,8 +373,8 @@ def inherits_from(the_class: type, expected_ancestor: type
| None = None) -> boo
def is_class(the_class: type) -> bool:
- """
- Returns true if the object passed is a class
+ """Returns true if the object passed is a class.
+
:param the_class: the class to pass
:return: true if it is a class
"""
@@ -387,9 +384,8 @@ def is_class(the_class: type) -> bool:
def package_name_matches(the_class: type, expected_pattern: str | None = None)
-> bool:
- """
- In case expected_pattern is set, it checks if the package name matches the
pattern.
- .
+ """In case expected_pattern is set, it checks if the package name matches
the pattern.
+
:param the_class: imported class
:param expected_pattern: the pattern that should match the package
:return: true if the expected_pattern is None or the pattern matches the
package
@@ -398,8 +394,7 @@ def package_name_matches(the_class: type, expected_pattern:
str | None = None) -
def convert_classes_to_table(entity_type: EntityType, entities: list[str],
full_package_name: str) -> str:
- """
- Converts new entities to a Markdown table.
+ """Converts new entities to a Markdown table.
:param entity_type: entity type to convert to markup
:param entities: list of entities
@@ -419,14 +414,12 @@ def get_details_about_classes(
wrong_entities: list[tuple[type, str]],
full_package_name: str,
) -> EntityTypeSummary:
- """
- Get details about entities.
+ """Get details about entities.
:param entity_type: type of entity (Operators, Hooks etc.)
:param entities: set of entities found
:param wrong_entities: wrong entities found for that type
:param full_package_name: full package name
- :return:
"""
all_entities = list(entities)
all_entities.sort()
@@ -443,9 +436,7 @@ def get_details_about_classes(
def strip_package_from_class(base_package: str, class_name: str) -> str:
- """
- Strips base package name from the class (if it starts with the package
name).
- """
+ """Strips base package name from the class (if it starts with the package
name)."""
if class_name.startswith(base_package):
return class_name[len(base_package) + 1 :]
else:
@@ -453,8 +444,7 @@ def strip_package_from_class(base_package: str, class_name:
str) -> str:
def convert_class_name_to_url(base_url: str, class_name) -> str:
- """
- Converts the class name to URL that the class can be reached
+ """Converts the class name to URL that the class can be reached.
:param base_url: base URL to use
:param class_name: name of the class
@@ -464,8 +454,7 @@ def convert_class_name_to_url(base_url: str, class_name) ->
str:
def get_class_code_link(base_package: str, class_name: str, git_tag: str) ->
str:
- """
- Provides a Markdown link for the class passed as parameter.
+ """Provides a Markdown link for the class passed as parameter.
:param base_package: base package to strip from most names
:param class_name: name of the class
@@ -480,8 +469,8 @@ def get_class_code_link(base_package: str, class_name: str,
git_tag: str) -> str
def print_wrong_naming(entity_type: EntityType, wrong_classes:
list[tuple[type, str]]):
- """
- Prints wrong entities of a given entity type if there are any
+ """Prints wrong entities of a given entity type if there are any.
+
:param entity_type: type of the class to print
:param wrong_classes: list of wrong entities
"""
@@ -501,8 +490,7 @@ def find_all_entities(
exclude_class_type: type | None = None,
false_positive_class_names: set[str] | None = None,
) -> VerifiedEntities:
- """
- Returns set of entities containing all subclasses in package specified.
+ """Returns set of entities containing all subclasses in package specified.
:param imported_classes: entities imported from providers
:param base_package: base package name where to start looking for the
entities
@@ -557,11 +545,12 @@ def find_all_entities(
def get_package_class_summary(
full_package_name: str, imported_classes: list[str]
) -> dict[EntityType, EntityTypeSummary]:
- """
- Gets summary of the package in the form of dictionary containing all types
of entities
+ """Gets summary of the package in the form of dictionary containing all
types of entities.
+
:param full_package_name: full package name
:param imported_classes: entities imported_from providers
- :return: dictionary of objects usable as context for JINJA2 templates - or
None if there are some errors
+ :return: dictionary of objects usable as context for JINJA2 templates, or
+ None if there are some errors
"""
from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
@@ -637,8 +626,8 @@ def get_package_class_summary(
def is_camel_case_with_acronyms(s: str):
- """
- Checks if the string passed is Camel Case (with capitalised acronyms
allowed).
+ """Checks if the string passed is Camel Case (with capitalised acronyms
allowed).
+
:param s: string to check
:return: true if the name looks cool as Class name.
"""
@@ -648,9 +637,9 @@ def is_camel_case_with_acronyms(s: str):
def check_if_classes_are_properly_named(
entity_summary: dict[EntityType, EntityTypeSummary]
) -> tuple[int, int]:
- """
- Check if all entities in the dictionary are named properly. It prints
names at the output
- and returns the status of class names.
+ """Check if all entities in the dictionary are named properly.
+
+ It prints names at the output and returns the status of class names.
:param entity_summary: dictionary of class names to check, grouped by
types.
:return: Tuple of 2 ints = total number of entities and number of badly
named entities
@@ -773,9 +762,10 @@ def get_providers_paths() -> list[str]:
def add_all_namespaced_packages(
walkable_paths_and_prefixes: dict[str, str], provider_path: str,
provider_prefix: str
):
- """
- We need to find namespace packages ourselves as "walk_packages" does not
support namespaced packages
- # and PEP420
+ """Find namespace packages.
+
+ This needs to be done manually as ``walk_packages`` does not support
+ namespaced packages and PEP 420.
:param walkable_paths_and_prefixes: pats
:param provider_path:
diff --git a/tests/conftest.py b/tests/conftest.py
index 071a6f77a9..ca0751f1be 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -63,9 +63,7 @@ collect_ignore = [
@pytest.fixture()
def reset_environment():
- """
- Resets env variables.
- """
+ """Resets env variables."""
init_env = os.environ.copy()
yield
changed_env = os.environ
@@ -78,10 +76,7 @@ def reset_environment():
@pytest.fixture()
def secret_key() -> str:
- """
- Return secret key configured.
- :return:
- """
+ """Return secret key configured."""
from airflow.configuration import conf
the_key = conf.get("webserver", "SECRET_KEY")
@@ -100,9 +95,7 @@ def url_safe_serializer(secret_key) -> URLSafeSerializer:
@pytest.fixture()
def reset_db():
- """
- Resets Airflow db.
- """
+ """Resets Airflow db."""
from airflow.utils import db
@@ -115,9 +108,7 @@ ALLOWED_TRACE_SQL_COLUMNS = ["num", "time", "trace", "sql",
"parameters", "count
@pytest.fixture(autouse=True)
def trace_sql(request):
- """
- Displays queries from the tests to console.
- """
+ """Displays queries from the tests to console."""
trace_sql_option = request.config.getoption("trace_sql")
if not trace_sql_option:
yield
@@ -157,9 +148,7 @@ def trace_sql(request):
def pytest_addoption(parser):
- """
- Add options parser for custom plugins
- """
+ """Add options parser for custom plugins."""
group = parser.getgroup("airflow")
group.addoption(
"--with-db-init",
@@ -233,10 +222,7 @@ def initial_db_init():
@pytest.fixture(autouse=True, scope="session")
def initialize_airflow_tests(request):
- """
- Helper that setups Airflow testing environment.
- """
-
+ """Helper that sets up the Airflow testing environment."""
print(" AIRFLOW ".center(60, "="))
# Setup test environment for breeze
@@ -294,7 +280,7 @@ def pytest_configure(config):
def pytest_unconfigure(config):
- os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"]
+ del os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"]
def skip_if_not_marked_with_integration(selected_integrations, item):
@@ -423,21 +409,24 @@ def pytest_runtest_setup(item):
@pytest.fixture
def frozen_sleep(monkeypatch):
- """
- Use time-machine to "stub" sleep, so that it takes no time, but that
- ``datetime.now()`` appears to move forwards
+ """Use time-machine to "stub" sleep.
- If your module under test does ``import time`` and then ``time.sleep``::
+ This means ``sleep()`` takes no time, but ``datetime.now()`` appears to move forwards.
+
+ If your module under test does ``import time`` and then ``time.sleep``:
+
+ .. code-block:: python
def test_something(frozen_sleep):
my_mod.fn_under_test()
-
If your module under test does ``from time import sleep`` then you will
- have to mock that sleep function directly::
+ have to mock that sleep function directly:
+
+ .. code-block:: python
def test_something(frozen_sleep, monkeypatch):
- monkeypatch.setattr('my_mod.sleep', frozen_sleep)
+ monkeypatch.setattr("my_mod.sleep", frozen_sleep)
my_mod.fn_under_test()
"""
traveller = None
@@ -469,8 +458,7 @@ def app():
@pytest.fixture
def dag_maker(request):
- """
- The dag_maker helps us to create DAG, DagModel, and SerializedDAG
automatically.
+ """Fixture to help create DAG, DagModel, and SerializedDAG automatically.
You have to use the dag_maker as a context manager and it takes
the same argument as DAG::
@@ -492,10 +480,11 @@ def dag_maker(request):
The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
- If you want to operate on serialized DAGs, then either pass
``serialized=True` to the ``dag_maker()``
- call, or you can mark your test/class/file with
``@pytest.mark.need_serialized_dag(True)``. In both of
- these cases the ``dag`` returned by the context manager will be a
lazily-evaluated proxy object to the
- SerializedDAG.
+ If you want to operate on serialized DAGs, then either pass
+ ``serialized=True`` to the ``dag_maker()`` call, or you can mark your
+ test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+ these cases the ``dag`` returned by the context manager will be a
+ lazily-evaluated proxy object to the SerializedDAG.
"""
import lazy_object_proxy
@@ -703,8 +692,8 @@ def dag_maker(request):
@pytest.fixture
def create_dummy_dag(dag_maker):
- """
- This fixture creates a `DAG` with a single `EmptyOperator` task.
+ """Create a `DAG` with a single `EmptyOperator` task.
+
DagRun and DagModel is also created.
Apart from the already existing arguments, any other argument in kwargs
@@ -760,8 +749,7 @@ def create_dummy_dag(dag_maker):
@pytest.fixture
def create_task_instance(dag_maker, create_dummy_dag):
- """
- Create a TaskInstance, and associated DB rows (DagRun, DagModel, etc)
+ """Create a TaskInstance, and associated DB rows (DagRun, DagModel, etc).
Uses ``create_dummy_dag`` to create the dag structure.
"""
diff --git a/tests/dags/test_logging_in_dag.py
b/tests/dags/test_logging_in_dag.py
index 9cb68e1807..4420df012b 100644
--- a/tests/dags/test_logging_in_dag.py
+++ b/tests/dags/test_logging_in_dag.py
@@ -27,10 +27,9 @@ logger = logging.getLogger(__name__)
def test_logging_fn(**kwargs):
- """
- Tests DAG logging.
+ """Tests DAG logging.
+
:param kwargs:
- :return:
"""
logger.info("Log from DAG Logger")
kwargs["ti"].log.info("Log from TI Logger")
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index 126bc6f48e..99ee65fc5e 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -172,10 +172,7 @@ class TestCliUtil:
@contextmanager
def fail_action_logger_callback():
- """
- Adding failing callback and revert it back when closed.
- :return:
- """
+ """Add a failing callback and revert it when closed."""
tmp = cli_action_loggers.__pre_exec_callbacks[:]
def fail_callback(**_):