kaxil commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1898412149


##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
         )
 
     # We exclude_unset to avoid updating fields that are not set in the payload
-    data = ti_patch_payload.model_dump(exclude_unset=True)
+    # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+    data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
 
     query = update(TI).where(TI.id == ti_id_str).values(data)
 
     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
         query = query.values(state=ti_patch_payload.state)
+        updated_state = ti_patch_payload.state
         if ti_patch_payload.state == State.FAILED:
             # clear the next_method and next_kwargs
             query = query.values(next_method=None, next_kwargs=None)
-            updated_state = State.FAILED
+            task_instance = session.get(TI, ti_id_str)
+            if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):

Review Comment:
   Do we need to fetch the entire TI here?



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
         )
 
     # We exclude_unset to avoid updating fields that are not set in the payload
-    data = ti_patch_payload.model_dump(exclude_unset=True)
+    # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+    data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
 
     query = update(TI).where(TI.id == ti_id_str).values(data)
 
     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
         query = query.values(state=ti_patch_payload.state)
+        updated_state = ti_patch_payload.state
         if ti_patch_payload.state == State.FAILED:
             # clear the next_method and next_kwargs
             query = query.values(next_method=None, next_kwargs=None)
-            updated_state = State.FAILED
+            task_instance = session.get(TI, ti_id_str)
+            if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):
+                query = query.values(state=State.UP_FOR_RETRY)
+                updated_state = State.UP_FOR_RETRY

Review Comment:
   ```suggestion
                   updated_state = State.UP_FOR_RETRY
                   query = query.values(state=updated_state)
   ```



##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -128,7 +128,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, when: datetime):
         """Tell the API server that this TI has reached a terminal state."""
         # TODO: handle the naming better. finish sounds wrong as "even" deferred is essentially finishing.
         body = TITerminalStatePayload(end_date=when, state=TerminalTIState(state))
+        self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
 
+    def fail(self, id: uuid.UUID, when: datetime, should_retry: bool):
+        """Tell the API server that this TI has to fail, with or without retries."""
+        body = TITerminalStatePayload(end_date=when, state=TerminalTIState.FAILED, should_retry=should_retry)

Review Comment:
   Hmm, do we need a separate function here? Having both `finish` and `fail` feels confusing. `should_retry` already has a default, so can we simply not send it when it is False?
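
A minimal sketch of the single-method idea, using a plain dict in place of the real `TITerminalStatePayload` model. `build_terminal_state_body` is a hypothetical helper, not part of the SDK, and it assumes the server-side default for `should_retry` is False, so omitting the field is equivalent to sending False:

```python
import json
from datetime import datetime, timezone


def build_terminal_state_body(state: str, when: datetime, should_retry: bool = False) -> str:
    """Hypothetical helper: serialize one terminal-state payload for the PATCH call."""
    body = {"state": state, "end_date": when.isoformat()}
    # Only send should_retry when it differs from the assumed default (False)
    if should_retry:
        body["should_retry"] = True
    return json.dumps(body)
```

With this shape, `finish` could cover the failed case as well, and callers that never retry would send exactly the same payload as before.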



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -710,7 +714,14 @@ def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, bytes, N
     def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger):
         log.debug("Received message from task runner", msg=msg)
         resp = None
-        if isinstance(msg, TaskState):
+        if isinstance(msg, FailState):
+            self._terminal_state = TerminalTIState.FAILED
+            self._task_end_time_monotonic = time.monotonic()
+            self._fail_request_sent = True

Review Comment:
   Why do we need `self._fail_request_sent`?



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -199,17 +199,22 @@ def ti_update_state(
         )
 
     # We exclude_unset to avoid updating fields that are not set in the payload
-    data = ti_patch_payload.model_dump(exclude_unset=True)
+    # We do not need to deserialize "should_retry" -- it is used for dynamic decision-making within failed state
+    data = ti_patch_payload.model_dump(exclude_unset=True, exclude={"should_retry"})
 
     query = update(TI).where(TI.id == ti_id_str).values(data)
 
     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
         query = query.values(state=ti_patch_payload.state)
+        updated_state = ti_patch_payload.state
         if ti_patch_payload.state == State.FAILED:
             # clear the next_method and next_kwargs
             query = query.values(next_method=None, next_kwargs=None)
-            updated_state = State.FAILED
+            task_instance = session.get(TI, ti_id_str)
+            if _is_eligible_to_retry(task_instance, ti_patch_payload.should_retry):

Review Comment:
   You can also fetch `max_tries` and `try_number` on L188, and simply compare them here:
   
https://github.com/apache/airflow/blob/873e765376eb9654e5c8b8a71d95f92dd96f8958/airflow/api_fastapi/execution_api/routes/task_instances.py#L188
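
A rough sketch of that comparison, not the final code. `resolve_failed_state` is a hypothetical name, the strings stand in for `State.FAILED` and `State.UP_FOR_RETRY`, and the `try_number <= max_tries` rule is an assumption about what `_is_eligible_to_retry` reduces to once only those two columns are available:

```python
def resolve_failed_state(try_number: int, max_tries: int, should_retry: bool) -> str:
    """Hypothetical helper: pick the state to store for a TI reported as failed."""
    # Assumed rule: retry only if the task asked for it and tries remain
    if should_retry and try_number <= max_tries:
        return "up_for_retry"
    return "failed"
```

This needs only two integer columns, so the full TI object would not have to be loaded just to make the decision.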



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
