amoghrajesh commented on code in PR #44241:
URL: https://github.com/apache/airflow/pull/44241#discussion_r1855966889


##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
         )
     elif isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+    elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+        trigger_row = Trigger(
+            classpath=ti_patch_payload.classpath,
+            kwargs=ti_patch_payload.kwargs,
+            created_date=ti_patch_payload.created_date,
+        )
+        session.add(trigger_row)
+        session.flush()
+
+        ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none()
+
+        if not ti:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "message": f"TaskInstance with id {ti_id_str} not found.",
+                },
+            )
+
+        ti.state = TaskInstanceState.DEFERRED
+        ti.trigger_id = trigger_row.id
+        ti.next_method = ti_patch_payload.next_method
+        ti.next_kwargs = ti_patch_payload.kwargs or {}
+        timeout = ti_patch_payload.timeout
+
+        # Calculate timeout too if it was passed
+        if timeout is not None:
+            ti.trigger_timeout = timezone.utcnow() + timeout
+        else:
+            ti.trigger_timeout = None
+
+        # If an execution_timeout is set, set the timeout to the minimum of
+        # it and the trigger timeout
+        if ti.task:
+            execution_timeout = ti.task.execution_timeout
+            if execution_timeout:
+                if TYPE_CHECKING:
+                    assert ti.start_date
+                if ti.trigger_timeout:
+                    ti.trigger_timeout = min(ti.start_date + execution_timeout, ti.trigger_timeout)
+                else:
+                    ti.trigger_timeout = ti.start_date + execution_timeout
+
+        session.commit()
+
+        log.info("TI %s state updated to: deferred", ti_id_str)
+        return

Review Comment:
   Made the changes in the latest commit.
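
For reference, the timeout rule in the hunk above can be read as a small pure function. This is only a sketch of that rule, not code from the PR: the function name `effective_trigger_timeout` and its parameters are hypothetical, and `now` stands in for `timezone.utcnow()`.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Optional


def effective_trigger_timeout(
    now: datetime,
    start_date: datetime,
    timeout: Optional[timedelta],
    execution_timeout: Optional[timedelta],
) -> Optional[datetime]:
    """Return the deadline for the trigger, or None if no timeout applies.

    Hypothetical helper mirroring the branch in the diff: the trigger timeout
    is relative to "now", the execution timeout is relative to the task start,
    and the earlier of the two deadlines wins.
    """
    # Deadline derived from the trigger's own timeout, if one was passed.
    trigger_timeout = now + timeout if timeout is not None else None

    # If an execution timeout is set, cap the deadline at start + execution_timeout.
    if execution_timeout:
        execution_deadline = start_date + execution_timeout
        if trigger_timeout:
            trigger_timeout = min(execution_deadline, trigger_timeout)
        else:
            trigger_timeout = execution_deadline

    return trigger_timeout
```

Keeping the rule in one function like this would make the "minimum of the two" behaviour easy to unit test without a database session.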



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
         )
     elif isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+    elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+        trigger_row = Trigger(
+            classpath=ti_patch_payload.classpath,
+            kwargs=ti_patch_payload.kwargs,
+            created_date=ti_patch_payload.created_date,
+        )
+        session.add(trigger_row)
+        session.flush()
+
+        ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none()
+
+        if not ti:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "message": f"TaskInstance with id {ti_id_str} not found.",
+                },
+            )
+
+        ti.state = TaskInstanceState.DEFERRED
+        ti.trigger_id = trigger_row.id
+        ti.next_method = ti_patch_payload.next_method
+        ti.next_kwargs = ti_patch_payload.kwargs or {}
+        timeout = ti_patch_payload.timeout

Review Comment:
   Made the changes for this



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
         )
     elif isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+    elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+        trigger_row = Trigger(
+            classpath=ti_patch_payload.classpath,
+            kwargs=ti_patch_payload.kwargs,
+            created_date=ti_patch_payload.created_date,
+        )
+        session.add(trigger_row)
+        session.flush()

Review Comment:
   Actually, we need this part. For the deferred case we execute two queries:
   one writes the new row to the `Trigger` table, which this `session.flush()`
   handles, and the other updates the TI table, which the `session.execute()`
   below handles. We should keep the flush to take care of the `Trigger` table
   write. I don't want to extend the
   ```
       try:
           result = session.execute(query)
           log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount)
   ```
   
   part to handle more than one query for a one-off case. WDYT?
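
To make the two-step pattern concrete, here is a minimal sketch using the standard-library `sqlite3` module rather than SQLAlchemy, so it is an analogue of the code in the PR and not the PR's implementation. The table names (`triggers`, `task_instances`), their columns, and the function `defer_task_instance` are all hypothetical. Reading `cursor.lastrowid` after the insert plays the role that `session.flush()` plays in the hunk: it makes the generated trigger id available before the second statement uses it.

```python
import sqlite3


def defer_task_instance(conn: sqlite3.Connection, ti_id: str, classpath: str, next_method: str) -> int:
    """Insert a trigger row, then point the task instance at it, in one transaction."""
    cur = conn.cursor()

    # Query 1: write the trigger row so the database assigns its primary key.
    cur.execute("INSERT INTO triggers (classpath) VALUES (?)", (classpath,))
    trigger_id = cur.lastrowid

    # Query 2: update the task instance row, referencing the freshly generated key.
    cur.execute(
        "UPDATE task_instances SET state = 'deferred', trigger_id = ?, next_method = ? WHERE id = ?",
        (trigger_id, next_method, ti_id),
    )
    if cur.rowcount == 0:
        # No matching task instance: undo the trigger insert as well.
        conn.rollback()
        raise LookupError(f"TaskInstance with id {ti_id} not found.")

    conn.commit()
    return trigger_id


# Usage against an in-memory database with the hypothetical schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE triggers (id INTEGER PRIMARY KEY, classpath TEXT)")
conn.execute(
    "CREATE TABLE task_instances (id TEXT PRIMARY KEY, state TEXT, trigger_id INTEGER, next_method TEXT)"
)
conn.execute("INSERT INTO task_instances (id, state) VALUES ('ti-1', 'running')")
conn.commit()

new_trigger_id = defer_task_instance(conn, "ti-1", "my.module.MyTrigger", "execute_complete")
```

The second statement cannot be built until the first has run, which is why the deferred branch needs its own write step ahead of the shared update path.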


