This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef8f3498b4 docs(deferring): fix wrong example and remove unnecessay example (#41691)
ef8f3498b4 is described below

commit ef8f3498b41cdd7927ed6f0ddcce7664a4707ab0
Author: Wei Lee <[email protected]>
AuthorDate: Mon Aug 26 16:41:01 2024 +0800

    docs(deferring): fix wrong example and remove unnecessay example (#41691)
---
 .../authoring-and-scheduling/deferring.rst         | 77 +++++++++++-----------
 1 file changed, 37 insertions(+), 40 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
index def9246968..0b477151a9 100644
--- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -171,12 +171,16 @@ Here's a basic example of how a sensor might trigger deferral:
 
 .. code-block:: python
 
+    from __future__ import annotations
+
     from datetime import timedelta
-    from typing import Any
+    from typing import TYPE_CHECKING, Any
 
     from airflow.sensors.base import BaseSensorOperator
     from airflow.triggers.temporal import TimeDeltaTrigger
-    from airflow.utils.context import Context
+
+    if TYPE_CHECKING:
+        from airflow.utils.context import Context
 
 
     class WaitOneHourSensor(BaseSensorOperator):
@@ -187,6 +191,7 @@ Here's a basic example of how a sensor might trigger deferral:
             # We have no more work to do here. Mark as complete.
             return
 
+
 When you opt to defer, your operator will stop executing at that point and be removed from its current worker. No state will persist, such as local variables or attributes set on ``self``. When your operator resumes, it resumes as a new instance of it. The only way you can pass state from the old instance of the operator to the new one is with ``method_name`` and ``kwargs``.
 
 When your operator resumes, Airflow adds a ``context`` object and an ``event`` object to the kwargs passed to the ``method_name`` method. This ``event`` object contains the payload from the trigger event that resumed your operator. Depending on the trigger, this can be useful to your operator, like it's a status code or URL to fetch results. Or, it might be unimportant information, like a datetime. Your ``method_name`` method, however, *must* accept ``context`` and ``event`` as a keyword [...]
@@ -209,47 +214,31 @@ Triggering Deferral from Task Start
 If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute ``start_from_trigger`` to ``True`` and add a class level attribute ``start_trigger_args`` with an ``StartTriggerArgs`` object with the following 4 attributes to your deferrable operator:
 
 * ``trigger_cls``: An importable path to your trigger class.
-* ``trigger_kwargs``: Keyword arguments to pass to the ``trigger_cls`` when it's initialized. **Note that all the arguments need to be serializable. It's the main limitation of this feature.**
+* ``trigger_kwargs``: Keyword arguments to pass to the ``trigger_cls`` when it's initialized. **Note that all the arguments need to be serializable by Airflow. It's the main limitation of this feature.**
 * ``next_method``: The method name on your operator that you want Airflow to call when it resumes.
 * ``next_kwargs``: Additional keyword arguments to pass to the ``next_method`` when it is called.
 * ``timeout``: (Optional) A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Defaults to ``None``, which means no timeout.
 
-This is particularly useful when deferring is the only thing the ``execute`` method does. Here's a basic refinement of the previous example. In the previous example, we used ``DateTimeTrigger`` which takes an argument ``delta`` with type ``datetime.timedelta`` which is not serializable. Thus, we need to create a new trigger with serializable arguments.
+In the sensor part, we'll need to provide the path to ``TimeDeltaTrigger`` as ``trigger_cls``.
 
 .. code-block:: python
 
     from __future__ import annotations
 
-    import datetime
-
-    from airflow.triggers.temporal import DateTimeTrigger
-    from airflow.utils import timezone
-
-
-    class HourDeltaTrigger(DateTimeTrigger):
-        def __init__(self, hours: int):
-            moment = timezone.utcnow() + datetime.timedelta(hours=hours)
-            super().__init__(moment=moment)
-
-
-In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as ``trigger_cls``.
-
-.. code-block:: python
-
-    from __future__ import annotations
-
-    from typing import Any
+    from datetime import timedelta
+    from typing import TYPE_CHECKING, Any
 
     from airflow.sensors.base import BaseSensorOperator
     from airflow.triggers.base import StartTriggerArgs
-    from airflow.utils.context import Context
+
+    if TYPE_CHECKING:
+        from airflow.utils.context import Context
 
 
     class WaitOneHourSensor(BaseSensorOperator):
-        # You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
         start_trigger_args = StartTriggerArgs(
-            trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
-            trigger_kwargs={"hours": 1},
+            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+            trigger_kwargs={"moment": timedelta(hours=1)},
             next_method="execute_complete",
             next_kwargs=None,
             timeout=None,
@@ -265,23 +254,27 @@ In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as ``
 
 .. code-block:: python
 
+    from __future__ import annotations
+
     from datetime import timedelta
-    from typing import Any
+    from typing import TYPE_CHECKING, Any
 
     from airflow.sensors.base import BaseSensorOperator
-    from airflow.triggers.temporal import TimeDeltaTrigger
-    from airflow.utils.context import Context
+    from airflow.triggers.base import StartTriggerArgs
 
+    if TYPE_CHECKING:
+        from airflow.utils.context import Context
 
-    class WaitTwoHourSensor(BaseSensorOperator):
-        # You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
+
+    class WaitHoursSensor(BaseSensorOperator):
         start_trigger_args = StartTriggerArgs(
-            trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
-            trigger_kwargs={"hours": 1},
+            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+            trigger_kwargs={"moment": timedelta(hours=1)},
             next_method="execute_complete",
             next_kwargs=None,
             timeout=None,
         )
+        start_from_trigger = True
 
         def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
             super().__init__(*args, **kwargs)
@@ -300,23 +293,27 @@ After the trigger has finished executing, the task may be sent back to the worke
 
 .. code-block:: python
 
+    from __future__ import annotations
+
     from datetime import timedelta
-    from typing import Any
+    from typing import TYPE_CHECKING, Any
 
     from airflow.sensors.base import BaseSensorOperator
-    from airflow.triggers.temporal import TimeDeltaTrigger
-    from airflow.utils.context import Context
+    from airflow.triggers.base import StartTriggerArgs
+
+    if TYPE_CHECKING:
+        from airflow.utils.context import Context
 
 
     class WaitHoursSensor(BaseSensorOperator):
-        # You'll need to change trigger_cls to the actual path to HourDeltaTrigger.
         start_trigger_args = StartTriggerArgs(
-            trigger_cls="airflow.triggers.temporal.HourDeltaTrigger",
-            trigger_kwargs={"hours": 1},
+            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+            trigger_kwargs={"moment": timedelta(hours=1)},
             next_method="execute_complete",
             next_kwargs=None,
             timeout=None,
         )
+        start_from_trigger = True
 
         def __init__(
             self,
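
Note on the import pattern applied throughout this diff: each example now guards the ``Context`` import with ``TYPE_CHECKING`` and adds ``from __future__ import annotations``. The sketch below is a minimal, Airflow-free illustration of why that combination works at runtime. The module name ``hypothetical_context_module`` and the standalone ``execute_complete`` function are assumptions made for illustration only; they are not part of the commit or of Airflow.

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime.
    # `hypothetical_context_module` is a made-up stand-in for a module
    # such as airflow.utils.context; it does not need to be installed.
    from hypothetical_context_module import Context


def execute_complete(context: Context, event: dict[str, Any] | None = None) -> str:
    # With postponed evaluation of annotations, `Context` is stored as the
    # string "Context" and is never looked up when the function is defined.
    return f"resumed with event={event!r}"
```

Calling ``execute_complete({}, event={"status": "ok"})`` runs even though ``Context`` was never imported, because the annotation is kept as a string rather than evaluated.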
