This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9d7c224681 D205 Support - Models (#32575)
9d7c224681 is described below
commit 9d7c2246815251727af6d9119d7ef7660338e8c8
Author: D. Ferruzzi <[email protected]>
AuthorDate: Wed Jul 19 12:10:11 2023 -0700
D205 Support - Models (#32575)
* D205 Support - Models
Co-authored-by: Tzu-ping Chung <[email protected]>
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/models/baseoperator.py | 73 ++++++++++------------
airflow/models/connection.py | 23 ++++---
airflow/models/crypto.py | 3 +-
airflow/models/dag.py | 124 ++++++++++++++++++-------------------
airflow/models/dagbag.py | 22 +++----
airflow/models/dagpickle.py | 11 ++--
airflow/models/dagrun.py | 8 +--
airflow/models/errors.py | 5 +-
airflow/models/param.py | 28 +++++----
airflow/models/renderedtifields.py | 6 +-
airflow/models/serialized_dag.py | 23 ++++---
airflow/models/skipmixin.py | 5 +-
airflow/models/slamiss.py | 4 +-
airflow/models/taskinstance.py | 99 +++++++++++++++--------------
airflow/models/taskmixin.py | 16 ++---
airflow/models/taskreschedule.py | 3 +-
airflow/models/trigger.py | 29 ++++-----
airflow/models/variable.py | 10 ++-
airflow/models/xcom.py | 3 +-
19 files changed, 236 insertions(+), 259 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 40fe3b8f0b..e389a8df8a 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -355,8 +355,7 @@ class BaseOperatorMeta(abc.ABCMeta):
@classmethod
def _apply_defaults(cls, func: T) -> T:
"""
- Function decorator that Looks for an argument named "default_args", and
- fills the unspecified arguments from it.
+ Look for an argument named "default_args", and fill the unspecified
arguments from it.
Since python2.* isn't clear about which arguments are missing when
calling a function, and that this can be quite confusing with
multi-level
@@ -463,10 +462,12 @@ class BaseOperatorMeta(abc.ABCMeta):
@functools.total_ordering
class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
"""
- Abstract base class for all operators. Since operators create objects that
- become nodes in the dag, BaseOperator contains many recursive methods for
- dag crawling behavior. To derive this class, you are expected to override
- the constructor as well as the 'execute' method.
+ Abstract base class for all operators.
+
+ Since operators create objects that become nodes in the DAG, BaseOperator
+ contains many recursive methods for DAG crawling behavior. To derive from
+ this class, you are expected to override the constructor and the 'execute'
+ method.
Operators derived from this class should perform or trigger certain tasks
synchronously (wait for completion). Example of operators could be an
@@ -992,9 +993,10 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
# including lineage information
def __or__(self, other):
"""
- Called for [This Operator] | [Operator], The inlets of other
- will be set to pick up the outlets from this operator. Other will
- be set as a downstream task of this operator.
+ Called for [This Operator] | [Operator].
+
+ The inlets of other will be set to pick up the outlets from this
operator.
+ Other will be set as a downstream task of this operator.
"""
if isinstance(other, BaseOperator):
if not self.outlets and not self.supports_lineage:
@@ -1010,8 +1012,9 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
def __gt__(self, other):
"""
- Called for [Operator] > [Outlet], so that if other is an attr
annotated object
- it is set as an outlet of this Operator.
+ Called for [Operator] > [Outlet].
+
+ If other is an attr annotated object it is set as an outlet of this
Operator.
"""
if not isinstance(other, Iterable):
other = [other]
@@ -1025,8 +1028,9 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
def __lt__(self, other):
"""
- Called for [Inlet] > [Operator] or [Operator] < [Inlet], so that if
other is
- an attr annotated object it is set as an inlet to this operator.
+ Called for [Inlet] > [Operator] or [Operator] < [Inlet].
+
+ If other is an attr annotated object it is set as an inlet to this
operator.
"""
if not isinstance(other, Iterable):
other = [other]
@@ -1086,10 +1090,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
@dag.setter
def dag(self, dag: DAG | None):
- """
- Operators can be assigned to one DAG, one time. Repeat assignments to
- that same DAG are ok.
- """
+ """Operators can be assigned to one DAG, one time. Repeat assignments
to that same DAG are ok."""
from airflow.models.dag import DAG
if dag is None:
@@ -1128,19 +1129,17 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
"""
def prepare_for_execution(self) -> BaseOperator:
- """
- Lock task for execution to disable custom action in __setattr__ and
- returns a copy of the task.
- """
+ """Lock task for execution to disable custom action in ``__setattr__``
and return a copy."""
other = copy.copy(self)
other._lock_for_execution = True
return other
def set_xcomargs_dependencies(self) -> None:
"""
- Resolves upstream dependencies of a task. In this way passing an
``XComArg``
- as value for a template field will result in creating upstream
relation between
- two tasks.
+ Resolves upstream dependencies of a task.
+
+ In this way passing an ``XComArg`` as value for a template field
+ will result in creating upstream relation between two tasks.
**Example**: ::
@@ -1173,6 +1172,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
def execute(self, context: Context) -> Any:
"""
This is the main method to derive when creating an operator.
+
Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
@@ -1183,18 +1183,18 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
def post_execute(self, context: Any, result: Any = None):
"""
This hook is triggered right after self.execute() is called.
- It is passed the execution context and any results returned by the
- operator.
+
+ It is passed the execution context and any results returned by the
operator.
"""
if self._post_execute_hook is not None:
self._post_execute_hook(context, result)
def on_kill(self) -> None:
"""
- Override this method to clean up subprocesses when a task instance
- gets killed. Any use of the threading, subprocess or multiprocessing
- module within an operator needs to be cleaned up, or it will leave
- ghost processes behind.
+ Override this method to clean up subprocesses when a task instance
gets killed.
+
+ Any use of the threading, subprocess or multiprocessing module within
an
+ operator needs to be cleaned up, or it will leave ghost processes
behind.
"""
def __deepcopy__(self, memo):
@@ -1254,10 +1254,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
downstream: bool = False,
session: Session = NEW_SESSION,
):
- """
- Clears the state of task instances associated with the task, following
- the parameters specified.
- """
+ """Clears the state of task instances associated with the task,
following the parameters specified."""
qry = select(TaskInstance).where(TaskInstance.dag_id == self.dag_id)
if start_date:
@@ -1375,10 +1372,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
self.log.info(content)
def get_direct_relatives(self, upstream: bool = False) ->
Iterable[Operator]:
- """
- Get list of the direct relatives to the current task, upstream or
- downstream.
- """
+ """Get list of the direct relatives to the current task, upstream or
downstream."""
if upstream:
return self.upstream_list
else:
@@ -1550,8 +1544,7 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
timeout: timedelta | None = None,
):
"""
- Marks this Operator as being "deferred" - that is, suspending its
- execution until the provided trigger fires an event.
+ Mark this Operator "deferred", suspending its execution until the
provided trigger fires an event.
This is achieved by raising a special exception (TaskDeferred)
which is caught in the main _execute_task wrapper.
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 0bc3ca38d4..18c6bcce59 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -60,10 +60,10 @@ def _parse_netloc_to_hostname(uri_parts):
class Connection(Base, LoggingMixin):
"""
- Placeholder to store information about different database instances
- connection information. The idea here is that scripts use references to
- database instances (conn_id) instead of hard coding hostname, logins and
- passwords when using operators or hooks.
+ Placeholder to store information about different database instances
connection information.
+
+ The idea here is that scripts use references to database instances
(conn_id)
+ instead of hard coding hostname, logins and passwords when using operators
or hooks.
.. seealso::
For more information on how to use this class, see:
:doc:`/howto/connection`
@@ -141,8 +141,9 @@ class Connection(Base, LoggingMixin):
@staticmethod
def _validate_extra(extra, conn_id) -> None:
"""
- Here we verify that ``extra`` is a JSON-encoded Python dict. From
Airflow 3.0, we should no
- longer suppress these errors but raise instead.
+ Here we verify that ``extra`` is a JSON-encoded Python dict.
+
+ From Airflow 3.0, we should no longer suppress these errors but raise
instead.
"""
if extra is None:
return None
@@ -376,8 +377,9 @@ class Connection(Base, LoggingMixin):
def log_info(self):
"""
- This method is deprecated. You can read each field individually or use
the
- default representation (`__repr__`).
+ This method is deprecated.
+
+ You can read each field individually or use the default representation
(`__repr__`).
"""
warnings.warn(
"This method is deprecated. You can read each field individually
or "
@@ -393,8 +395,9 @@ class Connection(Base, LoggingMixin):
def debug_info(self):
"""
- This method is deprecated. You can read each field individually or use
the
- default representation (`__repr__`).
+ This method is deprecated.
+
+ You can read each field individually or use the default representation
(`__repr__`).
"""
warnings.warn(
"This method is deprecated. You can read each field individually
or "
diff --git a/airflow/models/crypto.py b/airflow/models/crypto.py
index df072665fc..97495ea90e 100644
--- a/airflow/models/crypto.py
+++ b/airflow/models/crypto.py
@@ -38,8 +38,7 @@ class FernetProtocol(Protocol):
class NullFernet:
"""
- A "Null" encryptor class that doesn't encrypt or decrypt but that presents
- a similar interface to Fernet.
+ A "Null" encryptor class that doesn't encrypt or decrypt but that presents
a similar interface to Fernet.
The purpose of this is to make the rest of the code not have to know the
difference, and to only display the message once, not 20 times when
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1d380274f6..ae8ed2b6b6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -220,6 +220,7 @@ def create_timetable(interval: ScheduleIntervalArg,
timezone: Timezone) -> Timet
def get_last_dagrun(dag_id, session, include_externally_triggered=False):
"""
Returns the last dag run for a dag, None if there was none.
+
Last dag run can be any type of run e.g. scheduled or backfilled.
Overridden DagRuns are ignored.
"""
@@ -235,6 +236,8 @@ def get_dataset_triggered_next_run_info(
dag_ids: list[str], *, session: Session
) -> dict[str, dict[str, int | str]]:
"""
+ Get next run info for a list of dag_ids.
+
Given a list of dag_ids, get string representing how close any that are
dataset triggered are
their next run, e.g. "1 of 2 datasets updated".
"""
@@ -273,12 +276,12 @@ def get_dataset_triggered_next_run_info(
@functools.total_ordering
class DAG(LoggingMixin):
"""
- A dag (directed acyclic graph) is a collection of tasks with directional
- dependencies. A dag also has a schedule, a start date and an end date
- (optional). For each schedule, (say daily or hourly), the DAG needs to run
- each individual tasks as their dependencies are met. Certain tasks have
- the property of depending on their own past, meaning that they can't run
- until their previous schedule (and upstream tasks) are completed.
+ A dag (directed acyclic graph) is a collection of tasks with directional
dependencies.
+
+ A dag also has a schedule, a start date and an end date (optional). For
each schedule,
+ (say daily or hourly), the DAG needs to run each individual task as its
dependencies
+ are met. Certain tasks have the property of depending on their own past,
meaning that
+ they can't run until their previous schedule (and upstream tasks) are
completed.
DAGs essentially act as namespaces for tasks. A task_id can only be
added once to a DAG.
@@ -764,9 +767,11 @@ class DAG(LoggingMixin):
@staticmethod
def _upgrade_outdated_dag_access_control(access_control=None):
"""
- Looks for outdated dag level actions (can_dag_read and can_dag_edit)
in DAG
- access_controls (for example, {'role1': {'can_dag_read'}, 'role2':
{'can_dag_read', 'can_dag_edit'}})
- and replaces them with updated actions (can_read and can_edit).
+ Look for outdated dag level actions in DAG access_controls and replace
them with updated actions.
+
+ For example, in DAG access_control {'role1': {'can_dag_read'}}
'can_dag_read'
+ will be replaced with 'can_read', in {'role2': {'can_dag_read',
'can_dag_edit'}}
+ 'can_dag_edit' will be replaced with 'can_edit', etc.
"""
if not access_control:
return None
@@ -1102,8 +1107,9 @@ class DAG(LoggingMixin):
def get_run_dates(self, start_date, end_date=None) -> list:
"""
- Returns a list of dates between the interval received as parameter
using this
- dag's schedule interval. Returned dates can be used for execution
dates.
+ Returns a list of dates between the interval received as parameter
using this dag's schedule interval.
+
+ Returned dates can be used for execution dates.
:param start_date: The start date of the interval.
:param end_date: The end date of the interval. Defaults to
``timezone.utcnow()``.
@@ -1316,10 +1322,7 @@ class DAG(LoggingMixin):
@provide_session
def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
- """
- Returns a boolean indicating whether the max_active_tasks limit for
this DAG
- has been reached.
- """
+ """Returns a boolean indicating whether the max_active_tasks limit for
this DAG has been reached."""
TI = TaskInstance
total_tasks = session.scalar(
select(func.count(TI.task_id)).where(
@@ -1377,10 +1380,11 @@ class DAG(LoggingMixin):
@provide_session
def handle_callback(self, dagrun, success=True, reason=None,
session=NEW_SESSION):
"""
- Triggers the appropriate callback depending on the value of success,
namely the
- on_failure_callback or on_success_callback. This method gets the
context of a
- single TaskInstance part of this DagRun and passes that to the
callable along
- with a 'reason', primarily to differentiate DagRun failures.
+ Triggers on_failure_callback or on_success_callback as appropriate.
+
+ This method gets the context of a single TaskInstance part of this
DagRun
+ and passes that to the callable along with a 'reason', primarily to
+ differentiate DagRun failures.
.. note: The logs end up in
``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``
@@ -1450,8 +1454,7 @@ class DAG(LoggingMixin):
session: Session = NEW_SESSION,
):
"""
- Returns the dag run for a given execution date or run_id if it exists,
otherwise
- none.
+ Returns the dag run for a given execution date or run_id if it exists,
otherwise none.
:param execution_date: The execution date of the DagRun to find.
:param run_id: The run_id of the DagRun to find.
@@ -1557,10 +1560,7 @@ class DAG(LoggingMixin):
return env
def set_dependency(self, upstream_task_id, downstream_task_id):
- """
- Simple utility method to set dependency between two tasks that
- already have been added to the DAG using add_task().
- """
+ """Set dependency between two tasks that already have been added to
the DAG using add_task()."""
self.get_task(upstream_task_id).set_downstream(self.get_task(downstream_task_id))
@provide_session
@@ -1902,8 +1902,7 @@ class DAG(LoggingMixin):
session=NEW_SESSION,
) -> list[TaskInstance]:
"""
- Set the state of a TaskInstance to the given state, and clear its
downstream tasks that are
- in failed or upstream_failed state.
+ Set the state of a TaskInstance and clear downstream tasks in failed
or upstream_failed state.
:param task_id: Task ID of the TaskInstance
:param map_indexes: Only set TaskInstance if its map_index matches.
@@ -1996,8 +1995,7 @@ class DAG(LoggingMixin):
session: Session = NEW_SESSION,
) -> list[TaskInstance]:
"""
- Set the state of the TaskGroup to the given state, and clear its
downstream tasks that are
- in failed or upstream_failed state.
+ Set TaskGroup to the given state and clear downstream tasks in failed
or upstream_failed state.
:param group_id: The group_id of the TaskGroup
:param execution_date: Execution date of the TaskInstance
@@ -2095,8 +2093,7 @@ class DAG(LoggingMixin):
def topological_sort(self, include_subdag_tasks: bool = False):
"""
- Sorts tasks in topographical order, such that a task comes after any
of its
- upstream dependencies.
+ Sorts tasks in topographical order, such that a task comes after any
of its upstream dependencies.
Deprecated in place of ``task_group.topological_sort``
"""
@@ -2154,8 +2151,7 @@ class DAG(LoggingMixin):
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
) -> int | Iterable[TaskInstance]:
"""
- Clears a set of task instances associated with the current dag for
- a specified date range.
+ Clears a set of task instances associated with the current dag for a
specified date range.
:param task_ids: List of task ids or (``task_id``, ``map_index``)
tuples to clear
:param start_date: The minimum execution_date to clear
@@ -2338,6 +2334,8 @@ class DAG(LoggingMixin):
include_direct_upstream=False,
):
"""
+ Return a subset of the current dag based on regex matching one or more
tasks.
+
Returns a subset of the current dag as a deep copy of the current dag
based on a regex that should match one or many tasks, and includes
upstream and downstream neighbours based on the flag passed.
@@ -2771,6 +2769,7 @@ class DAG(LoggingMixin):
):
"""
Creates a dag run from this dag including the tasks associated with
this dag.
+
Returns the dag run.
:param run_id: defines the run id for this dag run
@@ -2892,8 +2891,7 @@ class DAG(LoggingMixin):
session=NEW_SESSION,
):
"""
- Ensure the DagModel rows for the given dags are up-to-date in the dag
table in the DB, including
- calculated fields.
+ Ensure the DagModel rows for the given dags are up-to-date in the dag
table in the DB.
Note that this method can be called for both DAGs and SubDAGs. A
SubDag is actually a SubDagOperator.
@@ -3122,9 +3120,9 @@ class DAG(LoggingMixin):
@provide_session
def sync_to_db(self, processor_subdir: str | None = None,
session=NEW_SESSION):
"""
- Save attributes about this DAG to the DB. Note that this method
- can be called for both DAGs and SubDAGs. A SubDag is actually a
- SubDagOperator.
+ Save attributes about this DAG to the DB.
+
+ Note that this method can be called for both DAGs and SubDAGs. A
SubDag is actually a SubDagOperator.
:return: None
"""
@@ -3141,8 +3139,7 @@ class DAG(LoggingMixin):
@provide_session
def deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION):
"""
- Given a list of known DAGs, deactivate any other DAGs that are
- marked as active in the ORM.
+ Given a list of known DAGs, deactivate any other DAGs that are marked
as active in the ORM.
:param active_dag_ids: list of DAG IDs that are active
:return: None
@@ -3158,11 +3155,11 @@ class DAG(LoggingMixin):
@provide_session
def deactivate_stale_dags(expiration_date, session=NEW_SESSION):
"""
- Deactivate any DAGs that were last touched by the scheduler before
- the expiration date. These DAGs were likely deleted.
+ Deactivate any DAGs that were last touched by the scheduler before the
expiration date.
+
+ These DAGs were likely deleted.
- :param expiration_date: set inactive DAGs that were touched before this
- time
+ :param expiration_date: set inactive DAGs that were touched before
this time
:return: None
"""
for dag in session.scalars(
@@ -3249,10 +3246,7 @@ class DAG(LoggingMixin):
return cls.__serialized_fields
def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) ->
EdgeInfoType:
- """
- Returns edge information for the given pair of tasks if present, and
- an empty edge if there is no information.
- """
+ """Return edge information for the given pair of tasks or an empty
edge if there is no information."""
# Note - older serialized DAGs may not have edge_info being a dict at
all
empty = cast(EdgeInfoType, {})
if self.edge_info:
@@ -3262,14 +3256,17 @@ class DAG(LoggingMixin):
def set_edge_info(self, upstream_task_id: str, downstream_task_id: str,
info: EdgeInfoType):
"""
- Sets the given edge information on the DAG. Note that this will
overwrite,
- rather than merge with, existing info.
+ Sets the given edge information on the DAG.
+
+ Note that this will overwrite, rather than merge with, existing info.
"""
self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] =
info
def validate_schedule_and_params(self):
"""
- Validates & raise exception if there are any Params in the DAG which
neither have a default value nor
+ Validates Param values when the schedule_interval is not None.
+
+ Raise exception if there are any Params in the DAG which neither have
a default value nor
have the null in schema['type'] list, but the DAG has a
schedule_interval which is not None.
"""
if not self.timetable.can_be_scheduled:
@@ -3283,7 +3280,9 @@ class DAG(LoggingMixin):
)
def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
- """Parses a given link, and verifies if it's a valid URL, or a
'mailto' link.
+ """
+ Parses a given link, and verifies if it's a valid URL, or a 'mailto'
link.
+
Returns an iterator of invalid (owner, link) pairs.
"""
for owner, link in self.owner_links.items():
@@ -3312,10 +3311,10 @@ class DagTag(Base):
class DagOwnerAttributes(Base):
- """Table defining different owner attributes.
+ """
+ Table defining different owner attributes.
- For example, a link for an owner that will be passed as a hyperlink to the
- "DAGs" view.
+ For example, a link for an owner that will be passed as a hyperlink to the
"DAGs" view.
"""
__tablename__ = "dag_owner_attributes"
@@ -3515,10 +3514,7 @@ class DagModel(Base):
return paused_dag_ids
def get_default_view(self) -> str:
- """
- Get the Default DAG View, returns the default config value if DagModel
does not
- have a value.
- """
+ """Get the Default DAG View, returns the default config value if
DagModel does not have a value."""
# This is for backwards-compatibility with old dags that don't have
None as default_view
return self.default_view or
airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower()
@@ -3730,7 +3726,8 @@ def dag(
fail_stop: bool = False,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
- Python dag decorator. Wraps a function into an Airflow DAG.
+ Python dag decorator which wraps a function into an Airflow DAG.
+
Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
:param dag_args: Arguments for DAG object
@@ -3869,9 +3866,10 @@ class DagContext:
def _run_task(ti: TaskInstance, session):
"""
- Run a single task instance, and push result to Xcom for downstream tasks.
Bypasses a lot of
- extra steps used in `task.run` to keep our local running as fast as
possible
- This function is only meant for the `dag.test` function as a helper
function.
+ Run a single task instance, and push result to Xcom for downstream tasks.
+
+ Bypasses a lot of extra steps used in `task.run` to keep our local running
as fast as
+ possible. This function is only meant for the `dag.test` function as a
helper function.
Args:
ti: TaskInstance to run
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1b80cb0158..e53c2ce3bd 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -73,13 +73,13 @@ class FileLoadStat(NamedTuple):
class DagBag(LoggingMixin):
"""
- A dagbag is a collection of dags, parsed out of a folder tree and has high
- level configuration settings, like what database to use as a backend and
- what executor to use to fire off tasks. This makes it easier to run
- distinct environments for say production and development, tests, or for
- different teams or security profiles. What would have been system level
- settings are now dagbag level so that one system can run multiple,
- independent settings sets.
+ A dagbag is a collection of dags, parsed out of a folder tree and has high
level configuration settings.
+
+ Some possible settings are the database to use as a backend and what executor
+ to use to fire off tasks. This makes it easier to run distinct environments
+ for say production and development, tests, or for different teams or
security
+ profiles. What would have been system level settings are now dagbag level
so
+ that one system can run multiple, independent settings sets.
:param dag_folder: the folder to scan to find DAGs
:param include_examples: whether to include the examples that ship
@@ -277,10 +277,7 @@ class DagBag(LoggingMixin):
self.dags_hash[dag.dag_id] = row.dag_hash
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
- """
- Given a path to a python module or zip file, this method imports
- the module and look for dag objects within it.
- """
+ """Given a path to a python module or zip file, import the module and
look for dag objects within."""
from airflow.models.dag import DagContext
# if the source file no longer exists in the DB or in the filesystem,
@@ -525,8 +522,7 @@ class DagBag(LoggingMixin):
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
):
"""
- Given a file path or a folder, this method looks for python modules,
- imports them and adds them to the dagbag collection.
+ Look for python modules in a given path, import them, and add them to
the dagbag collection.
Note that if a ``.airflowignore`` file is found while processing
the directory, it will behave much like a ``.gitignore``,
diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py
index ae2c04ccaf..e6f4561d8e 100644
--- a/airflow/models/dagpickle.py
+++ b/airflow/models/dagpickle.py
@@ -32,14 +32,13 @@ if TYPE_CHECKING:
class DagPickle(Base):
"""
- Dags can originate from different places (user repos, main repo, ...)
- and also get executed in different places (different executors). This
- object represents a version of a DAG and becomes a source of truth for
- a BackfillJob execution. A pickle is a native python serialized object,
+ Represents a version of a DAG and becomes a source of truth for a
BackfillJob execution.
+
+ Dags can originate from different places (user repos, main repo, ...) and
also get executed
+ in different places (different executors). A pickle is a native python
serialized object,
and in this case gets stored in the database for the duration of the job.
- The executors pick up the DagPickle id and read the dag definition from
- the database.
+ The executors pick up the DagPickle id and read the dag definition from
the database.
"""
id = Column(Integer, primary_key=True)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 2d3ff26d7f..baca096024 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -575,8 +575,7 @@ class DagRun(Base, LoggingMixin):
self, session: Session = NEW_SESSION, execute_callbacks: bool = True
) -> tuple[list[TI], DagCallbackRequest | None]:
"""
- Determines the overall state of the DagRun based on the state
- of its TaskInstances.
+ Determines the overall state of the DagRun based on the state of its
TaskInstances.
:param session: Sqlalchemy ORM Session
:param execute_callbacks: Should dag callbacks (success/failure, SLA
etc.) be invoked
@@ -974,8 +973,9 @@ class DagRun(Base, LoggingMixin):
@provide_session
def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
"""
- Verifies the DagRun by checking for removed tasks or tasks that are
not in the
- database yet. It will set state to removed or add the task if required.
+ Verifies the DagRun by checking for removed tasks or tasks that are
not in the database yet.
+
+ It will set state to removed or add the task if required.
:missing_indexes: A dictionary of task vs indexes that are missing.
:param session: Sqlalchemy ORM Session
diff --git a/airflow/models/errors.py b/airflow/models/errors.py
index 974d9b8eeb..9d2da0917d 100644
--- a/airflow/models/errors.py
+++ b/airflow/models/errors.py
@@ -24,10 +24,7 @@ from airflow.utils.sqlalchemy import UtcDateTime
class ImportError(Base):
- """
- A table to store all Import Errors. The ImportErrors are recorded when
parsing DAGs.
- This errors are displayed on the Webserver.
- """
+ """Stores all Import Errors which are recorded when parsing DAGs and
displayed on the Webserver."""
__tablename__ = "import_error"
id = Column(Integer, primary_key=True)
diff --git a/airflow/models/param.py b/airflow/models/param.py
index d9a547b9ad..5bb1db3d44 100644
--- a/airflow/models/param.py
+++ b/airflow/models/param.py
@@ -42,8 +42,9 @@ logger = logging.getLogger(__name__)
class Param:
"""
- Class to hold the default value of a Param and rule set to do the
validations. Without the rule set
- it always validates and returns the default value.
+ Class to hold the default value of a Param and rule set to do the
validations.
+
+ Without the rule set it always validates and returns the default value.
:param default: The value this Param object holds
:param description: Optional help text for the Param
@@ -100,6 +101,7 @@ class Param:
def resolve(self, value: Any = NOTSET, suppress_exception: bool = False)
-> Any:
"""
Runs the validations and returns the Param's final value.
+
May raise ValueError on failed validations, or TypeError
if no value is passed and no value already exists.
We first check that value is json-serializable; if not, warn.
@@ -157,19 +159,21 @@ class Param:
class ParamsDict(MutableMapping[str, Any]):
"""
- Class to hold all params for dags or tasks. All the keys are strictly
string and values
- are converted into Param's object if they are not already. This class is
to replace param's
- dictionary implicitly and ideally not needed to be used directly.
+ Class to hold all params for dags or tasks.
+
+ All the keys are strictly string and values are converted into Param's
object
+ if they are not already. This class is to replace param's dictionary
implicitly
+ and ideally not needed to be used directly.
+
+
+ :param dict_obj: A dict or dict like object to init ParamsDict
+ :param suppress_exception: Flag to suppress value exceptions while
initializing the ParamsDict
"""
__version__: ClassVar[int] = 1
__slots__ = ["__dict", "suppress_exception"]
def __init__(self, dict_obj: MutableMapping | None = None,
suppress_exception: bool = False):
- """
- :param dict_obj: A dict or dict like object to init ParamsDict
- :param suppress_exception: Flag to suppress value exceptions while
initializing the ParamsDict
- """
params_dict: dict[str, Param] = {}
dict_obj = dict_obj or {}
for k, v in dict_obj.items():
@@ -213,8 +217,7 @@ class ParamsDict(MutableMapping[str, Any]):
def __setitem__(self, key: str, value: Any) -> None:
"""
- Override for dictionary's ``setitem`` method. This method make sure
that all values are of
- Param's type only.
+ Override for dictionary's ``setitem`` method to ensure all values are
of Param's type only.
:param key: A key which needs to be inserted or updated in the dict
:param value: A value which needs to be set against the key. It could
be of any
@@ -236,8 +239,7 @@ class ParamsDict(MutableMapping[str, Any]):
def __getitem__(self, key: str) -> Any:
"""
- Override for dictionary's ``getitem`` method. After fetching the key,
it would call the
- resolve method as well on the Param object.
+ Override for dictionary's ``getitem`` method to call the resolve
method after fetching the key.
:param key: The key to fetch
"""
diff --git a/airflow/models/renderedtifields.py
b/airflow/models/renderedtifields.py
index 37a5f08b27..4b2f80e47d 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -127,8 +127,7 @@ class RenderedTaskInstanceFields(Base):
@provide_session
def get_templated_fields(cls, ti: TaskInstance, session: Session =
NEW_SESSION) -> dict | None:
"""
- Get templated field for a TaskInstance from the
RenderedTaskInstanceFields
- table.
+ Get templated field for a TaskInstance from the
RenderedTaskInstanceFields table.
:param ti: Task Instance
:param session: SqlAlchemy Session
@@ -153,8 +152,7 @@ class RenderedTaskInstanceFields(Base):
@provide_session
def get_k8s_pod_yaml(cls, ti: TaskInstance, session: Session =
NEW_SESSION) -> dict | None:
"""
- Get rendered Kubernetes Pod Yaml for a TaskInstance from the
RenderedTaskInstanceFields
- table.
+ Get rendered Kubernetes Pod Yaml for a TaskInstance from the
RenderedTaskInstanceFields table.
:param ti: Task Instance
:param session: SqlAlchemy Session
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 6ee430bca0..90480d8a33 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -128,7 +128,9 @@ class SerializedDagModel(Base):
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
) -> bool:
- """Serializes a DAG and writes it into database.
+ """
+ Serializes a DAG and writes it into database.
+
If the record already exists, it checks if the Serialized DAG changed
or not. If it is
changed, it updates the record, ignores otherwise.
@@ -225,7 +227,9 @@ class SerializedDagModel(Base):
@classmethod
@provide_session
def remove_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> None:
- """Deletes a DAG with given dag_id.
+ """
+ Deletes a DAG with given dag_id.
+
:param dag_id: dag_id to be deleted
:param session: ORM Session.
"""
@@ -286,8 +290,8 @@ class SerializedDagModel(Base):
def get(cls, dag_id: str, session: Session = NEW_SESSION) ->
SerializedDagModel | None:
"""
Get the SerializedDAG for the given dag ID.
- It will cope with being passed the ID of a subdag by looking up the
- root dag_id from the DAG table.
+
+ It will cope with being passed the ID of a subdag by looking up the
root dag_id from the DAG table.
:param dag_id: the DAG to fetch
:param session: ORM Session
@@ -310,8 +314,9 @@ class SerializedDagModel(Base):
session: Session = NEW_SESSION,
) -> None:
"""
- Saves DAGs as Serialized DAG objects in the database. Each
- DAG is saved in a separate database query.
+ Saves DAGs as Serialized DAG objects in the database.
+
+ Each DAG is saved in a separate database query.
:param dags: the DAG objects to save to the DB
:param session: ORM Session
@@ -330,8 +335,7 @@ class SerializedDagModel(Base):
@provide_session
def get_last_updated_datetime(cls, dag_id: str, session: Session =
NEW_SESSION) -> datetime | None:
"""
- Get the date when the Serialized DAG associated to DAG was last updated
- in serialized_dag table.
+ Get the date when the Serialized DAG associated to DAG was last
updated in serialized_dag table.
:param dag_id: DAG ID
:param session: ORM Session
@@ -368,8 +372,7 @@ class SerializedDagModel(Base):
session: Session,
) -> tuple[str, datetime] | None:
"""
- Get the latest DAG version for a given DAG ID, as well as the date
when the Serialized DAG associated
- to DAG was last updated in serialized_dag table.
+ Get the latest version for a DAG ID and the date it was last updated
in serialized_dag table.
:meta private:
:param dag_id: DAG ID
diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 10991cadc7..5ddc80308a 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -162,8 +162,9 @@ class SkipMixin(LoggingMixin):
branch_task_ids: None | str | Iterable[str],
):
"""
- This method implements the logic for a branching operator; given a
single
- task ID or list of task IDs to follow, this skips all other tasks
+ This method implements the logic for a branching operator.
+
+ Given a single task ID or list of task IDs to follow, this skips all
other tasks
immediately downstream of this operator.
branch_task_ids is stored to XCom so that NotPreviouslySkippedDep
knows skipped tasks or
diff --git a/airflow/models/slamiss.py b/airflow/models/slamiss.py
index cd2f0f1dd5..4fb7e53a17 100644
--- a/airflow/models/slamiss.py
+++ b/airflow/models/slamiss.py
@@ -26,8 +26,8 @@ from airflow.utils.sqlalchemy import UtcDateTime
class SlaMiss(Base):
"""
Model that stores a history of the SLA that have been missed.
- It is used to keep track of SLA failures over time and to avoid double
- triggering alert emails.
+
+ It is used to keep track of SLA failures over time and to avoid double
triggering alert emails.
"""
__tablename__ = "sla_miss"
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3e328ec208..4abfd94cd0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -170,6 +170,7 @@ class TaskReturnCode(Enum):
def set_current_context(context: Context) -> Generator[Context, None, None]:
"""
Sets the current execution context to the provided context object.
+
This method should be called once per Task execution, before calling
operator.execute.
"""
_CURRENT_CONTEXT.append(context)
@@ -208,9 +209,10 @@ def clear_task_instances(
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
) -> None:
"""
- Clears a set of task instances, but makes sure the running ones
- get killed. Also sets Dagrun's `state` to QUEUED and `start_date`
- to the time of execution. But only for finished DRs (SUCCESS and FAILED).
+ Clears a set of task instances, but makes sure the running ones get killed.
+
+ Also sets Dagrun's `state` to QUEUED and `start_date` to the time of
execution.
+ But only for finished DRs (SUCCESS and FAILED).
Doesn't clear DR's `state` and `start_date` for running
DRs (QUEUED and RUNNING) because clearing the state for already
running DR is redundant and clearing `start_date` affects DR's duration.
@@ -358,9 +360,10 @@ def _creator_note(val):
class TaskInstance(Base, LoggingMixin):
"""
- Task instances store the state of a task instance. This table is the
- authority and single source of truth around what tasks have run and the
- state they are in.
+ Task instances store the state of a task instance.
+
+ This table is the authority and single source of truth around what tasks
+ have run and the state they are in.
The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or
dag model deliberately to have more control over transactions.
@@ -586,8 +589,7 @@ class TaskInstance(Base, LoggingMixin):
@hybrid_property
def try_number(self):
"""
- Return the try number that this task number will be when it is actually
- run.
+ Return the try number that this task number will be when it is
actually run.
If the TaskInstance is currently running, this will match the column
in the
database, in all other cases this will be incremented.
@@ -604,15 +606,13 @@ class TaskInstance(Base, LoggingMixin):
@property
def prev_attempted_tries(self) -> int:
"""
- Based on this instance's try_number, this will calculate
- the number of previously attempted tries, defaulting to 0.
- """
- # Expose this for the Task Tries and Gantt graph views.
- # Using `try_number` throws off the counts for non-running tasks.
- # Also useful in error logging contexts to get
- # the try number for the last try that was attempted.
- # https://issues.apache.org/jira/browse/AIRFLOW-2143
+ Calculate the number of previously attempted tries, defaulting to 0.
+
+ Expose this for the Task Tries and Gantt graph views.
+ Using `try_number` throws off the counts for non-running tasks.
+ Also useful in error logging contexts to get the try number for the
last try that was attempted.
+ https://issues.apache.org/jira/browse/AIRFLOW-2143
+ """
return self._try_number
@property
@@ -640,9 +640,9 @@ class TaskInstance(Base, LoggingMixin):
cfg_path=None,
) -> list[str]:
"""
- Returns a command that can be executed anywhere where airflow is
- installed. This command is part of the message sent to executors by
- the orchestrator.
+ Returns a command that can be executed anywhere where airflow is
installed.
+
+ This command is part of the message sent to executors by the
orchestrator.
"""
dag: DAG | DagModel
# Use the dag if we have it, else fallback to the ORM dag_model, which
might not be loaded
@@ -791,9 +791,10 @@ class TaskInstance(Base, LoggingMixin):
@provide_session
def current_state(self, session: Session = NEW_SESSION) -> str:
"""
- Get the very latest state from the database, if a session is passed,
- we use and looking up the state becomes part of the session, otherwise
- a new session is used.
+ Get the very latest state from the database.
+
+ If a session is passed, we use it and looking up the state becomes part
of the session,
+ otherwise a new session is used.
sqlalchemy.inspect is used here to get the primary keys ensuring that
if they change
it will not regress
@@ -948,10 +949,7 @@ class TaskInstance(Base, LoggingMixin):
@property
def is_premature(self) -> bool:
- """
- Returns whether a task is in UP_FOR_RETRY state and its retry interval
- has elapsed.
- """
+ """Returns whether a task is in UP_FOR_RETRY state and its retry
interval has elapsed."""
# is the task still in the retry waiting period?
return self.state == TaskInstanceState.UP_FOR_RETRY and not
self.ready_for_retry()
@@ -959,6 +957,7 @@ class TaskInstance(Base, LoggingMixin):
def are_dependents_done(self, session: Session = NEW_SESSION) -> bool:
"""
Checks whether the immediate dependents of this task instance have
succeeded or have been skipped.
+
This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next
@@ -1035,7 +1034,8 @@ class TaskInstance(Base, LoggingMixin):
def previous_ti(self) -> TaskInstance | None:
"""
This attribute is deprecated.
- Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti`
method.
+
+ Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
@@ -1051,7 +1051,8 @@ class TaskInstance(Base, LoggingMixin):
def previous_ti_success(self) -> TaskInstance | None:
"""
This attribute is deprecated.
- Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti`
method.
+
+ Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
@@ -1098,7 +1099,8 @@ class TaskInstance(Base, LoggingMixin):
def previous_start_date_success(self) -> pendulum.DateTime | None:
"""
This attribute is deprecated.
- Please use
`airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
+
+ Please use
:class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
"""
warnings.warn(
"""
@@ -1115,15 +1117,13 @@ class TaskInstance(Base, LoggingMixin):
self, dep_context: DepContext | None = None, session: Session =
NEW_SESSION, verbose: bool = False
) -> bool:
"""
- Returns whether or not all the conditions are met for this task
instance to be run
- given the context for the dependencies (e.g. a task instance being
force run from
- the UI will ignore some dependencies).
+ Returns whether all conditions are met for this task instance to be run
given the dependency context.
+
+ (e.g. a task instance being force run from the UI will ignore some
dependencies).
- :param dep_context: The execution context that determines the
dependencies that
- should be evaluated.
+ :param dep_context: The execution context that determines the
dependencies that should be evaluated.
:param session: database session
- :param verbose: whether log details on failed dependencies on
- info or debug log level
+ :param verbose: whether log details on failed dependencies on info or
debug log level
"""
dep_context = dep_context or DepContext()
failed = False
@@ -1169,8 +1169,9 @@ class TaskInstance(Base, LoggingMixin):
def next_retry_datetime(self):
"""
- Get datetime of the next retry if the task instance fails. For
exponential
- backoff, retry_delay is used as base and will be converted to seconds.
+ Get datetime of the next retry if the task instance fails.
+
+ For exponential backoff, retry_delay is used as base and will be
converted to seconds.
"""
from airflow.models.abstractoperator import MAX_RETRY_DELAY
@@ -1209,10 +1210,7 @@ class TaskInstance(Base, LoggingMixin):
return self.end_date + delay
def ready_for_retry(self) -> bool:
- """
- Checks on whether the task instance is in the right state and timeframe
- to be retried.
- """
+ """Checks on whether the task instance is in the right state and
timeframe to be retried."""
return self.state == TaskInstanceState.UP_FOR_RETRY and
self.next_retry_datetime() < timezone.utcnow()
@provide_session
@@ -1256,8 +1254,9 @@ class TaskInstance(Base, LoggingMixin):
session: Session = NEW_SESSION,
) -> bool:
"""
- Checks dependencies and then sets state to RUNNING if they are met.
Returns
- True if and only if state is set to RUNNING, which implies that task
should be
+ Checks dependencies and then sets state to RUNNING if they are met.
+
+ Returns True if and only if state is set to RUNNING, which implies
that task should be
executed, in preparation for _run_raw_task.
:param verbose: whether to turn on more verbose logging
@@ -1394,6 +1393,7 @@ class TaskInstance(Base, LoggingMixin):
def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
"""
Sends a time metric representing how much time a given state
transition took.
+
The previous state and metric name is deduced from the state the task
was put in.
:param new_state: The state that has just been set for this task.
@@ -1456,6 +1456,8 @@ class TaskInstance(Base, LoggingMixin):
session: Session = NEW_SESSION,
) -> TaskReturnCode | None:
"""
+ Run a task, update the state upon completion, and run any appropriate
callbacks.
+
Immediately runs the task (without checking or changing db state
before execution) and then sets the appropriate final state after
completion and runs any post-execute callbacks. Meant to be called
@@ -1739,10 +1741,7 @@ class TaskInstance(Base, LoggingMixin):
@provide_session
def _defer_task(self, session: Session, defer: TaskDeferred) -> None:
- """
- Marks the task as deferred and sets up the trigger that is needed
- to resume it.
- """
+ """Marks the task as deferred and sets up the trigger that is needed
to resume it."""
from airflow.models.trigger import Trigger
# First, make the trigger entry
@@ -2222,6 +2221,7 @@ class TaskInstance(Base, LoggingMixin):
def get_rendered_template_fields(self, session: Session = NEW_SESSION) ->
None:
"""
Update task with rendered template fields for presentation in UI.
+
If task has already run, will fetch from DB; otherwise will render.
"""
from airflow.models.renderedtifields import RenderedTaskInstanceFields
@@ -2677,8 +2677,7 @@ class TaskInstance(Base, LoggingMixin):
@classmethod
def ti_selector_condition(cls, vals: Collection[str | tuple[str, int]]) ->
ColumnOperators:
"""
- Build an SQLAlchemy filter for a list where each element can contain
- whether a task_id, or a tuple of (task_id,map_index).
+ Build an SQLAlchemy filter for a list of task_ids or tuples of
(task_id,map_index).
:meta private:
"""
diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 3a4f17974b..0dd013a579 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -90,6 +90,7 @@ class DependencyMixin:
) -> None:
"""
Update relationship information about another TaskMixin. Default is
no-op.
+
Override if necessary.
"""
@@ -169,8 +170,9 @@ class TaskMixin(DependencyMixin):
class DAGNode(DependencyMixin, metaclass=ABCMeta):
"""
- A base class for a node in the graph of a workflow -- an Operator or a
Task Group, either mapped or
- unmapped.
+ A base class for a node in the graph of a workflow.
+
+ A node may be an Operator or a Task Group, either mapped or unmapped.
"""
dag: DAG | None = None
@@ -307,20 +309,14 @@ class DAGNode(DependencyMixin, metaclass=ABCMeta):
return [self.dag.get_task(tid) for tid in self.upstream_task_ids]
def get_direct_relative_ids(self, upstream: bool = False) -> set[str]:
- """
- Get set of the direct relative ids to the current task, upstream or
- downstream.
- """
+ """Get set of the direct relative ids to the current task, upstream or
downstream."""
if upstream:
return self.upstream_task_ids
else:
return self.downstream_task_ids
def get_direct_relatives(self, upstream: bool = False) ->
Iterable[DAGNode]:
- """
- Get list of the direct relatives to the current task, upstream or
- downstream.
- """
+ """Get list of the direct relatives to the current task, upstream or
downstream."""
if upstream:
return self.upstream_list
else:
diff --git a/airflow/models/taskreschedule.py b/airflow/models/taskreschedule.py
index d8fe2c3ff2..7b642f0a26 100644
--- a/airflow/models/taskreschedule.py
+++ b/airflow/models/taskreschedule.py
@@ -135,8 +135,7 @@ class TaskReschedule(Base):
try_number: int | None = None,
) -> list[TaskReschedule]:
"""
- Returns all task reschedules for the task instance and try number,
- in ascending order.
+ Returns all task reschedules for the task instance and try number, in
ascending order.
:param session: the database session object
:param task_instance: the task instance to find task reschedules for
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c0d749eb59..4f50918a09 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -36,6 +36,8 @@ from airflow.utils.state import TaskInstanceState
class Trigger(Base):
"""
+ Base Trigger class.
+
Triggers are a workload that run in an asynchronous event loop shared with
other Triggers, and fire off events that will unpause deferred Tasks,
start linked DAGs, etc.
@@ -82,10 +84,7 @@ class Trigger(Base):
@classmethod
@internal_api_call
def from_object(cls, trigger: BaseTrigger) -> Trigger:
- """
- Alternative constructor that creates a trigger row based directly
- off of a Trigger object.
- """
+ """Alternative constructor that creates a trigger row based directly
off of a Trigger object."""
classpath, kwargs = trigger.serialize()
return cls(classpath=classpath, kwargs=kwargs)
@@ -93,10 +92,7 @@ class Trigger(Base):
@internal_api_call
@provide_session
def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) ->
dict[int, Trigger]:
- """
- Fetches all the Triggers by ID and returns a dict mapping
- ID -> Trigger instance.
- """
+ """Fetches all the Triggers by ID and returns a dict mapping ID ->
Trigger instance."""
query = (
session.query(cls)
.filter(cls.id.in_(ids))
@@ -143,10 +139,7 @@ class Trigger(Base):
@internal_api_call
@provide_session
def submit_event(cls, trigger_id, event, session: Session = NEW_SESSION)
-> None:
- """
- Takes an event from an instance of itself, and triggers all dependent
- tasks to resume.
- """
+ """Takes an event from an instance of itself, and triggers all
dependent tasks to resume."""
for task_instance in session.query(TaskInstance).filter(
TaskInstance.trigger_id == trigger_id, TaskInstance.state ==
TaskInstanceState.DEFERRED
):
@@ -164,11 +157,11 @@ class Trigger(Base):
@provide_session
def submit_failure(cls, trigger_id, exc=None, session: Session =
NEW_SESSION) -> None:
"""
- Called when a trigger has failed unexpectedly, and we need to mark
- everything that depended on it as failed. Notably, we have to actually
- run the failure code from a worker as it may have linked callbacks, so
- hilariously we have to re-schedule the task instances to a worker just
- so they can then fail.
+ When a trigger has failed unexpectedly, mark everything that depended
on it as failed.
+
+ Notably, we have to actually run the failure code from a worker as it
may
+ have linked callbacks, so hilariously we have to re-schedule the task
+ instances to a worker just so they can then fail.
We use a special __fail__ value for next_method to achieve this that
the runtime code understands as immediate-fail, and pack the error into
@@ -202,6 +195,8 @@ class Trigger(Base):
@provide_session
def assign_unassigned(cls, triggerer_id, capacity, heartrate, session:
Session = NEW_SESSION) -> None:
"""
+ Assign unassigned triggers based on a number of conditions.
+
Takes a triggerer_id, the capacity for that triggerer and the
Triggerer job heartrate,
and assigns unassigned triggers until that capacity is reached, or
there are no more
unassigned triggers.
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index bba785ee0a..edfe2dea96 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -38,10 +38,7 @@ log = logging.getLogger(__name__)
class Variable(Base, LoggingMixin):
- """
- Variables are a generic way to store and retrieve arbitrary content or
settings
- as a simple key value store within Airflow.
- """
+ """A generic way to store and retrieve arbitrary content or settings as a
simple key/value store."""
__tablename__ = "variable"
__NO_DEFAULT_SENTINEL = object()
@@ -99,8 +96,9 @@ class Variable(Base, LoggingMixin):
@classmethod
def setdefault(cls, key, default, description=None,
deserialize_json=False):
"""
- Like a Python builtin dict object, setdefault returns the current value
- for a key, and if it isn't there, stores the default value and returns
it.
+ Return the current value for a key or store the default value and
return it.
+
+ Works the same as the Python builtin dict object.
:param key: Dict key for this Variable
:param default: Default value to set and return if the variable
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 42485a5830..50e62ea29f 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -122,7 +122,8 @@ class BaseXCom(Base, LoggingMixin):
@reconstructor
def init_on_load(self):
"""
- Called by the ORM after the instance has been loaded from the DB or
otherwise reconstituted
+ Called by the ORM after the instance has been loaded from the DB or
otherwise reconstituted.
+
i.e. automatically deserialize the XCom value when loading from DB.
"""
self.value = self.orm_deserialize_value()