ashb commented on code in PR #45106:
URL: https://github.com/apache/airflow/pull/45106#discussion_r1895724282


##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -359,3 +364,23 @@ def ti_put_rtif(
     _update_rtif(task_instance, put_rtif_payload, session)
 
     return {"message": "Rendered task instance fields successfully set"}
+
+
+def _is_eligible_to_retry(task_instance: TI, task_retries: int | None):

Review Comment:
   Two things:
   
   1) This should probably be a method on TI. It might actually already exist 
as a method.
   2) I don't grok what we are doing with the `task_retries` parameter -- why 
do we need to pass it at all? When is 
`task_instance.try_number <= task_instance.max_tries` not right/good enough?
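
To illustrate point 2, here is a minimal sketch of the check written as a method, assuming only the `try_number <= max_tries` comparison quoted above. `SketchTI` and `is_eligible_to_retry` are hypothetical stand-ins for illustration, not the real `TaskInstance` model or an existing Airflow method.

```python
from dataclasses import dataclass


@dataclass
class SketchTI:
    """Hypothetical stand-in for TI; holds only the two fields the check needs."""

    try_number: int
    max_tries: int

    def is_eligible_to_retry(self) -> bool:
        # The comparison from the review comment; no task_retries argument needed.
        return self.try_number <= self.max_tries


first_attempt = SketchTI(try_number=1, max_tries=2)
exhausted = SketchTI(try_number=3, max_tries=2)
print(first_attempt.is_eligible_to_retry(), exhausted.is_eligible_to_retry())
```

With this shape the caller never passes a retry count; everything the decision needs already lives on the instance.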



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -518,13 +520,15 @@ def wait(self) -> int:
         # If it hasn't, assume it's failed
         self._exit_code = self._exit_code if self._exit_code is not None else 1
 
+        print("The exit code is", self._exit_code)
+

Review Comment:
   I assume this is a leftover debug print.
   
   ```suggestion
   ```



##########
tests/api_fastapi/execution_api/routes/test_task_instances.py:
##########
@@ -340,6 +340,71 @@ def test_ti_update_state_to_reschedule(self, client, session, create_task_instan
         assert trs[0].map_index == -1
         assert trs[0].duration == 129600
 
+    @pytest.mark.parametrize(
+        ("retries", "expected_state"),
+        [
+            # retries given
+            (2, State.UP_FOR_RETRY),
+            # retries not given
+            (None, State.FAILED),
+            # retries given but as 0
+            (0, State.FAILED),
+            # retries not known, given as -1, calculates on table default
+            (-1, State.UP_FOR_RETRY),
+        ],
+    )
+    def test_ti_update_state_to_retry(self, client, session, create_task_instance, retries, expected_state):
+        ti = create_task_instance(
+            task_id="test_ti_update_state_to_retry",
+            state=State.RUNNING,
+        )
+        ti.retries = retries
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": State.FAILED,
+                "end_date": DEFAULT_END_DATE.isoformat(),
+                "task_retries": retries,

Review Comment:
   Yeah, this feels like a very leaky abstraction. Let's remove it, and if we 
want a "fail and don't retry", let's have that be something else rather than 
overloading `task_retries` in a non-obvious manner.
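
One way to read "something else" is an explicit state value instead of a magic number in `task_retries`. The sketch below is an assumption, not the PR's design: `SketchTerminalState`, its `FAIL_WITHOUT_RETRY` member, and `build_payload` are hypothetical names used only for illustration.

```python
from enum import Enum


class SketchTerminalState(str, Enum):
    """Hypothetical terminal states a client could report."""

    SUCCESS = "success"
    FAILED = "failed"
    # Hypothetical: an explicit "do not retry" signal instead of task_retries=0/None/-1.
    FAIL_WITHOUT_RETRY = "fail_without_retry"


def build_payload(state: SketchTerminalState, end_date: str) -> dict:
    # The payload carries no retry count; the intent is visible in the state itself.
    return {"state": state.value, "end_date": end_date}


print(build_payload(SketchTerminalState.FAIL_WITHOUT_RETRY, "2024-12-23T00:00:00+00:00"))
```

The client then states its intent directly, and the retry arithmetic stays out of the request body.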



##########
tests/api_fastapi/execution_api/routes/test_task_instances.py:
##########
@@ -340,6 +340,71 @@ def test_ti_update_state_to_reschedule(self, client, session, create_task_instan
         assert trs[0].map_index == -1
         assert trs[0].duration == 129600
 
+    @pytest.mark.parametrize(
+        ("retries", "expected_state"),
+        [
+            # retries given
+            (2, State.UP_FOR_RETRY),
+            # retries not given
+            (None, State.FAILED),
+            # retries given but as 0
+            (0, State.FAILED),
+            # retries not known, given as -1, calculates on table default
+            (-1, State.UP_FOR_RETRY),
+        ],
+    )
+    def test_ti_update_state_to_retry(self, client, session, create_task_instance, retries, expected_state):
+        ti = create_task_instance(
+            task_id="test_ti_update_state_to_retry",
+            state=State.RUNNING,
+        )
+        ti.retries = retries
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": State.FAILED,
+                "end_date": DEFAULT_END_DATE.isoformat(),
+                "task_retries": retries,
+            },
+        )
+
+        assert response.status_code == 204
+        assert response.text == ""
+
+        session.expire_all()
+
+        ti = session.get(TaskInstance, ti.id)
+        assert ti.state == expected_state
+        assert ti.next_method is None
+        assert ti.next_kwargs is None
+
+    def test_ti_update_state_to_retry_when_restarting(self, client, session, create_task_instance):
+        ti = create_task_instance(
+            task_id="test_ti_update_state_to_retry_when_restarting",
+            state=State.RESTARTING,
+        )
+        session.commit()

Review Comment:
   I think this does nothing; at the very least you should pass `session` on 
to `create_task_instance` too.



##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -59,6 +59,8 @@ class TITerminalStatePayload(BaseModel):
     end_date: UtcDateTime
     """When the task completed executing"""
 
+    task_retries: int | None = None

Review Comment:
   I think the fact that this shows up in the API makes it too confusing for 
client authors, so let's work out a way to remove this.



##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -714,6 +718,14 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger):
         if isinstance(msg, TaskState):
             self._terminal_state = msg.state
             self._task_end_time_monotonic = time.monotonic()
+            if msg.task_retries:
+                self.client.task_instances.finish(
+                    id=self.id,
+                    state=self.final_state,
+                    when=datetime.now(tz=timezone.utc),
+                    task_retries=msg.task_retries,
+                )
+                self._should_retry = True

Review Comment:
   What triggers this path? I don't really think this is needed, or at least I 
can't work out when it's not always triggered. This feels like something we 
should handle only on the server side, without including this extra info from 
the client at all.
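
A rough sketch of what "handle only on the server side" could look like, under stated assumptions: the client reports a plain `failed` state, and the server consults its own stored row to decide between retrying and failing. `StoredTI`, `handle_state_update`, and the in-memory `store` are hypothetical and stand in for the real route and database session.

```python
from dataclasses import dataclass


@dataclass
class StoredTI:
    """Hypothetical server-side row; the client never sends these counters."""

    try_number: int
    max_tries: int
    state: str = "running"


def handle_state_update(store: dict, ti_id: str, requested_state: str) -> str:
    ti = store[ti_id]
    # Only a reported failure is subject to the retry decision.
    if requested_state == "failed" and ti.try_number <= ti.max_tries:
        ti.state = "up_for_retry"
    else:
        ti.state = requested_state
    return ti.state


store = {"a": StoredTI(try_number=1, max_tries=2), "b": StoredTI(try_number=1, max_tries=0)}
print(handle_state_update(store, "a", "failed"), handle_state_update(store, "b", "failed"))
```

In this shape the supervisor has no special retry branch: it sends the terminal state once and the server resolves the rest.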



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
