amoghrajesh commented on code in PR #44241:
URL: https://github.com/apache/airflow/pull/44241#discussion_r1855966889
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
)
elif isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
+ elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+ trigger_row = Trigger(
+ classpath=ti_patch_payload.classpath,
+ kwargs=ti_patch_payload.kwargs,
+ created_date=ti_patch_payload.created_date,
+ )
+ session.add(trigger_row)
+ session.flush()
+
+ ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none()
+
+ if not ti:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail={
+ "message": f"TaskInstance with id {ti_id_str} not found.",
+ },
+ )
+
+ ti.state = TaskInstanceState.DEFERRED
+ ti.trigger_id = trigger_row.id
+ ti.next_method = ti_patch_payload.next_method
+ ti.next_kwargs = ti_patch_payload.kwargs or {}
+ timeout = ti_patch_payload.timeout
+
+ # Calculate timeout too if it was passed
+ if timeout is not None:
+ ti.trigger_timeout = timezone.utcnow() + timeout
+ else:
+ ti.trigger_timeout = None
+
+ # If an execution_timeout is set, set the timeout to the minimum of
+ # it and the trigger timeout
+ if ti.task:
+ execution_timeout = ti.task.execution_timeout
+ if execution_timeout:
+ if TYPE_CHECKING:
+ assert ti.start_date
+ if ti.trigger_timeout:
+ ti.trigger_timeout = min(ti.start_date +
execution_timeout, ti.trigger_timeout)
+ else:
+ ti.trigger_timeout = ti.start_date + execution_timeout
+
+ session.commit()
+
+ log.info("TI %s state updated to: deferred", ti_id_str)
+ return
Review Comment:
Made the changes in latest commit
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
)
elif isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
+ elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+ trigger_row = Trigger(
+ classpath=ti_patch_payload.classpath,
+ kwargs=ti_patch_payload.kwargs,
+ created_date=ti_patch_payload.created_date,
+ )
+ session.add(trigger_row)
+ session.flush()
+
+ ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none()
+
+ if not ti:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail={
+ "message": f"TaskInstance with id {ti_id_str} not found.",
+ },
+ )
+
+ ti.state = TaskInstanceState.DEFERRED
+ ti.trigger_id = trigger_row.id
+ ti.next_method = ti_patch_payload.next_method
+ ti.next_kwargs = ti_patch_payload.kwargs or {}
+ timeout = ti_patch_payload.timeout
Review Comment:
Made the changes for this
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -122,6 +124,53 @@ def ti_update_state(
)
elif isinstance(ti_patch_payload, TITerminalStatePayload):
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
+ elif isinstance(ti_patch_payload, TIDeferredStatePayload):
+ trigger_row = Trigger(
+ classpath=ti_patch_payload.classpath,
+ kwargs=ti_patch_payload.kwargs,
+ created_date=ti_patch_payload.created_date,
+ )
+ session.add(trigger_row)
+ session.flush()
Review Comment:
Actually we need this part. For deferred, we execute 2 queries, one is to
update the trigger table, handled by this `session.flush()` and other to update
the TI table, handled by the session.execute() below. We should keep this to
take care of the `Trigger` table update. I dont want to extend the
```
try:
result = session.execute(query)
log.info("TI %s state updated: %s row(s) affected", ti_id_str,
result.rowcount)
```
Part to more than one query for a one-off case. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]