potiuk commented on code in PR #38992:
URL: https://github.com/apache/airflow/pull/38992#discussion_r1571143500
##########
airflow/models/taskinstance.py:
##########
@@ -1309,6 +1500,130 @@ def _update_rtif(ti, rendered_fields, session: Session | None = None):
RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)
+def _coalesce_to_orm_ti(*, ti, session: Session):
+ from airflow.models.dagrun import DagRun
+ from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
+
+ if isinstance(ti, TaskInstancePydantic):
+ orm_ti = DagRun.fetch_task_instance(
+ dag_id=ti.dag_id,
+ dag_run_id=ti.run_id,
+ task_id=ti.task_id,
+ map_index=ti.map_index,
+ session=session,
+ )
+ ti, pydantic_ti = orm_ti, ti
+ _set_ti_attrs(ti, pydantic_ti)
+ return ti
+
+
+@internal_api_call
+@provide_session
+def _defer_task(
+ ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session: Session = NEW_SESSION
+):
+ from airflow.models.trigger import Trigger
+
+ if TYPE_CHECKING:
+ assert ti.task
+
Review Comment:
What we are missing here is a call to `_coalesce_to_orm_ti`. We are really
looking at and changing the state of the task instance based on what is in the
DB. This is what `merge/commit` was supposed to be doing originally. Now we
will have a pydantic task instance from the remote session, which (potentially)
has some fields modified, and we want to merge those changes and commit. Also,
below we are modifying the ti's `try_number` and `state`. If we don't have the
ORM `ti` instance here, those changes will never make it to the DB, because we
modify the pydantic instance below, not the ORM `TaskInstance`. Originally,
after `defer_task` the ti was merged and then committed, so we need to do the
same here.
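
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Airflow or SQLAlchemy: `OrmTI`, `PydanticTI`, `FakeSession`, `defer_without_coalesce`, and `defer_with_coalesce` are all hypothetical stand-ins, and the "DB" is just a dict. It only shows why mutating the pydantic copy is lost on commit, while fetching the ORM object first (roughly what `_coalesce_to_orm_ti` does in the diff above) lets the changes persist.

```python
from dataclasses import asdict, dataclass


@dataclass
class OrmTI:
    """Hypothetical stand-in for the ORM task instance (tracked by a session)."""
    task_id: str
    state: str
    try_number: int


@dataclass
class PydanticTI:
    """Hypothetical stand-in for the serialized copy that arrives over the internal API."""
    task_id: str
    state: str
    try_number: int


class FakeSession:
    """Toy session: only objects obtained via fetch() are written back on commit()."""

    def __init__(self, rows):
        self.rows = rows
        self._tracked = []

    def fetch(self, task_id):
        obj = OrmTI(**self.rows[task_id])
        self._tracked.append(obj)
        return obj

    def commit(self):
        for obj in self._tracked:
            self.rows[obj.task_id] = asdict(obj)


def defer_without_coalesce(ti, session):
    # Mutates whatever it was handed. If that is a PydanticTI, the session
    # never tracked it, so commit() writes nothing for it.
    ti.state = "deferred"
    ti.try_number -= 1  # illustrative change only
    session.commit()


def defer_with_coalesce(ti, session):
    if isinstance(ti, PydanticTI):
        orm_ti = session.fetch(ti.task_id)
        # Carry over fields the remote side may have modified.
        orm_ti.state = ti.state
        orm_ti.try_number = ti.try_number
        ti = orm_ti
    ti.state = "deferred"
    ti.try_number -= 1  # illustrative change only
    session.commit()
    return ti


def make_session():
    return FakeSession({"t1": {"task_id": "t1", "state": "running", "try_number": 2}})


lost = make_session()
defer_without_coalesce(PydanticTI("t1", "running", 2), lost)

kept = make_session()
defer_with_coalesce(PydanticTI("t1", "running", 2), kept)
```

In this sketch `lost.rows["t1"]` still holds the old state after the call, while `kept.rows["t1"]` reflects the deferral. The real fix would go through `_coalesce_to_orm_ti` and the actual session rather than anything shown here.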