Lee-W commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1684537451


##########
airflow/sensors/time_delta.py:
##########
@@ -59,28 +59,34 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     Will defers itself to avoid taking up a worker slot while it is waiting.
 
     :param delta: time length to wait after the data interval before 
succeeding.
+    :param end_from_trigger: End the task directly from the triggerer without 
going into the worker.
 
     .. seealso::
         For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/operator:TimeDeltaSensorAsync`
 
     """
 
+    def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs):

Review Comment:
   ```suggestion
       def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs) 
-> None:
   ```



##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
         if isinstance(other, TriggerEvent):
             return other.payload == self.payload
         return False
+
+    @provide_session
+    def handle_submit(self, *, task_instance: TaskInstance, session: Session = 
NEW_SESSION) -> None:
+        """
+        Handle the submit event for a given task instance.
+
+        This function sets the next method and next kwargs of the task 
instance,
+        as well as its state to scheduled. It also adds the event's payload
+        into the kwargs for the task.
+
+        :param task_instance: The task instance to handle the submit event for.
+        :param session: The session to be used for the database callback sink.
+        """
+        next_kwargs = task_instance.next_kwargs or {}
+        next_kwargs["event"] = self.payload
+        task_instance.next_kwargs = next_kwargs
+        task_instance.trigger_id = None
+        task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+    """Base event class to end the task without resuming on worker."""
+
+    task_instance_state: TaskInstanceState
+
+    def __init__(self, *, xcoms: dict[str, Any] | None = None, **kwargs) -> 
None:
+        """
+        Initialize the class with the specified parameters.
+
+        :param xcoms: A dictionary of XComs or None.
+        :param kwargs: Additional keyword arguments.
+        """
+        if "payload" in kwargs:
+            raise ValueError("Param 'payload' not supported for this class.")
+        super().__init__(payload=self.task_instance_state)
+        self.xcoms = xcoms
+
+    @provide_session
+    def handle_submit(self, *, task_instance: TaskInstance, session: Session = 
NEW_SESSION) -> None:
+        """
+        Submit event for the given task instance.
+
+        Marks the task with the state `task_instance_state` and optionally 
pushes xcom if applicable.
+
+        :param task_instance: The task instance to be submitted.
+        :param session: The session to be used for the database callback sink.
+        """
+        # Mark the task with terminal state and prevent it from resuming on 
worker
+        task_instance.trigger_id = None
+        task_instance.state = self.task_instance_state
+
+        self._submit_callback_if_necessary(task_instance=task_instance, 
session=session)
+        self._push_xcoms_if_necessary(task_instance=task_instance)
+
+    def _submit_callback_if_necessary(self, *, task_instance: TaskInstance, 
session) -> None:
+        """Submit a callback request if the task state is SUCCESS or FAILED."""
+        is_failure = self.task_instance_state == TaskInstanceState.FAILED
+        if self.task_instance_state in [TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED]:

Review Comment:
   ```suggestion
           if self.task_instance_state in (TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED]):
   ```



##########
airflow/triggers/temporal.py:
##########
@@ -84,7 +95,11 @@ class TimeDeltaTrigger(DateTimeTrigger):
 
     While this is its own distinct class here, it will serialise to a
     DateTimeTrigger class, since they're operationally the same.
+
+    :param delta: how long to wait
+    :param end_from_trigger: whether the trigger should mark the task 
successful after time condition
+        reached or resume the task after time condition reached.
     """
 
-    def __init__(self, delta: datetime.timedelta):
-        super().__init__(moment=timezone.utcnow() + delta)
+    def __init__(self, delta: datetime.timedelta, *, end_from_trigger: bool = 
False):

Review Comment:
   ```suggestion
       def __init__(self, delta: datetime.timedelta, *, end_from_trigger: bool 
= False) -> None:
   ```



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -213,6 +213,70 @@ This is particularly useful when deferring is the only 
thing the ``execute`` met
             # We have no more work to do here. Mark as complete.
             return
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into 
the worker, you can specific operator level attribute ``end_from_trigger`` with 
the attributes to your deferrable operator other discussed above.

Review Comment:
   Is the name `end_from_trigger` a convention or a requirement?



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -213,6 +213,70 @@ This is particularly useful when deferring is the only 
thing the ``execute`` met
             # We have no more work to do here. Mark as complete.
             return
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into 
the worker, you can specific operator level attribute ``end_from_trigger`` with 
the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the 
worker or end the task instance directly. If the trigger ends the task instance 
itself, the ``method_name`` does not matter and can be ``None``. Otherwise, 
provide the name of the ``method_name`` that should be used when resuming 
execution in the task.
+
+.. code-block:: python
+
+    class DateTimeSensorAsync(DateTimeSensor):
+        def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.end_from_trigger = end_from_trigger
+
+        def execute(self, context: Context) -> NoReturn:
+            self.defer(
+                method_name="execute_complete",

Review Comment:
   What if we only want to support the `end_from_tirgger` mode? Should we make 
it `None` or `__trigger_exit__`?



##########
airflow/sensors/time_delta.py:
##########
@@ -59,28 +59,34 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     Will defers itself to avoid taking up a worker slot while it is waiting.
 
     :param delta: time length to wait after the data interval before 
succeeding.
+    :param end_from_trigger: End the task directly from the triggerer without 
going into the worker.
 
     .. seealso::
         For more information on how to use this sensor, take a look at the 
guide:
         :ref:`howto/operator:TimeDeltaSensorAsync`
 
     """
 
+    def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs):
+        super().__init__(delta=delta, **kwargs)
+        self.end_from_trigger = end_from_trigger
+        self.delta = delta

Review Comment:
   ```suggestion
           self.delta = delta
   ```
   
   I think it's assigned in `TimeDeltaSensor`



##########
airflow/sensors/time_sensor.py:
##########
@@ -73,12 +75,11 @@ def __init__(self, *, target_time: datetime.time, **kwargs) 
-> None:
         self.target_datetime = timezone.convert_to_utc(aware_time)
 
     def execute(self, context: Context) -> NoReturn:
-        trigger = DateTimeTrigger(moment=self.target_datetime)
         self.defer(
-            trigger=trigger,
+            trigger=DateTimeTrigger(moment=self.target_datetime, 
end_from_trigger=self.end_from_trigger),
             method_name="execute_complete",
         )
 
-    def execute_complete(self, context, event=None) -> None:
-        """Execute when the trigger fires - returns immediately."""
+    def execute_complete(self, context, event=None):

Review Comment:
   ```suggestion
       def execute_complete(self, context: Context, event: Any = None) -> None:
   ```



##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
         if isinstance(other, TriggerEvent):
             return other.payload == self.payload
         return False
+
+    @provide_session
+    def handle_submit(self, *, task_instance: TaskInstance, session: Session = 
NEW_SESSION) -> None:
+        """
+        Handle the submit event for a given task instance.
+
+        This function sets the next method and next kwargs of the task 
instance,
+        as well as its state to scheduled. It also adds the event's payload
+        into the kwargs for the task.
+
+        :param task_instance: The task instance to handle the submit event for.
+        :param session: The session to be used for the database callback sink.
+        """
+        next_kwargs = task_instance.next_kwargs or {}
+        next_kwargs["event"] = self.payload
+        task_instance.next_kwargs = next_kwargs
+        task_instance.trigger_id = None
+        task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+    """Base event class to end the task without resuming on worker."""

Review Comment:
   Should we add `:meta private:` here? I feel this is probably something we 
don't need to make it public



##########
airflow/triggers/temporal.py:
##########
@@ -34,9 +34,13 @@ class DateTimeTrigger(BaseTrigger):
     a few seconds.
 
     The provided datetime MUST be in UTC.
+
+    :param moment: when to yield event
+    :param end_from_trigger: whether the trigger should mark the task 
successful after time condition
+        reached or resume the task after time condition reached.
     """
 
-    def __init__(self, moment: datetime.datetime):
+    def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = 
False):

Review Comment:
   ```suggestion
       def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool 
= False) -> None:
   ```



##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
         if isinstance(other, TriggerEvent):
             return other.payload == self.payload
         return False
+
+    @provide_session
+    def handle_submit(self, *, task_instance: TaskInstance, session: Session = 
NEW_SESSION) -> None:
+        """
+        Handle the submit event for a given task instance.
+
+        This function sets the next method and next kwargs of the task 
instance,
+        as well as its state to scheduled. It also adds the event's payload
+        into the kwargs for the task.
+
+        :param task_instance: The task instance to handle the submit event for.
+        :param session: The session to be used for the database callback sink.
+        """
+        next_kwargs = task_instance.next_kwargs or {}
+        next_kwargs["event"] = self.payload
+        task_instance.next_kwargs = next_kwargs
+        task_instance.trigger_id = None
+        task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+    """Base event class to end the task without resuming on worker."""
+
+    task_instance_state: TaskInstanceState
+
+    def __init__(self, *, xcoms: dict[str, Any] | None = None, **kwargs) -> 
None:
+        """
+        Initialize the class with the specified parameters.
+
+        :param xcoms: A dictionary of XComs or None.
+        :param kwargs: Additional keyword arguments.
+        """
+        if "payload" in kwargs:
+            raise ValueError("Param 'payload' not supported for this class.")
+        super().__init__(payload=self.task_instance_state)
+        self.xcoms = xcoms
+
+    @provide_session
+    def handle_submit(self, *, task_instance: TaskInstance, session: Session = 
NEW_SESSION) -> None:
+        """
+        Submit event for the given task instance.
+
+        Marks the task with the state `task_instance_state` and optionally 
pushes xcom if applicable.
+
+        :param task_instance: The task instance to be submitted.
+        :param session: The session to be used for the database callback sink.
+        """
+        # Mark the task with terminal state and prevent it from resuming on 
worker
+        task_instance.trigger_id = None
+        task_instance.state = self.task_instance_state
+
+        self._submit_callback_if_necessary(task_instance=task_instance, 
session=session)
+        self._push_xcoms_if_necessary(task_instance=task_instance)
+
+    def _submit_callback_if_necessary(self, *, task_instance: TaskInstance, 
session) -> None:
+        """Submit a callback request if the task state is SUCCESS or FAILED."""
+        is_failure = self.task_instance_state == TaskInstanceState.FAILED
+        if self.task_instance_state in [TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED]:
+            request = TaskCallbackRequest(
+                full_filepath=task_instance.dag_model.fileloc,
+                simple_task_instance=SimpleTaskInstance.from_ti(task_instance),
+                is_failure_callback=is_failure,
+                task_callback_type=self.task_instance_state,
+            )
+            log.info("Sending callback: %s", request)
+            try:
+                DatabaseCallbackSink().send(callback=request, session=session)
+            except Exception as e:
+                log.error("Failed to send callback: %s", e)

Review Comment:
   ```suggestion
               except Exception:
                   log.exception("Failed to send callback.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to