potiuk commented on code in PR #38992:
URL: https://github.com/apache/airflow/pull/38992#discussion_r1571154157


##########
airflow/models/taskinstance.py:
##########
@@ -1309,6 +1500,130 @@ def _update_rtif(ti, rendered_fields, session: Session 
| None = None):
     RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, 
session=session)
 
 
+def _coalesce_to_orm_ti(*, ti, session: Session):
+    from airflow.models.dagrun import DagRun
+    from airflow.serialization.pydantic.taskinstance import 
TaskInstancePydantic
+
+    if isinstance(ti, TaskInstancePydantic):
+        orm_ti = DagRun.fetch_task_instance(
+            dag_id=ti.dag_id,
+            dag_run_id=ti.run_id,
+            task_id=ti.task_id,
+            map_index=ti.map_index,
+            session=session,
+        )
+        ti, pydantic_ti = orm_ti, ti
+        _set_ti_attrs(ti, pydantic_ti)
+    return ti
+
+
+@internal_api_call
+@provide_session
+def _defer_task(
+    ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session: 
Session = NEW_SESSION
+):
+    from airflow.models.trigger import Trigger
+
+    if TYPE_CHECKING:
+        assert ti.task
+
+    # First, make the trigger entry
+    trigger_row = Trigger.from_object(exception.trigger)
+    session.add(trigger_row)
+    session.flush()
+
+    # Decrement try number so the next one is the same try
+    # note: we must decrement try_number before changing state
+    # because try_number shenanigans
+    ti.try_number = ti.try_number - 1
+
+    # Then, update ourselves so it matches the deferral request
+    # Keep an eye on the logic in `check_and_change_state_before_execution()`
+    # depending on self.next_method semantics
+    ti.state = TaskInstanceState.DEFERRED
+    ti.trigger_id = trigger_row.id
+    ti.next_method = exception.method_name
+    ti.next_kwargs = exception.kwargs or {}
+
+    # Calculate timeout too if it was passed
+    if exception.timeout is not None:
+        ti.trigger_timeout = timezone.utcnow() + exception.timeout
+    else:
+        ti.trigger_timeout = None
+
+    # If an execution_timeout is set, set the timeout to the minimum of
+    # it and the trigger timeout
+    execution_timeout = ti.task.execution_timeout
+    if execution_timeout:
+        if TYPE_CHECKING:
+            assert ti.start_date
+        if ti.trigger_timeout:
+            ti.trigger_timeout = min(ti.start_date + execution_timeout, 
ti.trigger_timeout)
+        else:
+            ti.trigger_timeout = ti.start_date + execution_timeout
+    if ti.test_mode:
+        _add_log(event=ti.state, task_instance=ti, session=session)
+    session.commit()

Review Comment:
   Hmm. I think we should not `commit` in test_mode - or at least that's not 
what happened: 
   
   ```
                   self.defer_task(defer=defer, session=session)
                   self.log.info(
                       "Pausing task as DEFERRED. dag_id=%s, task_id=%s, 
execution_date=%s, start_date=%s",
                       self.dag_id,
                       self.task_id,
                       _date_or_empty(task_instance=self, 
attr="execution_date"),
                       _date_or_empty(task_instance=self, attr="start_date"),
                   )
                   if not test_mode:
                       session.add(Log(self.state, self))
                       session.merge(self)
                       session.commit()
                   return TaskReturnCode.DEFERRED
   ```
   
   I guess we should leave the default behaviour - which is not committed 
change - that might be used by some testing code  in the "normal" mode.
   
   We should also merge the ti changes here (see above comment).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to