jedcunningham commented on code in PR #39585:
URL: https://github.com/apache/airflow/pull/39585#discussion_r1634005883


##########
airflow/models/taskinstance.py:
##########
@@ -3000,12 +3019,12 @@ def _execute_task(self, context: Context, task_orig: Operator):
         return _execute_task(self, context, task_orig)
 
     @provide_session
-    def defer_task(self, exception: TaskDeferred, session: Session) -> None:
-        """Mark the task as deferred and sets up the trigger that is needed to resume it.
+    def defer_task(self, exception: TaskDeferred | None = None, session: Session = NEW_SESSION) -> None:
+        """Mark the task as deferred and sets up the trigger that is needed to resume it when TaskDeferred is raised.
 
         :meta: private
         """
-        _defer_task(ti=self, exception=exception, session=session)
+        _defer_task(ti=self, session=session, exception=exception)

Review Comment:
   ```suggestion
           _defer_task(ti=self, exception=exception, session=session)
   ```
   
   nit: this order matches the signature.
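
A minimal sketch of why this is only a nit: keyword arguments bind by name, so both orderings call the function identically and the suggestion is purely about readability. `defer_stub` is a hypothetical stand-in, not the real `_defer_task`.

```python
from __future__ import annotations


def defer_stub(ti: str, exception: str | None = None, session: str = "NEW_SESSION") -> tuple:
    # Hypothetical stub with the same parameter order as the signature in the diff.
    return (ti, exception, session)


# Keyword arguments are matched by name, not by position.
in_signature_order = defer_stub(ti="ti", exception="exc", session="s")
swapped_order = defer_stub(ti="ti", session="s", exception="exc")
```

Both calls return the same tuple, so keeping the signature order only helps the reader scan the call site.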



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -143,10 +143,14 @@ The ``self.defer`` call raises the ``TaskDeferred`` exception, so it can work an
 Triggering Deferral from Start
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-If you want to defer your task directly to the triggerer without going into the worker, you can add the class level attributes ``start_trigger`` and ``next_method`` to your deferrable operator.
+ .. versionadded:: 2.10.0
 
-* ``start_trigger``: An instance of a trigger you want to defer to. It will be serialized into the database.
+If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute ``start_with_trigger`` to ``True`` add add class level attribute ``start_trigger_args`` with the following 4 attributes to your deferrable operator.

Review Comment:
   ```suggestion
   If you want to defer your task directly to the triggerer without going into the worker, you can set class level attribute ``start_with_trigger`` to ``True`` and add a class level attribute ``start_trigger_args`` with a ``StartTriggerArgs`` object with the following 4 attributes to your deferrable operator:
   ```
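
For readers of this thread, a rough sketch of what the documented pattern might look like on an operator. Everything here is a stand-in so the snippet runs without Airflow: the `StartTriggerArgs` dataclass is hypothetical, only `trigger_cls` and `trigger_kwargs` appear in this diff, and `next_method` and `timeout` are assumed to be the other two attributes. The trigger classpath and the `delta` keyword are illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from typing import Any


@dataclass
class StartTriggerArgs:
    """Hypothetical stand-in; next_method and timeout are assumed field names."""

    trigger_cls: str
    trigger_kwargs: dict[str, Any] | None = None
    next_method: str = "execute_complete"
    timeout: timedelta | None = None


class WaitOneHourSensor:
    """Stand-in for a deferrable operator; a real one would subclass an Airflow operator."""

    # Class level attributes, as described in the docs change under review.
    start_with_trigger = True
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",  # illustrative classpath
        trigger_kwargs={"delta": timedelta(hours=1)},
        next_method="execute_complete",
        timeout=None,
    )

    def execute_complete(self, context: Any = None, event: Any = None) -> Any:
        # Method named by next_method; here it just returns the trigger event.
        return event
```

Because both attributes live on the class, they can be read without instantiating the operator or running it on a worker, which is presumably what lets the task start in the triggerer.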



##########
airflow/models/taskinstance.py:
##########
@@ -1575,12 +1575,29 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses
 @internal_api_call
 @provide_session
 def _defer_task(
-    ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session: Session = NEW_SESSION
+    ti: TaskInstance | TaskInstancePydantic,
+    exception: TaskDeferred | None = None,
+    session: Session = NEW_SESSION,
 ) -> TaskInstancePydantic | TaskInstance:
     from airflow.models.trigger import Trigger
 
+    if exception is not None:
+        trigger_row = Trigger.from_object(exception.trigger)
+        trigger_kwargs = exception.kwargs
+        next_method = exception.method_name
+        timeout = exception.timeout
+    elif ti.task is not None and ti.task.start_trigger_args is not None:
+        trigger_row = Trigger(
+            classpath=ti.task.start_trigger_args.trigger_cls,
+            kwargs=ti.task.start_trigger_args.trigger_kwargs or {},
+        )
+        trigger_kwargs = ti.task.start_trigger_args.trigger_kwargs

Review Comment:
   Should this also be `or {}`? As written, the `Trigger` row gets a dict while `trigger_kwargs` can stay `None`.
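
To make the question concrete, a minimal sketch of the two assignments in the hunk, assuming `trigger_kwargs` may be `None` on the args object. `StartArgs` and `resolve_kwargs` are hypothetical stand-ins, not Airflow code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StartArgs:
    """Hypothetical stand-in for the object behind ``ti.task.start_trigger_args``."""

    trigger_cls: str
    trigger_kwargs: dict | None = None


def resolve_kwargs(args: StartArgs) -> tuple[dict, dict | None]:
    # Mirrors the hunk: the Trigger row receives a normalized dict,
    # while the second variable receives the raw attribute.
    row_kwargs = args.trigger_kwargs or {}
    trigger_kwargs = args.trigger_kwargs
    return row_kwargs, trigger_kwargs


row_kwargs, trigger_kwargs = resolve_kwargs(StartArgs("my.module.MyTrigger"))
```

With no kwargs supplied, `row_kwargs` is an empty dict but `trigger_kwargs` is `None`, so any later code that treats it as a mapping would need its own guard unless the same `or {}` fallback is applied.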



##########
airflow/serialization/pydantic/taskinstance.py:
##########
@@ -499,7 +499,7 @@ def _register_dataset_changes(self, *, events, session: Session | None = None) -
 
     def defer_task(self, exception: TaskDeferred, session: Session | None = None):
         """Defer task."""
-        updated_ti = _defer_task(ti=self, exception=exception, session=session)
+        updated_ti = _defer_task(ti=self, session=session, exception=exception)

Review Comment:
   ```suggestion
           updated_ti = _defer_task(ti=self, exception=exception, session=session)
   ```
   
   Same nit: this order matches the signature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
