dstandish commented on code in PR #39585:
URL: https://github.com/apache/airflow/pull/39585#discussion_r1619935579
##########
airflow/models/taskinstance.py:
##########
@@ -1580,13 +1581,39 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic |
TaskInstance, session: Ses
@internal_api_call
@provide_session
-def _defer_task(
+def _defer_task_from_task_deferred(
ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session:
Session = NEW_SESSION
) -> TaskInstancePydantic | TaskInstance:
from airflow.models.trigger import Trigger
# First, make the trigger entry
trigger_row = Trigger.from_object(exception.trigger)
+ updated_ti = _defer_task(
+ ti=ti,
+ session=session,
+ trigger_row=trigger_row,
+ trigger_kwargs=exception.kwargs,
+ next_method=exception.method_name,
+ timeout=exception.timeout,
+ )
+
+ session.merge(updated_ti)
+ session.commit()
+ return updated_ti
+
+
+@internal_api_call
+@provide_session
+def _defer_task(
Review Comment:
i think it's unnecessary to change the signature of this function and add
the other two wrapper functions.
just build the exception object and pass it
```
e = TaskDeferred(trigger=BaseTrigger(...), method_name="...", ...)
```
unfortunately, we don't have integration tests set up for AIP-44 but if we
did then it would reveal that this will break that mode of execution because
for example there's no mechanism for serializing `Trigger`, possibly among
other things.
obviously it's not your fault and a shortcoming of the current state, but in
any case i think it will be simpler anyway to keep the existing signature and
just build and exc object that you don't raise.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]