jedcunningham commented on code in PR #33620:
URL: https://github.com/apache/airflow/pull/33620#discussion_r1316139157
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -18,45 +18,48 @@
Deferrable Operators & Triggers
===============================
-Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle; for example, if you only have 100
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
Sensors solves some of this, allowing Sensors to only run at fixed intervals,
but it is inflexible and only allows using time as the reason to resume, not
anything else.
+Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle. For example, if you only have 100
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
sensors solves some of this, by allowing sensors to only run at fixed
intervals, but it is inflexible and only allows using time as the reason to
resume, not other criteria.
-This is where *Deferrable Operators* come in. A deferrable operator is one
that is written with the ability to suspend itself and free up the worker when
it knows it has to wait, and hand off the job of resuming it to something
called a *Trigger*. As a result, while it is suspended (deferred), it is not
taking up a worker slot and your cluster will have a lot less resources wasted
on idle Operators or Sensors. Note that by default deferred tasks will not use
up pool slots, if you would like them to, you can change this by editing the
pool in question.
+This is where *Deferrable Operators* can be used. A deferrable operator can
suspend itself and free up the worker for other processes when certain
conditions are met. A pre-defined *Trigger* then resumes the deferred operator.
As a result, while it is suspended (deferred) and isn't using worker slot, your
cluster has fewer resources wasted on idle operators or sensors. By default,
deferred tasks don't use pool slots. If you would like them to, you can change
this by editing the pool in question.
-*Triggers* are small, asynchronous pieces of Python code designed to be run
all together in a single Python process; because they are asynchronous, they
are able to all co-exist efficiently. As an overview of how this process works:
+*Triggers* are small, asynchronous pieces of Python code designed to run in a
single Python process. Because they are asynchronous, they can all co-exist
efficiently in a *triggerer*, an Airflow service similar to a scheduler or
worker.
-* A task instance (running operator) gets to a point where it has to wait, and
defers itself with a trigger tied to the event that should resume it. This
frees up the worker to run something else.
-* The new Trigger instance is registered inside Airflow, and picked up by a
*triggerer* process
-* The trigger is run until it fires, at which point its source task is
re-scheduled
-* The scheduler queues the task to resume on a worker node
+An overview of how this process works:
-Using deferrable operators as a DAG author is almost transparent; writing
them, however, takes a bit more work.
+* A task instance (running operator) reaches a point where it has to wait for
other operations or conditions, and defers itself with a trigger tied to an
event to resume it. This frees up the worker to run something else.
+* The new trigger instance is registered by Airflow, and picked up by a
triggerer process.
+* The trigger runs until it fires, at which point its source task is
re-scheduled by the scheduler.
+* The scheduler queues the task to resume on a worker node.
+You can either use pre-written deferrable operators as a DAG author or write
your own. Writing them, however, requires that they meet certain design
criteria.
Using Deferrable Operators
--------------------------
-If all you wish to do is use pre-written Deferrable Operators (such as
``TimeSensorAsync``, which comes with Airflow), then there are only two steps
you need:
+If you want to use pre-written deferrable operators that come with Airflow,
such as ``TimeSensorAsync``, then you only need to complete two steps:
-* Ensure your Airflow installation is running at least one ``triggerer``
process, as well as the normal ``scheduler``
+* Ensure your Airflow installation runs at least one ``triggerer`` process, as
well as the normal ``scheduler``
* Use deferrable operators/sensors in your DAGs
-That's it; everything else will be automatically handled for you. If you're
upgrading existing DAGs, we even provide some API-compatible sensor variants
(e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG
with no other changes required.
+Airflow automatically handles and implements the deferral processes for you.
-Note that you cannot yet use the deferral ability from inside custom
PythonOperator/TaskFlow Python functions; it is only available to traditional,
class-based Operators at the moment.
+If you're upgrading existing DAGs to use deferrable operators, Airflow
contains API-compatible sensor variants, like ``TimeSensorAsync`` for
``TimeSensor``. Add these variants into your DAG to use deferrable operators
with no other changes required.
+
+Note that you can't use the deferral ability from inside custom PythonOperator
or TaskFlow Python functions. Deferral is only available to traditional,
class-based operators.
.. _deferring/writing:
Writing Deferrable Operators
----------------------------
-Writing a deferrable operator takes a bit more work. There are some main
points to consider:
+Writing a deferrable operator requires more configuration than updating your
DAGs with pre-written operators. There are some main points to consider:
Review Comment:
```suggestion
When writing deferrable operators, these are the main points to consider:
```
If you are worried about carryover context from the last section, let's do
this instead. Writing operators will naturally be more complex than using
pre-written ones - I see no value in being more verbose here.
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -97,22 +100,22 @@ Writing a deferrable operator takes a bit more work. There
are some main points
Triggering Deferral
~~~~~~~~~~~~~~~~~~~
-If you want to trigger deferral, at any place in your Operator you can call
``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a
special exception that Airflow will catch. The arguments are:
+If you want to trigger deferral, at any place in your operator, you can call
``self.defer(trigger, method_name, kwargs, timeout)``. This raises a special
exception for Airflow. The arguments are:
-* ``trigger``: An instance of a Trigger that you wish to defer on. It will be
serialized into the database.
-* ``method_name``: The method name on your Operator you want Airflow to call
when it resumes.
-* ``kwargs``: Additional keyword arguments to pass to the method when it is
called. Optional, defaults to ``{}``.
-* ``timeout``: A timedelta that specifies a timeout after which this deferral
will fail, and fail the task instance. Optional, defaults to ``None``, meaning
no timeout.
+* ``trigger``: An instance of a trigger that you want to defer. It will be
serialized into the database.
+* ``method_name``: The method name of your operator that you want Airflow to
call when it resumes.
Review Comment:
```suggestion
* ``method_name``: The method name on your operator that you want Airflow to
call when it resumes.
```
"on" is correct here. Or "in". It's a reference to the method in the task
object.
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -97,22 +100,22 @@ Writing a deferrable operator takes a bit more work. There
are some main points
Triggering Deferral
~~~~~~~~~~~~~~~~~~~
-If you want to trigger deferral, at any place in your Operator you can call
``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a
special exception that Airflow will catch. The arguments are:
+If you want to trigger deferral, at any place in your operator, you can call
``self.defer(trigger, method_name, kwargs, timeout)``. This raises a special
exception for Airflow. The arguments are:
-* ``trigger``: An instance of a Trigger that you wish to defer on. It will be
serialized into the database.
-* ``method_name``: The method name on your Operator you want Airflow to call
when it resumes.
-* ``kwargs``: Additional keyword arguments to pass to the method when it is
called. Optional, defaults to ``{}``.
-* ``timeout``: A timedelta that specifies a timeout after which this deferral
will fail, and fail the task instance. Optional, defaults to ``None``, meaning
no timeout.
+* ``trigger``: An instance of a trigger that you want to defer. It will be
serialized into the database.
Review Comment:
```suggestion
* ``trigger``: An instance of a trigger that you want to defer to. It will
be serialized into the database.
```
We are deferring the operator, not the trigger. The trigger is the
responsible party now. So maybe this?
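
To make the "defer to" direction concrete, here is a minimal, self-contained sketch of the hand-off. It does not import Airflow: `TaskDeferred`, `FakeTrigger`, `WaitOperator`, and `run_task` are hypothetical stand-ins written only for illustration, and the toy runner replaces the real triggerer and scheduler. The argument names mirror the `self.defer(trigger, method_name, kwargs, timeout)` call quoted in the hunk above.

```python
from datetime import timedelta


class TaskDeferred(Exception):
    """Stand-in for the special exception raised on deferral (hypothetical)."""

    def __init__(self, trigger, method_name, kwargs=None, timeout=None):
        super().__init__(f"deferred to {trigger!r}")
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}
        self.timeout = timeout


class FakeTrigger:
    """Hypothetical trigger: it only holds what it got from __init__."""

    def __init__(self, moment):
        self.moment = moment

    def __repr__(self):
        return f"FakeTrigger(moment={self.moment!r})"


class WaitOperator:
    """Hypothetical operator: the operator is deferred, the trigger takes over."""

    def defer(self, trigger, method_name, kwargs=None, timeout=None):
        # Raising hands control back to the runner and frees the worker.
        raise TaskDeferred(trigger, method_name, kwargs, timeout)

    def execute(self, context):
        self.defer(
            trigger=FakeTrigger(moment="2030-01-01T00:00:00"),
            method_name="execute_complete",  # method on the operator to resume at
            kwargs={"attempt": 1},           # the only state carried across the deferral
            timeout=timedelta(hours=1),
        )

    def execute_complete(self, context, event=None, attempt=0):
        return f"resumed after {event} (attempt {attempt})"


def run_task(operator):
    """Toy runner: catch the deferral, pretend the trigger fired, resume the operator."""
    try:
        return operator.execute(context={})
    except TaskDeferred as deferred:
        event = deferred.trigger.moment  # payload the trigger would emit
        resume = getattr(operator, deferred.method_name)
        return resume(context={}, event=event, **deferred.kwargs)


result = run_task(WaitOperator())
print(result)
```

In this sketch the operator never touches the trigger again after deferring. The runner looks up `method_name` on the operator object and passes the stored kwargs back in, which is also why "on your operator" reads correctly in the `method_name` bullet.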
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -185,43 +187,41 @@ Here's the structure of a basic Trigger
yield TriggerEvent(self.moment)
-This is a very simplified version of Airflow's ``DateTimeTrigger``, and you
can see several things here:
+The code example shows several things:
-* ``__init__`` and ``serialize`` are written as a pair; the Trigger is
instantiated once when it is submitted by the Operator as part of its deferral
request, then serialized and re-instantiated on any *triggerer* process that
runs the trigger.
-* The ``run`` method is declared as an ``async def``, as it *must* be
asynchronous, and uses ``asyncio.sleep`` rather than the regular ``time.sleep``
(as that would block the process).
+* ``__init__`` and ``serialize`` are written as a pair. The trigger is
instantiated once when it is submitted by the operator as part of its deferral
request, then serialized and re-instantiated on any triggerer process that runs
the trigger.
+* The ``run`` method is declared as an ``async def``, as it *must* be
asynchronous, and uses ``asyncio.sleep`` rather than the regular ``time.sleep``
(because that would block the process).
* When it emits its event it packs ``self.moment`` in there, so if this
trigger is being run redundantly on multiple hosts, the event can be
de-duplicated.
-Triggers can be as complex or as simple as you like provided you keep inside
this contract; they are designed to be run in a highly-available fashion,
auto-distributed among hosts running the *triggerer*. We encourage you to avoid
any kind of persistent state in a trigger; they should get everything they need
from their ``__init__``, so they can be serialized and moved around freely.
-
-If you are new to writing asynchronous Python, you should be very careful
writing your ``run()`` method; Python's async model means that any code that
does not correctly ``await`` when it does a blocking operation will block the
*entire process*. Airflow will attempt to detect this and warn you in the
triggerer logs when it happens, but we strongly suggest you set the variable
``PYTHONASYNCIODEBUG=1`` when you are writing your Trigger to enable extra
checks from Python to make sure you're writing non-blocking code. Be especially
careful when doing filesystem calls, as if the underlying filesystem is
network-backed it may be blocking.
+Triggers can be as complex or as simple as you want, provided they meet the
design constraints. They can run in a highly-available fashion, and are
auto-distributed among hosts running the triggerer. We encourage you to avoid
any kind of persistent state in a trigger. Triggers should get everything they
need from their ``__init__``, so they can be serialized and moved around freely.
+If you are new to writing asynchronous Python, be very careful when writing
your ``run()`` method. Python's async model means that code can block the
entire process if it does not correctly ``await`` when it does a blocking
operation. Airflow attempts to detect process blocking code and warn you in the
triggerer logs when it happens. You can enable extra checks by Python by
setting the variable ``PYTHONASYNCIODEBUG=1`` when you are writing your trigger
to make sure you're writing non-blocking code. Be especially careful when doing
filesystem calls, because if the underlying filesystem is network-backed, it
can be blocking.
High Availability
-----------------
-Triggers are designed from the ground-up to be highly-available; if you want
to run a highly-available setup, simply run multiple copies of ``triggerer`` on
multiple hosts. Much like ``scheduler``, they will automatically co-exist with
correct locking and HA.
-
-Depending on how much work the triggers are doing, you can fit from hundreds
to tens of thousands of triggers on a single ``triggerer`` host. By default,
every ``triggerer`` will have a capacity of 1000 triggers it will try to run at
once; you can change this with the ``--capacity`` argument. If you have more
triggers trying to run than you have capacity across all of your ``triggerer``
processes, some triggers will be delayed from running until others have
completed.
+Triggers are designed to work in high availability (HA) architecture. If you
want to run a high availability setup, run multiple copies of ``triggerer`` on
multiple hosts. Much like ``scheduler``, they automatically co-exist with
correct locking and HA.
Review Comment:
```suggestion
Triggers are designed to work in a high availability (HA) architecture. If
you want to run a high availability setup, run multiple copies of ``triggerer``
on multiple hosts. Much like ``scheduler``, they automatically co-exist with
correct locking and HA.
```
nit
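
Related to the `run()` guidance quoted in the hunk above, a short sketch might help readers see why awaiting matters when many triggers share one process. It uses only the standard library. `well_behaved_trigger` and `main` are hypothetical names, and the coroutine is a stand-in for a trigger's `run()` method rather than a real Airflow trigger.

```python
import asyncio
import time


async def well_behaved_trigger(name, delay, fired):
    # asyncio.sleep yields control to the event loop while waiting,
    # so the other coroutines in the same process keep making progress.
    # time.sleep(delay) here would block all of them instead.
    await asyncio.sleep(delay)
    fired.append(name)


async def main():
    fired = []
    start = time.monotonic()
    # Three "triggers" share one event loop, like triggers sharing a triggerer.
    await asyncio.gather(
        well_behaved_trigger("a", 0.2, fired),
        well_behaved_trigger("b", 0.2, fired),
        well_behaved_trigger("c", 0.2, fired),
    )
    elapsed = time.monotonic() - start
    return fired, elapsed


fired, elapsed = asyncio.run(main())
print(sorted(fired), round(elapsed, 2))
```

Because each coroutine awaits its sleep, the three waits overlap and the total time stays close to a single delay instead of the sum of all three.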
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -18,45 +18,48 @@
Deferrable Operators & Triggers
===============================
-Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle; for example, if you only have 100
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
Sensors solves some of this, allowing Sensors to only run at fixed intervals,
but it is inflexible and only allows using time as the reason to resume, not
anything else.
+Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle. For example, if you only have 100
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
sensors solves some of this, by allowing sensors to only run at fixed
intervals, but it is inflexible and only allows using time as the reason to
resume, not other criteria.
-This is where *Deferrable Operators* come in. A deferrable operator is one
that is written with the ability to suspend itself and free up the worker when
it knows it has to wait, and hand off the job of resuming it to something
called a *Trigger*. As a result, while it is suspended (deferred), it is not
taking up a worker slot and your cluster will have a lot less resources wasted
on idle Operators or Sensors. Note that by default deferred tasks will not use
up pool slots, if you would like them to, you can change this by editing the
pool in question.
+This is where *Deferrable Operators* can be used. A deferrable operator can
suspend itself and free up the worker for other processes when certain
conditions are met. A pre-defined *Trigger* then resumes the deferred operator.
As a result, while it is suspended (deferred) and isn't using worker slot, your
cluster has fewer resources wasted on idle operators or sensors. By default,
deferred tasks don't use pool slots. If you would like them to, you can change
this by editing the pool in question.
-*Triggers* are small, asynchronous pieces of Python code designed to be run
all together in a single Python process; because they are asynchronous, they
are able to all co-exist efficiently. As an overview of how this process works:
+*Triggers* are small, asynchronous pieces of Python code designed to run in a
single Python process. Because they are asynchronous, they can all co-exist
efficiently in a *triggerer*, an Airflow service similar to a scheduler or
worker.
Review Comment:
nit: Not sure I like "similar" in that sentence. Maybe "in the triggerer
Airflow component" instead.
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -18,45 +18,48 @@
Deferrable Operators & Triggers
===============================
-Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle; for example, if you only have 100
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
Sensors solves some of this, allowing Sensors to only run at fixed intervals,
but it is inflexible and only allows using time as the reason to resume, not
anything else.
+Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle. For example, if you only have 100
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
sensors solves some of this, by allowing sensors to only run at fixed
intervals, but it is inflexible and only allows using time as the reason to
resume, not other criteria.
-This is where *Deferrable Operators* come in. A deferrable operator is one
that is written with the ability to suspend itself and free up the worker when
it knows it has to wait, and hand off the job of resuming it to something
called a *Trigger*. As a result, while it is suspended (deferred), it is not
taking up a worker slot and your cluster will have a lot less resources wasted
on idle Operators or Sensors. Note that by default deferred tasks will not use
up pool slots, if you would like them to, you can change this by editing the
pool in question.
+This is where *Deferrable Operators* can be used. A deferrable operator can
suspend itself and free up the worker for other processes when certain
conditions are met. A pre-defined *Trigger* then resumes the deferred operator.
As a result, while it is suspended (deferred) and isn't using worker slot, your
cluster has fewer resources wasted on idle operators or sensors. By default,
deferred tasks don't use pool slots. If you would like them to, you can change
this by editing the pool in question.
-*Triggers* are small, asynchronous pieces of Python code designed to be run
all together in a single Python process; because they are asynchronous, they
are able to all co-exist efficiently. As an overview of how this process works:
+*Triggers* are small, asynchronous pieces of Python code designed to run in a
single Python process. Because they are asynchronous, they can all co-exist
efficiently in a *triggerer*, an Airflow service similar to a scheduler or
worker.
-* A task instance (running operator) gets to a point where it has to wait, and
defers itself with a trigger tied to the event that should resume it. This
frees up the worker to run something else.
-* The new Trigger instance is registered inside Airflow, and picked up by a
*triggerer* process
-* The trigger is run until it fires, at which point its source task is
re-scheduled
-* The scheduler queues the task to resume on a worker node
+An overview of how this process works:
-Using deferrable operators as a DAG author is almost transparent; writing
them, however, takes a bit more work.
+* A task instance (running operator) reaches a point where it has to wait for
other operations or conditions, and defers itself with a trigger tied to an
event to resume it. This frees up the worker to run something else.
+* The new trigger instance is registered by Airflow, and picked up by a
triggerer process.
+* The trigger runs until it fires, at which point its source task is
re-scheduled by the scheduler.
+* The scheduler queues the task to resume on a worker node.
+You can either use pre-written deferrable operators as a DAG author or write
your own. Writing them, however, requires that they meet certain design
criteria.
Using Deferrable Operators
--------------------------
-If all you wish to do is use pre-written Deferrable Operators (such as
``TimeSensorAsync``, which comes with Airflow), then there are only two steps
you need:
+If you want to use pre-written deferrable operators that come with Airflow,
such as ``TimeSensorAsync``, then you only need to complete two steps:
-* Ensure your Airflow installation is running at least one ``triggerer``
process, as well as the normal ``scheduler``
+* Ensure your Airflow installation runs at least one ``triggerer`` process, as
well as the normal ``scheduler``
* Use deferrable operators/sensors in your DAGs
-That's it; everything else will be automatically handled for you. If you're
upgrading existing DAGs, we even provide some API-compatible sensor variants
(e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG
with no other changes required.
+Airflow automatically handles and implements the deferral processes for you.
-Note that you cannot yet use the deferral ability from inside custom
PythonOperator/TaskFlow Python functions; it is only available to traditional,
class-based Operators at the moment.
+If you're upgrading existing DAGs to use deferrable operators, Airflow
contains API-compatible sensor variants, like ``TimeSensorAsync`` for
``TimeSensor``. Add these variants into your DAG to use deferrable operators
with no other changes required.
+
+Note that you can't use the deferral ability from inside custom PythonOperator
or TaskFlow Python functions. Deferral is only available to traditional,
class-based operators.
.. _deferring/writing:
Writing Deferrable Operators
----------------------------
-Writing a deferrable operator takes a bit more work. There are some main
points to consider:
+Writing a deferrable operator requires more configuration than updating your
DAGs with pre-written operators. There are some main points to consider:
-* Your Operator must defer itself with a Trigger. If there is a Trigger in
core Airflow you can use, great; otherwise, you will have to write one.
-* Your Operator will be stopped and removed from its worker while deferred,
and no state will persist automatically. You can persist state by asking
Airflow to resume you at a certain method or pass certain kwargs, but that's it.
-* You can defer multiple times, and you can defer before/after your Operator
does significant work, or only defer if certain conditions are met (e.g. a
system does not have an immediate answer). Deferral is entirely under your
control.
-* Any Operator can defer; no special marking on its class is needed, and it's
not limited to Sensors.
-* In order for any changes to a Trigger to be reflected, the *triggerer* needs
to be restarted whenever the Trigger is modified.
-* If you want to add an operator or sensor that supports both deferrable and
non-deferrable modes, it's suggested to add ``deferrable: bool =
conf.getboolean("operators", "default_deferrable", fallback=False)`` to the
``__init__`` method of the operator and use it to decide whether to run the
operator in deferrable mode. You'll be able to configure the default value of
``deferrable`` of all the operators and sensors that support switching between
deferrable and non-deferrable mode through ``default_deferrable`` in the
``operator`` section. Here's an example of a sensor that supports both modes.
+* Your operator must defer itself with a trigger. You can use a trigger
included in core Airflow, or you can write a custom one.
+* Your operator will be stopped and removed from its worker while deferred,
and no state persists automatically. You can persist state by instructing
Airflow to resume the operation at a certain method or by passing certain
kwargs.
Review Comment:
```suggestion
* Your operator will be stopped and removed from its worker while deferred,
and no state persists automatically. You can persist state by instructing
Airflow to resume the operator at a certain method or by passing certain kwargs.
```
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -18,45 +18,48 @@
Deferrable Operators & Triggers
===============================
-Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle; for example, if you only have 100
worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
Sensors solves some of this, allowing Sensors to only run at fixed intervals,
but it is inflexible and only allows using time as the reason to resume, not
anything else.
+Standard :doc:`Operators </core-concepts/operators>` and :doc:`Sensors
<../core-concepts/sensors>` take up a full *worker slot* for the entire time
they are running, even if they are idle. For example, if you only have 100
worker slots available to run tasks, and you have 100 DAGs waiting on a sensor
that's currently running but idle, then you *cannot run anything else* - even
though your entire Airflow cluster is essentially idle. ``reschedule`` mode for
sensors solves some of this, by allowing sensors to only run at fixed
intervals, but it is inflexible and only allows using time as the reason to
resume, not other criteria.
-This is where *Deferrable Operators* come in. A deferrable operator is one
that is written with the ability to suspend itself and free up the worker when
it knows it has to wait, and hand off the job of resuming it to something
called a *Trigger*. As a result, while it is suspended (deferred), it is not
taking up a worker slot and your cluster will have a lot less resources wasted
on idle Operators or Sensors. Note that by default deferred tasks will not use
up pool slots, if you would like them to, you can change this by editing the
pool in question.
+This is where *Deferrable Operators* can be used. A deferrable operator can
suspend itself and free up the worker for other processes when certain
conditions are met. A pre-defined *Trigger* then resumes the deferred operator.
As a result, while it is suspended (deferred) and isn't using worker slot, your
cluster has fewer resources wasted on idle operators or sensors. By default,
deferred tasks don't use pool slots. If you would like them to, you can change
this by editing the pool in question.
Review Comment:
I don't really like this rewording: "for other processes when certain
conditions are met. A pre-defined *Trigger* then resumes the deferred operator."
"when certain conditions are met" is technically correct, but it is also a bit
vague. Since all, or at least the vast majority, of operators only defer when
they need to wait, maybe we can incorporate that so we have an example "condition"?
"A pre-defined *Trigger* then resumes the deferred operator" - I thought the
old wording was easier to grok. The trigger is responsible for resuming the
remaining non-async operator code (if any? @dstandish, did we ever get that
short circuit done?).