Lee-W commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1684537451
##########
airflow/sensors/time_delta.py:
##########
@@ -59,28 +59,34 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
Will defers itself to avoid taking up a worker slot while it is waiting.
:param delta: time length to wait after the data interval before
succeeding.
+ :param end_from_trigger: End the task directly from the triggerer without
going into the worker.
.. seealso::
For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/operator:TimeDeltaSensorAsync`
"""
+ def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs):
Review Comment:
```suggestion
def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs)
-> None:
```
##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
if isinstance(other, TriggerEvent):
return other.payload == self.payload
return False
+
+ @provide_session
+ def handle_submit(self, *, task_instance: TaskInstance, session: Session =
NEW_SESSION) -> None:
+ """
+ Handle the submit event for a given task instance.
+
+ This function sets the next method and next kwargs of the task
instance,
+ as well as its state to scheduled. It also adds the event's payload
+ into the kwargs for the task.
+
+ :param task_instance: The task instance to handle the submit event for.
+ :param session: The session to be used for the database callback sink.
+ """
+ next_kwargs = task_instance.next_kwargs or {}
+ next_kwargs["event"] = self.payload
+ task_instance.next_kwargs = next_kwargs
+ task_instance.trigger_id = None
+ task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+ """Base event class to end the task without resuming on worker."""
+
+ task_instance_state: TaskInstanceState
+
+ def __init__(self, *, xcoms: dict[str, Any] | None = None, **kwargs) ->
None:
+ """
+ Initialize the class with the specified parameters.
+
+ :param xcoms: A dictionary of XComs or None.
+ :param kwargs: Additional keyword arguments.
+ """
+ if "payload" in kwargs:
+ raise ValueError("Param 'payload' not supported for this class.")
+ super().__init__(payload=self.task_instance_state)
+ self.xcoms = xcoms
+
+ @provide_session
+ def handle_submit(self, *, task_instance: TaskInstance, session: Session =
NEW_SESSION) -> None:
+ """
+ Submit event for the given task instance.
+
+ Marks the task with the state `task_instance_state` and optionally
pushes xcom if applicable.
+
+ :param task_instance: The task instance to be submitted.
+ :param session: The session to be used for the database callback sink.
+ """
+ # Mark the task with terminal state and prevent it from resuming on
worker
+ task_instance.trigger_id = None
+ task_instance.state = self.task_instance_state
+
+ self._submit_callback_if_necessary(task_instance=task_instance,
session=session)
+ self._push_xcoms_if_necessary(task_instance=task_instance)
+
+ def _submit_callback_if_necessary(self, *, task_instance: TaskInstance,
session) -> None:
+ """Submit a callback request if the task state is SUCCESS or FAILED."""
+ is_failure = self.task_instance_state == TaskInstanceState.FAILED
+ if self.task_instance_state in [TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED]:
Review Comment:
```suggestion
if self.task_instance_state in (TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED):
```
##########
airflow/triggers/temporal.py:
##########
@@ -84,7 +95,11 @@ class TimeDeltaTrigger(DateTimeTrigger):
While this is its own distinct class here, it will serialise to a
DateTimeTrigger class, since they're operationally the same.
+
+ :param delta: how long to wait
+ :param end_from_trigger: whether the trigger should mark the task
successful after time condition
+ reached or resume the task after time condition reached.
"""
- def __init__(self, delta: datetime.timedelta):
- super().__init__(moment=timezone.utcnow() + delta)
+ def __init__(self, delta: datetime.timedelta, *, end_from_trigger: bool =
False):
Review Comment:
```suggestion
def __init__(self, delta: datetime.timedelta, *, end_from_trigger: bool
= False) -> None:
```
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -213,6 +213,70 @@ This is particularly useful when deferring is the only
thing the ``execute`` met
# We have no more work to do here. Mark as complete.
return
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into
the worker, you can specific operator level attribute ``end_from_trigger`` with
the attributes to your deferrable operator other discussed above.
Review Comment:
Is the name `end_from_trigger` a convention or a requirement?
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -213,6 +213,70 @@ This is particularly useful when deferring is the only
thing the ``execute`` met
# We have no more work to do here. Mark as complete.
return
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into
the worker, you can specific operator level attribute ``end_from_trigger`` with
the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the
worker or end the task instance directly. If the trigger ends the task instance
itself, the ``method_name`` does not matter and can be ``None``. Otherwise,
provide the name of the ``method_name`` that should be used when resuming
execution in the task.
+
+.. code-block:: python
+
+ class DateTimeSensorAsync(DateTimeSensor):
+ def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.end_from_trigger = end_from_trigger
+
+ def execute(self, context: Context) -> NoReturn:
+ self.defer(
+ method_name="execute_complete",
Review Comment:
What if we only want to support the `end_from_trigger` mode? Should we set
`method_name` to `None` or `__trigger_exit__` in that case?
##########
airflow/sensors/time_delta.py:
##########
@@ -59,28 +59,34 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
Will defers itself to avoid taking up a worker slot while it is waiting.
:param delta: time length to wait after the data interval before
succeeding.
+ :param end_from_trigger: End the task directly from the triggerer without
going into the worker.
.. seealso::
For more information on how to use this sensor, take a look at the
guide:
:ref:`howto/operator:TimeDeltaSensorAsync`
"""
+ def __init__(self, *, end_from_trigger: bool = False, delta, **kwargs):
+ super().__init__(delta=delta, **kwargs)
+ self.end_from_trigger = end_from_trigger
+ self.delta = delta
Review Comment:
```suggestion
self.delta = delta
```
I think it's assigned in `TimeDeltaSensor`
##########
airflow/sensors/time_sensor.py:
##########
@@ -73,12 +75,11 @@ def __init__(self, *, target_time: datetime.time, **kwargs)
-> None:
self.target_datetime = timezone.convert_to_utc(aware_time)
def execute(self, context: Context) -> NoReturn:
- trigger = DateTimeTrigger(moment=self.target_datetime)
self.defer(
- trigger=trigger,
+ trigger=DateTimeTrigger(moment=self.target_datetime,
end_from_trigger=self.end_from_trigger),
method_name="execute_complete",
)
- def execute_complete(self, context, event=None) -> None:
- """Execute when the trigger fires - returns immediately."""
+ def execute_complete(self, context, event=None):
Review Comment:
```suggestion
def execute_complete(self, context: Context, event: Any = None) -> None:
```
##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
if isinstance(other, TriggerEvent):
return other.payload == self.payload
return False
+
+ @provide_session
+ def handle_submit(self, *, task_instance: TaskInstance, session: Session =
NEW_SESSION) -> None:
+ """
+ Handle the submit event for a given task instance.
+
+ This function sets the next method and next kwargs of the task
instance,
+ as well as its state to scheduled. It also adds the event's payload
+ into the kwargs for the task.
+
+ :param task_instance: The task instance to handle the submit event for.
+ :param session: The session to be used for the database callback sink.
+ """
+ next_kwargs = task_instance.next_kwargs or {}
+ next_kwargs["event"] = self.payload
+ task_instance.next_kwargs = next_kwargs
+ task_instance.trigger_id = None
+ task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+ """Base event class to end the task without resuming on worker."""
Review Comment:
Should we add `:meta private:` here? I feel this is probably something we
don't need to make public.
##########
airflow/triggers/temporal.py:
##########
@@ -34,9 +34,13 @@ class DateTimeTrigger(BaseTrigger):
a few seconds.
The provided datetime MUST be in UTC.
+
+ :param moment: when to yield event
+ :param end_from_trigger: whether the trigger should mark the task
successful after time condition
+ reached or resume the task after time condition reached.
"""
- def __init__(self, moment: datetime.datetime):
+ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool =
False):
Review Comment:
```suggestion
def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool
= False) -> None:
```
##########
airflow/triggers/base.py:
##########
@@ -137,3 +150,96 @@ def __eq__(self, other):
if isinstance(other, TriggerEvent):
return other.payload == self.payload
return False
+
+ @provide_session
+ def handle_submit(self, *, task_instance: TaskInstance, session: Session =
NEW_SESSION) -> None:
+ """
+ Handle the submit event for a given task instance.
+
+ This function sets the next method and next kwargs of the task
instance,
+ as well as its state to scheduled. It also adds the event's payload
+ into the kwargs for the task.
+
+ :param task_instance: The task instance to handle the submit event for.
+ :param session: The session to be used for the database callback sink.
+ """
+ next_kwargs = task_instance.next_kwargs or {}
+ next_kwargs["event"] = self.payload
+ task_instance.next_kwargs = next_kwargs
+ task_instance.trigger_id = None
+ task_instance.state = TaskInstanceState.SCHEDULED
+
+
+class BaseTaskEndEvent(TriggerEvent):
+ """Base event class to end the task without resuming on worker."""
+
+ task_instance_state: TaskInstanceState
+
+ def __init__(self, *, xcoms: dict[str, Any] | None = None, **kwargs) ->
None:
+ """
+ Initialize the class with the specified parameters.
+
+ :param xcoms: A dictionary of XComs or None.
+ :param kwargs: Additional keyword arguments.
+ """
+ if "payload" in kwargs:
+ raise ValueError("Param 'payload' not supported for this class.")
+ super().__init__(payload=self.task_instance_state)
+ self.xcoms = xcoms
+
+ @provide_session
+ def handle_submit(self, *, task_instance: TaskInstance, session: Session =
NEW_SESSION) -> None:
+ """
+ Submit event for the given task instance.
+
+ Marks the task with the state `task_instance_state` and optionally
pushes xcom if applicable.
+
+ :param task_instance: The task instance to be submitted.
+ :param session: The session to be used for the database callback sink.
+ """
+ # Mark the task with terminal state and prevent it from resuming on
worker
+ task_instance.trigger_id = None
+ task_instance.state = self.task_instance_state
+
+ self._submit_callback_if_necessary(task_instance=task_instance,
session=session)
+ self._push_xcoms_if_necessary(task_instance=task_instance)
+
+ def _submit_callback_if_necessary(self, *, task_instance: TaskInstance,
session) -> None:
+ """Submit a callback request if the task state is SUCCESS or FAILED."""
+ is_failure = self.task_instance_state == TaskInstanceState.FAILED
+ if self.task_instance_state in [TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED]:
+ request = TaskCallbackRequest(
+ full_filepath=task_instance.dag_model.fileloc,
+ simple_task_instance=SimpleTaskInstance.from_ti(task_instance),
+ is_failure_callback=is_failure,
+ task_callback_type=self.task_instance_state,
+ )
+ log.info("Sending callback: %s", request)
+ try:
+ DatabaseCallbackSink().send(callback=request, session=session)
+ except Exception as e:
+ log.error("Failed to send callback: %s", e)
Review Comment:
```suggestion
except Exception:
log.exception("Failed to send callback.")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]