jedcunningham commented on code in PR #39585:
URL: https://github.com/apache/airflow/pull/39585#discussion_r1634005883
##########
airflow/models/taskinstance.py:
##########
@@ -3000,12 +3019,12 @@ def _execute_task(self, context: Context, task_orig: Operator):
         return _execute_task(self, context, task_orig)
     @provide_session
-    def defer_task(self, exception: TaskDeferred, session: Session) -> None:
-        """Mark the task as deferred and sets up the trigger that is needed to resume it.
+    def defer_task(self, exception: TaskDeferred | None = None, session: Session = NEW_SESSION) -> None:
+        """Mark the task as deferred and sets up the trigger that is needed to resume it when TaskDeferred is raised.
         :meta: private
         """
-        _defer_task(ti=self, exception=exception, session=session)
+        _defer_task(ti=self, session=session, exception=exception)
Review Comment:
```suggestion
        _defer_task(ti=self, exception=exception, session=session)
```
nit: keeping `exception` before `session` matches the order in the signature.
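To be clear, this is purely cosmetic: both arguments are passed by keyword, so either order behaves the same. A quick sketch with a hypothetical stand-in function (not the real `_defer_task`) shows it:

```python
# Hypothetical stand-in that mirrors only the parameter order of the
# new signature; it is not Airflow's _defer_task.
def _defer_task_stub(ti, exception=None, session=None):
    return (ti, exception, session)


# Order used in the PR versus order that matches the signature.
as_written = _defer_task_stub(ti="ti", session="sess", exception="exc")
as_suggested = _defer_task_stub(ti="ti", exception="exc", session="sess")

# Keyword arguments bind by name, so both calls are equivalent.
print(as_written == as_suggested)
```

So the only gain is readability when comparing the call against the signature.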
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -143,10 +143,14 @@ The ``self.defer`` call raises the ``TaskDeferred`` exception, so it can work an
Triggering Deferral from Start
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-If you want to defer your task directly to the triggerer without going into the worker, you can add the class level attributes ``start_trigger`` and ``next_method`` to your deferrable operator.
+ .. versionadded:: 2.10.0
-* ``start_trigger``: An instance of a trigger you want to defer to. It will be serialized into the database.
+If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute ``start_with_trigger`` to ``True`` add add class level attribute ``start_trigger_args`` with the following 4 attributes to your deferrable operator.
Review Comment:
```suggestion
If you want to defer your task directly to the triggerer without going into the worker, you can set the class-level attribute ``start_with_trigger`` to ``True`` and add a class-level attribute ``start_trigger_args`` holding a ``StartTriggerArgs`` object with the following 4 attributes to your deferrable operator:
```
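For readers of this thread, here is roughly the shape the paragraph describes. This is only a sketch under stated assumptions: `StartTriggerArgsSketch` is a stand-in dataclass with just the two fields visible in this PR's diff (`trigger_cls`, `trigger_kwargs`), and the operator class and trigger path are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class StartTriggerArgsSketch:
    """Stand-in for StartTriggerArgs; only the fields seen in the diff."""

    trigger_cls: str
    trigger_kwargs: dict[str, Any] | None = None


class WaitOperatorSketch:
    """Hypothetical operator; a real one would subclass BaseOperator."""

    # Class-level flag asking to defer straight to the triggerer.
    start_with_trigger = True
    # Class-level description of the trigger to start with.
    start_trigger_args = StartTriggerArgsSketch(
        trigger_cls="my_package.triggers.MyTrigger",  # hypothetical path
        trigger_kwargs={"delay": 60},
    )
```

The remaining attributes mentioned in the doc text are left out here because they are not shown in the quoted hunk.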
##########
airflow/models/taskinstance.py:
##########
@@ -1575,12 +1575,29 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses
 @internal_api_call
 @provide_session
 def _defer_task(
-    ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session: Session = NEW_SESSION
+    ti: TaskInstance | TaskInstancePydantic,
+    exception: TaskDeferred | None = None,
+    session: Session = NEW_SESSION,
 ) -> TaskInstancePydantic | TaskInstance:
     from airflow.models.trigger import Trigger
+    if exception is not None:
+        trigger_row = Trigger.from_object(exception.trigger)
+        trigger_kwargs = exception.kwargs
+        next_method = exception.method_name
+        timeout = exception.timeout
+    elif ti.task is not None and ti.task.start_trigger_args is not None:
+        trigger_row = Trigger(
+            classpath=ti.task.start_trigger_args.trigger_cls,
+            kwargs=ti.task.start_trigger_args.trigger_kwargs or {},
+        )
+        trigger_kwargs = ti.task.start_trigger_args.trigger_kwargs
Review Comment:
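Should this also be `or {}`, like the `kwargs=` argument passed to `Trigger(...)` just above? As written, `trigger_kwargs` can end up `None` while the trigger row gets `{}`.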
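A minimal sketch of the mismatch, using a stand-in dataclass rather than the real `StartTriggerArgs` (only the two fields from the hunk above are modelled, and `resolve_kwargs` is a hypothetical helper that copies the two assignments):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class FakeStartTriggerArgs:
    """Stand-in with only the fields visible in the diff."""

    trigger_cls: str
    trigger_kwargs: dict[str, Any] | None = None


def resolve_kwargs(args: FakeStartTriggerArgs):
    # Mirrors the two assignments in the hunk: the row falls back to {},
    # but the local variable keeps whatever was set, including None.
    row_kwargs = args.trigger_kwargs or {}
    trigger_kwargs = args.trigger_kwargs
    return row_kwargs, trigger_kwargs


row_kwargs, trigger_kwargs = resolve_kwargs(FakeStartTriggerArgs("pkg.MyTrigger"))
print(row_kwargs, trigger_kwargs)
```

If downstream code expects a dict here, adding the same `or {}` fallback would keep the two values consistent.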
##########
airflow/serialization/pydantic/taskinstance.py:
##########
@@ -499,7 +499,7 @@ def _register_dataset_changes(self, *, events, session: Session | None = None) -
     def defer_task(self, exception: TaskDeferred, session: Session | None = None):
         """Defer task."""
-        updated_ti = _defer_task(ti=self, exception=exception, session=session)
+        updated_ti = _defer_task(ti=self, session=session, exception=exception)
Review Comment:
```suggestion
        updated_ti = _defer_task(ti=self, exception=exception, session=session)
```
Same nit as above: keep `exception` before `session` to match the signature.