Copilot commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2981033364


##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -731,40 +731,6 @@ def mock_monotonic():
             "task_instance_id": str(ti.id),
         } in captured_logs
 
-    def test_supervisor_handles_already_running_task(self):
-        """Test that Supervisor prevents starting a Task Instance that is 
already running."""
-        ti = TaskInstance(
-            id=uuid7(), task_id="b", dag_id="c", run_id="d", try_number=1, 
dag_version_id=uuid7()
-        )
-
-        # Mock API Server response indicating the TI is already running
-        # The API Server would return a 409 Conflict status code if the TI is 
not
-        # in a "queued" state.
-        def handle_request(request: httpx.Request) -> httpx.Response:
-            if request.url.path == f"/task-instances/{ti.id}/run":
-                return httpx.Response(
-                    409,
-                    json={
-                        "reason": "invalid_state",
-                        "message": "TI was not in a state where it could be 
marked as running",
-                        "previous_state": "running",
-                    },
-                )
-
-            return httpx.Response(status_code=204)
-
-        client = make_client(transport=httpx.MockTransport(handle_request))
-
-        with pytest.raises(ServerResponseError, match="Server returned error") 
as err:
-            ActivitySubprocess.start(dag_rel_path=os.devnull, 
bundle_info=FAKE_BUNDLE, what=ti, client=client)
-
-        assert err.value.response.status_code == 409
-        assert err.value.detail == {
-            "reason": "invalid_state",
-            "message": "TI was not in a state where it could be marked as 
running",
-            "previous_state": "running",
-        }
-
     @pytest.mark.parametrize("captured_logs", [logging.ERROR], indirect=True, 
ids=["log_level=error"])
     def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, 
mocker, make_ti_context_dict):

Review Comment:
   The supervisor-level test for handling a 409 conflict when starting an
already-running TI was removed, but no replacement assertion was added at the
supervisor/Celery boundary. Since the PR's intent is to keep broker redelivery
of an already-running task from propagating as a failure, please add or update
a test that exercises the supervisor path (e.g.
`ActivitySubprocess.start`/`supervise`) with this scenario and asserts the
expected non-failure behavior.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) 
-> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), 
unixname=getuser(), start_date=when)
 
-        resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e

Review Comment:
   `TaskInstanceOperations.start()` now raises `TaskAlreadyRunningError` on a
409 response whose detail has `reason == "invalid_state"` and
`previous_state == "running"`, but there are no callers in the repo that
catch/handle this exception (searching for `TaskAlreadyRunningError` only
finds this file and tests). In the Celery redelivery scenario, this will still
surface as a task failure to Celery/Scheduler unless the supervisor / Celery
worker path explicitly treats it as a non-error (e.g., exit cleanly without
reporting failure).
   ```suggestion
                       # In this case the task is already running. Treat this 
as a benign race and
                       # attempt to return a TIRunContext instead of surfacing 
a failure.
                       structlog.get_logger(__name__).info(
                           "Task already running when starting; treating as 
no-op",
                           task_instance_id=str(id),
                       )
                       try:
                           return 
TIRunContext.model_validate_json(e.response.read())
                       except Exception:
                           # If we can't interpret the response body, fall back 
to raising the
                           # dedicated error to avoid masking unexpected API 
responses.
                           raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e
   ```



##########
task-sdk/tests/task_sdk/api/test_client.py:
##########
@@ -333,6 +333,55 @@ def handle_request(request: httpx.Request) -> 
httpx.Response:
             assert resp == ti_context
             assert call_count == 3
 
+    def test_task_instance_start_already_running(self):
+        """Test that start() raises TaskAlreadyRunningError when TI is already 
running."""
+        from airflow.sdk.exceptions import TaskAlreadyRunningError
+

Review Comment:
   This test introduces an import inside the test function body (`from 
airflow.sdk.exceptions import TaskAlreadyRunningError`). The codebase generally 
keeps imports at module scope; please move this import to the top of the file 
unless there's a specific reason it must be local.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) 
-> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), 
unixname=getuser(), start_date=when)
 
-        resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e
+            raise

Review Comment:
   The PR description mentions that the Celery executor catches the "already
running" case and raises `Ignore()` to prevent state reporting, but there are
no corresponding changes in the Celery provider/executor code (and no repo
references to `Ignore`/`TaskAlreadyRunningError` outside task-sdk). Either
update the PR description to match the actual scope, or add the missing
executor-side handling so the reported bug is actually fixed end-to-end.



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) 
-> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), 
unixname=getuser(), start_date=when)
 
-        resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already 
running") from e

Review Comment:
   The new `TaskAlreadyRunningError` message says "Task {id} is already 
running", but `id` here is a task *instance* id. Consider adjusting the wording 
to avoid confusion with a DAG task id (e.g. "Task instance {id}...").
   ```suggestion
                       raise TaskAlreadyRunningError(f"Task instance {id} is 
already running") from e
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to