kaxil commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2985452026


##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) 
-> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), 
unixname=getuser(), start_date=when)
 
-        resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e

Review Comment:
   `TaskAlreadyRunningError` inherits from `AirflowException`, not 
`ServerResponseError`. Anyone currently catching `ServerResponseError` from 
`start()` to detect this 409 case will stop catching it. The deleted supervisor 
test (`test_supervisor_handles_already_running_task`) was doing exactly that -- 
catching `ServerResponseError`.
   
   This is probably the right behavior change, but should there be a 
supervisor-level test that verifies `TaskAlreadyRunningError` propagates 
through `ActivitySubprocess.start()` and kills the subprocess? The deleted test 
covered that integration path; the new `test_client.py` tests only cover the 
`TaskInstanceOperations.start()` layer.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) 
-> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), 
unixname=getuser(), start_date=when)
 
-        resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e

Review Comment:
   Nit: `id` here is a task *instance* UUID, not a task ID. `f"Task instance 
{id} is already running"` would be clearer in logs and error messages.



##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -731,40 +731,6 @@ def mock_monotonic():
             "task_instance_id": str(ti.id),
         } in captured_logs
 
-    def test_supervisor_handles_already_running_task(self):

Review Comment:
   The deleted test verified the supervisor's end-to-end behavior when the API 
returns 409 for an already-running task (subprocess gets killed, error 
propagates). The new client-level tests cover the exception translation, but 
nothing tests the supervisor's reaction to `TaskAlreadyRunningError` 
specifically.
   
   Consider adding a replacement that asserts `ActivitySubprocess.start()` 
raises `TaskAlreadyRunningError` (not `ServerResponseError`) and that the child 
process is killed.



##########
task-sdk/tests/task_sdk/api/test_client.py:
##########
@@ -333,6 +333,55 @@ def handle_request(request: httpx.Request) -> 
httpx.Response:
             assert resp == ti_context
             assert call_count == 3
 
+    def test_task_instance_start_already_running(self):
+        """Test that start() raises TaskAlreadyRunningError when TI is already 
running."""
+        from airflow.sdk.exceptions import TaskAlreadyRunningError

Review Comment:
   `TaskAlreadyRunningError` is imported inside the test body. Other exceptions 
like `ErrorType` are imported at the top of the file (line 50). Move this up 
there for consistency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to