This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2794c4172e D205 Support - Auto-fixes and Stragglers (#32212)
2794c4172e is described below
commit 2794c4172e7b7655b813236fe47222cec46f57e4
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 27 12:55:50 2023 -0700
D205 Support - Auto-fixes and Stragglers (#32212)
Includes anything ruff could automatically fix and anything I saw that was
only one or two files in the directory.
---
airflow/cli/commands/celery_command.py | 4 +++-
airflow/cli/commands/standalone_command.py | 1 +
airflow/configuration.py | 1 +
airflow/jobs/backfill_job_runner.py | 3 +--
airflow/jobs/base_job_runner.py | 3 +--
airflow/jobs/job.py | 15 ++++++---------
airflow/kubernetes/pod_launcher.py | 1 +
airflow/metrics/otel_logger.py | 11 ++++-------
airflow/metrics/validators.py | 11 +++++------
airflow/migrations/utils.py | 10 +++++-----
airflow/operators/python.py | 1 +
airflow/operators/subdag.py | 6 +++---
airflow/operators/trigger_dagrun.py | 1 +
airflow/plugins_manager.py | 4 ++--
airflow/policies.py | 8 ++++++--
airflow/providers/amazon/aws/hooks/emr.py | 1 -
airflow/providers/oracle/transfers/oracle_to_oracle.py | 1 -
airflow/secrets/base_secrets.py | 1 +
airflow/sensors/external_task.py | 1 +
airflow/sentry.py | 1 +
airflow/serialization/pydantic/dataset.py | 10 ++--------
airflow/serialization/serde.py | 3 +--
airflow/serialization/serialized_objects.py | 5 ++++-
airflow/ti_deps/dep_context.py | 7 +++----
airflow/ti_deps/deps/ready_to_reschedule.py | 1 +
airflow/ti_deps/deps/trigger_rule_dep.py | 5 +----
airflow/triggers/base.py | 5 +----
airflow/triggers/external_task.py | 13 +++----------
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
docs/apache-airflow/img/airflow_erd.svg | 8 ++++----
30 files changed, 65 insertions(+), 79 deletions(-)
diff --git a/airflow/cli/commands/celery_command.py
b/airflow/cli/commands/celery_command.py
index f3838ec4a9..2ac976d10c 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -102,7 +102,9 @@ def _serve_logs(skip_serve_logs: bool = False):
@after_setup_logger.connect()
def logger_setup_handler(logger, **kwargs):
- """Reconfigure the logger:
+ """
+ Reconfigure the logger.
+
* remove any previously configured handlers
* logs of severity error, and above goes to stderr,
* logs of severity lower than error goes to stdout.
diff --git a/airflow/cli/commands/standalone_command.py
b/airflow/cli/commands/standalone_command.py
index 35bd49ab68..9bdd49a82a 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -213,6 +213,7 @@ class StandaloneCommand:
def is_ready(self):
"""
Detects when all Airflow components are ready to serve.
+
For now, it's simply time-based.
"""
return (
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ea3be518ae..288595efbd 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -1690,6 +1690,7 @@ def set(*args, **kwargs) -> None:
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
"""
Ensure that all secrets backends are loaded.
+
If the secrets_backend_list contains only 2 default backends, reload it.
"""
# Check if the secrets_backend_list contains only 2 default backends
diff --git a/airflow/jobs/backfill_job_runner.py
b/airflow/jobs/backfill_job_runner.py
index 40c31c4451..6ae6ae27cb 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -819,8 +819,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
session: Session = NEW_SESSION,
) -> None:
"""
- Go through the dag_runs and update the state based on the
task_instance state.
- Then set DAG runs that are not finished to failed.
+ Update the state of each dagrun based on the task_instance state and
set unfinished runs to failed.
:param dag_runs: DAG runs
:param session: session
diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py
index 0a3222bc14..fd3060db81 100644
--- a/airflow/jobs/base_job_runner.py
+++ b/airflow/jobs/base_job_runner.py
@@ -46,8 +46,7 @@ class BaseJobRunner(Generic[J]):
def _execute(self) -> int | None:
"""
- Executes the logic connected to the runner. This method should be
- overridden by subclasses.
+ Executes the logic connected to the runner. This method should be
overridden by subclasses.
:meta private:
:return: return code if available, otherwise None
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index fe75508f63..3c2caa3805 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -160,16 +160,13 @@ class Job(Base, LoggingMixin):
self, heartbeat_callback: Callable[[Session], None], session: Session
= NEW_SESSION
) -> None:
"""
- Heartbeats update the job's entry in the database with a timestamp
- for the latest_heartbeat and allows for the job to be killed
- externally. This allows at the system level to monitor what is
- actually active.
+ Update the job's entry in the database with the latest_heartbeat
timestamp.
- For instance, an old heartbeat for SchedulerJob would mean something
- is wrong.
-
- This also allows for any job to be killed externally, regardless
- of who is running it or on which machine it is running.
+ This allows for the job to be killed externally and allows the system
+ to monitor what is actually active. For instance, an old heartbeat
+ for SchedulerJob would mean something is wrong. This also allows for
+ any job to be killed externally, regardless of who is running it or on
+ which machine it is running.
Note that if your heart rate is set to 60 seconds and you call this
method after 10 seconds of processing since the last heartbeat, it
diff --git a/airflow/kubernetes/pod_launcher.py
b/airflow/kubernetes/pod_launcher.py
index bd52f49653..25a97921c3 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -17,6 +17,7 @@
# under the License.
"""
This module is deprecated.
+
Please use :mod:`kubernetes.client.models` for V1ResourceRequirements and Port.
"""
from __future__ import annotations
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 6f1b2f9697..0cb3289cb5 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -110,11 +110,10 @@ def _type_as_str(obj: Instrument) -> str:
def _get_otel_safe_name(name: str) -> str:
"""
- OpenTelemetry has a maximum length for metric names. This method returns
the
- name, truncated if it is too long, and logs a warning so the user will
know.
+ Verifies that the provided name does not exceed OpenTelemetry's maximum
length for metric names.
:param name: The original metric name
- :returns: The name, truncated to an OTel-acceptable length if required
+ :returns: The name, truncated to an OTel-acceptable length if required.
"""
otel_safe_name = name[:OTEL_NAME_MAX_LENGTH]
if name != otel_safe_name:
@@ -134,6 +133,7 @@ def _skip_due_to_rate(rate: float) -> bool:
class _OtelTimer(Timer):
"""
An implementation of Stats.Timer() which records the result in the OTel
Metrics Map.
+
OpenTelemetry does not have a native timer, we will store the values as a
Gauge.
:param name: The name of the timer.
@@ -262,10 +262,7 @@ class SafeOtelLogger:
*,
tags: Attributes = None,
) -> None:
- """
- OpenTelemetry does not have a native timer, they are stored
- as a Gauge whose value represents the seconds elapsed.
- """
+ """OTel does not have a native timer, stored as a Gauge whose value is
number of seconds elapsed."""
if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds()
diff --git a/airflow/metrics/validators.py b/airflow/metrics/validators.py
index fd43a8adf3..0965b534ff 100644
--- a/airflow/metrics/validators.py
+++ b/airflow/metrics/validators.py
@@ -36,6 +36,7 @@ log = logging.getLogger(__name__)
class MetricNameLengthExemptionWarning(Warning):
"""
A Warning class to be used for the metric name length exemption notice.
+
Using a custom Warning class allows us to easily test that it is used.
"""
@@ -83,10 +84,7 @@ OTEL_NAME_MAX_LENGTH = 63
def validate_stat(fn: Callable) -> Callable:
- """
- Check if stat name contains invalid characters.
- Log and not emit stats if name is invalid.
- """
+ """Check if stat name contains invalid characters; logs and does not emit
stats if name is invalid."""
@wraps(fn)
def wrapper(self, stat: str | None = None, *args, **kwargs) -> Callable |
None:
@@ -200,8 +198,9 @@ def get_current_handler_stat_name_func() -> Callable[[str],
str]:
class ListValidator(metaclass=abc.ABCMeta):
"""
- ListValidator metaclass that can be implemented as a AllowListValidator
- or BlockListValidator. The test method must be overridden by its subclass.
+ ListValidator metaclass that can be implemented as an AllowListValidator or
BlockListValidator.
+
+ The test method must be overridden by its subclass.
"""
def __init__(self, validate_list: str | None = None) -> None:
diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py
index c1182287a6..a5a65c6745 100644
--- a/airflow/migrations/utils.py
+++ b/airflow/migrations/utils.py
@@ -24,11 +24,11 @@ from sqlalchemy import text
def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str,
list[str]]]:
"""
- This function return primary and unique constraint
- along with column name. Some tables like `task_instance`
- is missing the primary key constraint name and the name is
- auto-generated by the SQL server. so this function helps to
- retrieve any primary or unique constraint name.
+ Returns the primary and unique constraint along with column name.
+
+ Some tables like `task_instance` are missing the primary key constraint
+ name and the name is auto-generated by the SQL server, so this function
+ helps to retrieve any primary or unique constraint name.
:param conn: sql connection object
:param table_name: table name
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 23553e5a89..a0ea5be80c 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -755,6 +755,7 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
def get_current_context() -> Context:
"""
Retrieve the execution context dictionary without altering user method's
signature.
+
This is the simplest method of retrieving the execution context dictionary.
**Old style:**
diff --git a/airflow/operators/subdag.py b/airflow/operators/subdag.py
index de7afcb4ea..30daf638bd 100644
--- a/airflow/operators/subdag.py
+++ b/airflow/operators/subdag.py
@@ -16,7 +16,8 @@
# specific language governing permissions and limitations
# under the License.
"""
-This module is deprecated. Please use :mod:`airflow.utils.task_group`.
+This module is deprecated; please use :mod:`airflow.utils.task_group`.
+
The module which provides a way to nest your DAGs and so your levels of
complexity.
"""
from __future__ import annotations
@@ -49,8 +50,7 @@ class SkippedStatePropagationOptions(Enum):
class SubDagOperator(BaseSensorOperator):
"""
- This class is deprecated.
- Please use `airflow.utils.task_group.TaskGroup`.
+ This class is deprecated; please use `airflow.utils.task_group.TaskGroup`.
This runs a sub dag. By convention, a sub dag's dag_id
should be prefixed by its parent and a dot. As in `parent.child`.
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index a8efa003f4..0165bb470e 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -52,6 +52,7 @@ if TYPE_CHECKING:
class TriggerDagRunLink(BaseOperatorLink):
"""
Operator link for TriggerDagRunOperator.
+
It allows users to access DAG triggered by task using
TriggerDagRunOperator.
"""
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7b34ceebac..946f064269 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -175,8 +175,7 @@ class AirflowPlugin:
@classmethod
def on_load(cls, *args, **kwargs):
"""
- Executed when the plugin is loaded.
- This method is only called once during runtime.
+ Executed when the plugin is loaded; this method is only called once
during runtime.
:param args: If future arguments are passed in on call.
:param kwargs: If future arguments are passed in on call.
@@ -217,6 +216,7 @@ def register_plugin(plugin_instance):
def load_entrypoint_plugins():
"""
Load and register plugins AirflowPlugin subclasses from the entrypoints.
+
The entry_point group should be 'airflow.plugins'.
"""
global import_errors
diff --git a/airflow/policies.py b/airflow/policies.py
index c35ae827ee..175ae5bbf9 100644
--- a/airflow/policies.py
+++ b/airflow/policies.py
@@ -95,6 +95,8 @@ def pod_mutation_hook(pod) -> None:
@local_settings_hookspec(firstresult=True)
def get_airflow_context_vars(context) -> dict[str, str]: # type:
ignore[empty-body]
"""
+ Inject airflow context vars into default airflow context vars.
+
This setting allows getting the airflow context vars, which are key value
pairs. They are then injected
to default airflow context vars, which in the end are available as
environment variables when running
tasks dag_id, task_id, execution_date, dag_run_id, try_number are reserved
keys.
@@ -138,8 +140,10 @@ class DefaultPolicy:
def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names:
list[str]):
"""
- Turn the functions from airflow_local_settings module into a custom/local
plugin, so that
- plugin-registered functions can co-operate with pluggy/setuptool
entrypoint plugins of the same methods.
+ Turn the functions from airflow_local_settings module into a custom/local
plugin.
+
+ Allows plugin-registered functions to co-operate with pluggy/setuptool
+ entrypoint plugins of the same methods.
Airflow local settings will be "win" (i.e. they have the final say) as
they are the last plugin
registered.
diff --git a/airflow/providers/amazon/aws/hooks/emr.py
b/airflow/providers/amazon/aws/hooks/emr.py
index 4effe18ba2..9685678ec1 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -185,7 +185,6 @@ class EmrHook(AwsBaseHook):
"""
Return failed state for test Amazon Elastic MapReduce Connection
(untestable).
-
We need to overwrite this method because this hook is based on
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
otherwise it will try to test connection to AWS STS by using the
default boto3 credential strategy.
diff --git a/airflow/providers/oracle/transfers/oracle_to_oracle.py
b/airflow/providers/oracle/transfers/oracle_to_oracle.py
index bc603bcd17..9e54a6d756 100644
--- a/airflow/providers/oracle/transfers/oracle_to_oracle.py
+++ b/airflow/providers/oracle/transfers/oracle_to_oracle.py
@@ -30,7 +30,6 @@ class OracleToOracleOperator(BaseOperator):
"""
Moves data from Oracle to Oracle.
-
:param oracle_destination_conn_id: destination Oracle connection.
:param destination_table: destination table to insert rows.
:param oracle_source_conn_id: :ref:`Source Oracle connection
<howto/connection:oracle>`.
diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py
index 7ba9c47ee0..3346d880f2 100644
--- a/airflow/secrets/base_secrets.py
+++ b/airflow/secrets/base_secrets.py
@@ -54,6 +54,7 @@ class BaseSecretsBackend(ABC):
def deserialize_connection(self, conn_id: str, value: str) -> Connection:
"""
Given a serialized representation of the airflow Connection, return an
instance.
+
Looks at first character to determine how to deserialize.
:param conn_id: connection id
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 69ba41ef09..bacafc979c 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -487,6 +487,7 @@ class ExternalTaskMarker(EmptyOperator):
class ExternalTaskSensorLink(ExternalDagLink):
"""
This external link is deprecated.
+
Please use :class:`airflow.sensors.external_task.ExternalDagLink`.
"""
diff --git a/airflow/sentry.py b/airflow/sentry.py
index 8742552e7e..3e222405c4 100644
--- a/airflow/sentry.py
+++ b/airflow/sentry.py
@@ -157,6 +157,7 @@ if conf.getboolean("sentry", "sentry_on", fallback=False):
def enrich_errors(self, func):
"""
Decorate errors.
+
Wrap TaskInstance._run_raw_task and
LocalTaskJob._run_mini_scheduler_on_child_tasks
to support task specific tags and breadcrumbs.
"""
diff --git a/airflow/serialization/pydantic/dataset.py
b/airflow/serialization/pydantic/dataset.py
index 4ffcb6dc02..5ee0d18128 100644
--- a/airflow/serialization/pydantic/dataset.py
+++ b/airflow/serialization/pydantic/dataset.py
@@ -21,10 +21,7 @@ from pydantic import BaseModel as BaseModelPydantic
class DagScheduleDatasetReferencePydantic(BaseModelPydantic):
- """
- Serializable representation of the DagScheduleDatasetReference
- ORM SqlAlchemyModel used by internal API.
- """
+ """Serializable version of the DagScheduleDatasetReference ORM
SqlAlchemyModel used by internal API."""
dataset_id: int
dag_id: str
@@ -38,10 +35,7 @@ class DagScheduleDatasetReferencePydantic(BaseModelPydantic):
class TaskOutletDatasetReferencePydantic(BaseModelPydantic):
- """
- Serializable representation of the
- TaskOutletDatasetReference ORM SqlAlchemyModel used by internal API.
- """
+ """Serializable version of the TaskOutletDatasetReference ORM
SqlAlchemyModel used by internal API."""
dataset_id: int
dag_id = str
diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py
index 16621f9d94..b7e9443441 100644
--- a/airflow/serialization/serde.py
+++ b/airflow/serialization/serde.py
@@ -179,8 +179,7 @@ def serialize(o: object, depth: int = 0) -> U | None:
def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:
"""
- Deserializes an object of primitive type T into an object. Uses an allow
- list to determine if a class can be loaded.
+ Deserialize an object of primitive type and uses an allow list to
determine if a class can be loaded.
:param o: primitive to deserialize into an arbitrary object.
:param full: if False it will return a stringified representation
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 8e53aa3465..4f5c61943e 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1088,6 +1088,7 @@ class SerializedBaseOperator(BaseOperator,
BaseSerialization):
def _deserialize_operator_extra_links(cls, encoded_op_links: list) ->
dict[str, BaseOperatorLink]:
"""
Deserialize Operator Links if the Classes are registered in Airflow
Plugins.
+
Error is raised if the OperatorLink is not found in Plugins too.
:param encoded_op_links: Serialized Operator Link
@@ -1447,7 +1448,9 @@ class TaskGroupSerialization(BaseSerialization):
@dataclass(frozen=True, order=True)
class DagDependency:
- """Dataclass for representing dependencies between DAGs.
+ """
+ Dataclass for representing dependencies between DAGs.
+
These are calculated during serialization and attached to serialized DAGs.
"""
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index f9815ba468..5a19c5bfc7 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -85,11 +85,10 @@ class DepContext:
def ensure_finished_tis(self, dag_run: DagRun, session: Session) ->
list[TaskInstance]:
"""
- This method makes sure finished_tis is populated if it's currently
None.
- This is for the strange feature of running tasks without dag_run.
+ Ensures finished_tis is populated if it's currently None, which allows
running tasks without dag_run.
- :param dag_run: The DagRun for which to find finished tasks
- :return: A list of all the finished tasks of this DAG and
execution_date
+ :param dag_run: The DagRun for which to find finished tasks
+ :return: A list of all the finished tasks of this DAG and
execution_date
"""
if self.finished_tis is None:
finished_tis = dag_run.get_task_instances(state=State.finished,
session=session)
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py
b/airflow/ti_deps/deps/ready_to_reschedule.py
index 66aa5c5613..c20ef98a08 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -37,6 +37,7 @@ class ReadyToRescheduleDep(BaseTIDep):
def _get_dep_statuses(self, ti, session, dep_context):
"""
Determines whether a task is ready to be rescheduled.
+
Only tasks in NONE state with at least one row in task_reschedule
table are
handled by this dependency class, otherwise this dependency is
considered as passed.
This dependency fails if the latest reschedule request's reschedule
date is still
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py
b/airflow/ti_deps/deps/trigger_rule_dep.py
index df0101a4f7..b1c2d09b96 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -84,10 +84,7 @@ class _UpstreamTIStates(NamedTuple):
class TriggerRuleDep(BaseTIDep):
- """
- Determines if a task's upstream tasks are in a state that allows a given
task instance
- to run.
- """
+ """Determines if a task's upstream tasks are in a state that allows a
given task instance to run."""
NAME = "Trigger Rule"
IGNORABLE = True
diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 314d97b0ee..87dd12f317 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -44,10 +44,7 @@ class BaseTrigger(abc.ABC, LoggingMixin):
self.trigger_id = None
def _set_context(self, context):
- """
- This method, part of LoggingMixin, is used mainly for configuration of
logging
- for tasks, but is not used for triggers.
- """
+ """Part of LoggingMixin and used mainly for configuration of task
logging; not used for triggers."""
raise NotImplementedError
@abc.abstractmethod
diff --git a/airflow/triggers/external_task.py
b/airflow/triggers/external_task.py
index bbc576f321..547556e315 100644
--- a/airflow/triggers/external_task.py
+++ b/airflow/triggers/external_task.py
@@ -31,8 +31,7 @@ from airflow.utils.session import NEW_SESSION, provide_session
class TaskStateTrigger(BaseTrigger):
"""
- Waits asynchronously for a task in a different DAG to complete for a
- specific logical date.
+ Waits asynchronously for a task in a different DAG to complete for a
specific logical date.
:param dag_id: The dag_id that contains the task you want to wait for
:param task_id: The task_id that contains the task you want to
@@ -72,10 +71,7 @@ class TaskStateTrigger(BaseTrigger):
)
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
- """
- Checks periodically in the database to see if the task exists, and has
- hit one of the states yet, or not.
- """
+ """Checks periodically in the database to see if the task exists and
has hit one of the states."""
while True:
# mypy confuses typing here
num_tasks = await self.count_tasks() # type: ignore[call-arg]
@@ -137,10 +133,7 @@ class DagStateTrigger(BaseTrigger):
)
async def run(self) -> typing.AsyncIterator[TriggerEvent]:
- """
- Checks periodically in the database to see if the dag run exists, and
has
- hit one of the states yet, or not.
- """
+ """Checks periodically in the database to see if the dag run exists
and has hit one of the states."""
while True:
# mypy confuses typing here
num_dags = await self.count_dags() # type: ignore[call-arg]
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index 0536bf30d2..b8f3bd2d18 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-4bd168cc3731503483240260d78ddf0b610de6c79345b0cc4c749a92451e3b1d
\ No newline at end of file
+97c45f96cf0854ba57c0bc350d7e5412ae7afdb860c6ad05b08338ba7e390bd4
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg
b/docs/apache-airflow/img/airflow_erd.svg
index 9a0481f657..69577c77e5 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1225,28 +1225,28 @@
<g id="edge48" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-488.65C1161.12,-465.83 1186.07,-443.06 1210,-422 1216.27,-416.48
1222.72,-410.89 1229.27,-405.29"/>
-<text text-anchor="start" x="1198.27" y="-394.09" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1219.27" y="-394.09" font-family="Times,serif"
font-size="14.00">1</text>
<text text-anchor="start" x="1137.01" y="-477.45" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
<g id="edge49" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-506.4C1161.12,-483.83 1186.07,-461.06 1210,-440 1216.27,-434.48
1222.72,-428.89 1229.27,-423.28"/>
-<text text-anchor="start" x="1219.27" y="-412.08" font-family="Times,serif"
font-size="14.00">1</text>
+<text text-anchor="start" x="1198.27" y="-412.08" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="1137.01" y="-495.2" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
<g id="edge50" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-524.15C1161.12,-501.83 1186.07,-479.06 1210,-458 1216.66,-452.14
1223.53,-446.19 1230.5,-440.21"/>
-<text text-anchor="start" x="1199.5" y="-444.01" font-family="Times,serif"
font-size="14.00">0..N</text>
+<text text-anchor="start" x="1220.5" y="-444.01" font-family="Times,serif"
font-size="14.00">1</text>
<text text-anchor="start" x="1137.01" y="-512.95" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- task_instance--xcom -->
<g id="edge51" class="edge">
<title>task_instance--xcom</title>
<path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2"
d="M1137.01,-541.9C1161.12,-519.83 1186.07,-497.06 1210,-476 1223.32,-464.28
1237.48,-452.19 1251.63,-440.12"/>
-<text text-anchor="start" x="1241.63" y="-443.92" font-family="Times,serif"
font-size="14.00">1</text>
+<text text-anchor="start" x="1251.63" y="-443.92" font-family="Times,serif"
font-size="14.00">0..N</text>
<text text-anchor="start" x="1137.01" y="-530.7" font-family="Times,serif"
font-size="14.00">1</text>
</g>
<!-- log_template -->