uranusjr commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1687837706


##########
tests/sensors/test_time_sensor.py:
##########
@@ -63,7 +63,6 @@ def test_task_is_deferred(self):
 
         assert isinstance(exc_info.value.trigger, DateTimeTrigger)
         assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10)
-        assert exc_info.value.method_name == "execute_complete"

Review Comment:
   Should we still check the method name is `__trigger_exit__` here?



##########
airflow/models/taskinstance.py:
##########
@@ -83,7 +83,6 @@
     AirflowTaskTimeout,
     DagRunNotFound,
     RemovedInAirflow3Warning,
-    TaskDeferralError,

Review Comment:
   Can we remove this? (Backward compatibility)



##########
tests/models/test_trigger.py:
##########
@@ -144,6 +151,58 @@ def test_submit_failure(session, create_task_instance):
     assert updated_task_instance.next_method == "__fail__"
 
 
+@pytest.mark.parametrize(
+    "event_cls, expected",
+    [
+        (TaskSuccessEvent, "success"),
+        (TaskFailedEvent, "failed"),
+        (TaskSkippedEvent, "skipped"),
+    ],
+)
+def test_submit_event_task_end(session, create_task_instance, event_cls, expected):
+    """
+    Tests that events inheriting BaseTaskEndEvent *don't* re-wake their dependent
+    but mark them in the appropriate terminal state and send xcom
+    """
+    # Make a trigger
+    trigger = Trigger(classpath="does.not.matter", kwargs={})
+    trigger.id = 1
+    session.add(trigger)
+    session.commit()
+    # Make a TaskInstance that's deferred and waiting on it
+    task_instance = create_task_instance(
+        session=session, execution_date=timezone.utcnow(), state=State.DEFERRED
+    )
+    task_instance.trigger_id = trigger.id
+    session.commit()
+
+    def get_xcoms(ti):
+        return XCom.get_many(dag_ids=[ti.dag_id], task_ids=[ti.task_id], run_id=ti.run_id).all()
+
+    # now for the real test
+    # first check initial state
+    ti: TaskInstance = session.query(TaskInstance).one()
+    assert ti.state == "deferred"
+    assert get_xcoms(ti) == []
+
+    session.flush()
+    session.expunge_all()

Review Comment:
   Why do we need to expunge? (Same question for the call below)



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.

Review Comment:
   ```suggestion
   Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task.
   ```
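The following self-contained sketch shows the two paths described in the paragraph under review, meaning what happens to a deferred task instance once its trigger fires. `FakeTaskInstance`, `handle_submit` and the event classes are hypothetical stand-ins. They are loosely modeled on the names this PR uses (`BaseTaskEndEvent`, `TaskSuccessEvent`, `TaskFailedEvent`, `TaskSkippedEvent`). The real handling lives in Airflow's trigger and task-instance code and differs in detail, for example in sessions, state enums and XCom storage. The `ValueError` guard is an assumption added for illustration only.

```python
from dataclasses import dataclass, field
from typing import Any, Optional

# Hypothetical stand-ins: the real event classes and TaskInstance live in
# Airflow and carry more state (sessions, enums, XCom backends).


@dataclass
class TriggerEvent:
    """Ordinary event: hands execution back to a worker."""

    payload: Any = None


@dataclass
class BaseTaskEndEvent:
    """Terminal event: ends the task instance from the triggerer."""

    task_instance_state: str
    xcoms: Optional[dict] = None


class TaskSuccessEvent(BaseTaskEndEvent):
    def __init__(self, xcoms: Optional[dict] = None) -> None:
        super().__init__("success", xcoms)


class TaskFailedEvent(BaseTaskEndEvent):
    def __init__(self, xcoms: Optional[dict] = None) -> None:
        super().__init__("failed", xcoms)


class TaskSkippedEvent(BaseTaskEndEvent):
    def __init__(self, xcoms: Optional[dict] = None) -> None:
        super().__init__("skipped", xcoms)


@dataclass
class FakeTaskInstance:
    state: str = "deferred"
    next_method: Optional[str] = None  # the method_name passed to defer()
    next_kwargs: Optional[dict] = None
    xcoms: dict = field(default_factory=dict)


def handle_submit(ti: FakeTaskInstance, event: Any) -> None:
    """Apply a fired event to a deferred task instance."""
    if isinstance(event, BaseTaskEndEvent):
        # End directly: push any xcoms and set the terminal state.
        # next_method is never read, so method_name may be None here.
        ti.xcoms.update(event.xcoms or {})
        ti.state = event.task_instance_state
    else:
        # Resume on a worker, which will call ti.next_method with the event.
        if ti.next_method is None:  # hypothetical guard, for illustration
            raise ValueError("method_name is required to resume on a worker")
        ti.next_kwargs = {"event": event.payload}
        ti.state = "scheduled"


ti = FakeTaskInstance()  # deferred with method_name=None
handle_submit(ti, TaskSuccessEvent(xcoms={"return_value": 42}))
print(ti.state, ti.xcoms)  # success {'return_value': 42}
```

On the terminal path, `next_method` is never consulted. This is why the doc text can say `method_name` "does not matter" when the trigger ends the task itself.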



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and 
``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": 
timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
+
+.. code-block:: python
+
+    class DateTimeSensorAsync(DateTimeSensor):
+        def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.end_from_trigger = end_from_trigger
+
+        def execute(self, context: Context) -> NoReturn:
+            self.defer(
+                method_name="execute_complete",
+                trigger=DateTimeTrigger(
+                    moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
+                ),
+            )
+
+        def execute_complete(self, context, event=None) -> None:
+            return None
+
+``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:
+
+.. code-block:: python
+
+
+    class DateTimeTrigger(BaseTrigger):
+        def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False):

Review Comment:
   Same here, we can make the example simpler so it is easier to understand.



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
+
+.. code-block:: python
+
+    class DateTimeSensorAsync(DateTimeSensor):
+        def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.end_from_trigger = end_from_trigger
+
+        def execute(self, context: Context) -> NoReturn:
+            self.defer(
+                method_name="execute_complete",
+                trigger=DateTimeTrigger(
+                    moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
+                ),
+            )
+
+        def execute_complete(self, context, event=None) -> None:
+            return None
+
+``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:
+
+.. code-block:: python
+
+
+    class DateTimeTrigger(BaseTrigger):
+        def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False):

Review Comment:
   Maybe we should have a better example than datetime in the first place; people don’t ever need to implement a datetime sensor because Airflow already has one.



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.

Review Comment:
   ```suggestion
   If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above. This can save some resources needed to start a new worker.
   ```



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
+
+.. code-block:: python
+
+    class DateTimeSensorAsync(DateTimeSensor):
+        def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.end_from_trigger = end_from_trigger
+
+        def execute(self, context: Context) -> NoReturn:
+            self.defer(
+                method_name="execute_complete",
+                trigger=DateTimeTrigger(
+                    moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
+                ),
+            )
+
+        def execute_complete(self, context, event=None) -> None:
+            return None

Review Comment:
   ```suggestion
       class DateTimeSensorAsync(DateTimeSensor):
           def __init__(self, **kwargs) -> None:
               super().__init__(**kwargs)
               self.end_from_trigger = True
   
           def execute(self, context: Context) -> NoReturn:
               self.defer(
                   method_name=None,
                   trigger=DateTimeTrigger(
                       moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
                   ),
               )
   ```
   
   Let’s make this example simple for people. If users want their sensor to support both modes, I think they should be smart enough to add the argument.
   
   (We should also add text to tell users this sensor _always_ exits from the trigger.)
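For reference, here is a minimal sketch of the trigger side of a sensor that always exits from the trigger. Following the comment about not using datetime, it waits on a file existing instead. `FileReadyTrigger`, `first_event`, and both event classes are hypothetical local stand-ins, not Airflow's API. The real `TriggerEvent` and `TaskSuccessEvent` come from Airflow, and their constructors may differ in detail. The sketch only shows the branching on `end_from_trigger`: a task-end event finishes the task in the triggerer, while a plain event sends it back to a worker.

```python
import asyncio
import os
import tempfile
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


# Hypothetical stand-ins for Airflow's event classes.
@dataclass
class TriggerEvent:
    payload: Any = None


@dataclass
class TaskSuccessEvent:
    xcoms: Optional[dict] = None


class FileReadyTrigger:
    """Hypothetical trigger: fires once `path` exists."""

    def __init__(self, path: str, poll_interval: float = 1.0, *, end_from_trigger: bool = False):
        self.path = path
        self.poll_interval = poll_interval
        self.end_from_trigger = end_from_trigger

    async def run(self) -> AsyncIterator[Any]:
        # Poll without blocking the triggerer's event loop.
        while not os.path.exists(self.path):
            await asyncio.sleep(self.poll_interval)
        if self.end_from_trigger:
            # Terminal event: the task is marked successful (and the xcoms
            # pushed) without a worker ever being started.
            yield TaskSuccessEvent(xcoms={"path": self.path})
        else:
            # Ordinary event: execution resumes on a worker at method_name.
            yield TriggerEvent({"path": self.path})


async def first_event(trigger: FileReadyTrigger) -> Any:
    """Return the first event the trigger yields."""
    async for event in trigger.run():
        return event


with tempfile.NamedTemporaryFile() as f:
    event = asyncio.run(first_event(FileReadyTrigger(f.name, end_from_trigger=True)))
    print(type(event).__name__)  # TaskSuccessEvent
```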



-- 
This is an automated message from the Apache Git Service.