Copilot commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2981033364
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -731,40 +731,6 @@ def mock_monotonic():
"task_instance_id": str(ti.id),
} in captured_logs
-    def test_supervisor_handles_already_running_task(self):
-        """Test that Supervisor prevents starting a Task Instance that is already running."""
-        ti = TaskInstance(
-            id=uuid7(), task_id="b", dag_id="c", run_id="d", try_number=1, dag_version_id=uuid7()
-        )
-
-        # Mock API Server response indicating the TI is already running
-        # The API Server would return a 409 Conflict status code if the TI is not
-        # in a "queued" state.
-        def handle_request(request: httpx.Request) -> httpx.Response:
-            if request.url.path == f"/task-instances/{ti.id}/run":
-                return httpx.Response(
-                    409,
-                    json={
-                        "reason": "invalid_state",
-                        "message": "TI was not in a state where it could be marked as running",
-                        "previous_state": "running",
-                    },
-                )
-
-            return httpx.Response(status_code=204)
-
-        client = make_client(transport=httpx.MockTransport(handle_request))
-
-        with pytest.raises(ServerResponseError, match="Server returned error") as err:
-            ActivitySubprocess.start(dag_rel_path=os.devnull, bundle_info=FAKE_BUNDLE, what=ti, client=client)
-
-        assert err.value.response.status_code == 409
-        assert err.value.detail == {
-            "reason": "invalid_state",
-            "message": "TI was not in a state where it could be marked as running",
-            "previous_state": "running",
-        }
-
    @pytest.mark.parametrize("captured_logs", [logging.ERROR], indirect=True, ids=["log_level=error"])
    def test_state_conflict_on_heartbeat(self, captured_logs, monkeypatch, mocker, make_ti_context_dict):
Review Comment:
The supervisor-level test for handling a 409 conflict when starting an
already-running TI was removed, but there isn't a replacement assertion at the
supervisor/celery boundary. Since the PR intent is to make broker redelivery of
an already-running task not propagate as a failure, please add/update a test
that exercises the supervisor path (e.g.
`ActivitySubprocess.start`/`supervise`) with this scenario and asserts the
expected non-failure behavior.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) -> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when)
-        resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already running") from e
Review Comment:
`TaskInstanceOperations.start()` now raises `TaskAlreadyRunningError` on a
409/invalid_state(previous_state=running), but there are no callers in the repo
that catch/handle this exception (searching for `TaskAlreadyRunningError` only
finds this file and tests). In the Celery redelivery scenario this will still
surface as a task failure to Celery/Scheduler unless the supervisor / celery
worker path explicitly treats it as a non-error (e.g., exit cleanly without
reporting failure).
```suggestion
                    # In this case the task is already running. Treat this as a benign race and
                    # attempt to return a TIRunContext instead of surfacing a failure.
                    structlog.get_logger(__name__).info(
                        "Task already running when starting; treating as no-op",
                        task_instance_id=str(id),
                    )
                    try:
                        return TIRunContext.model_validate_json(e.response.read())
                    except Exception:
                        # If we can't interpret the response body, fall back to raising the
                        # dedicated error to avoid masking unexpected API responses.
                        raise TaskAlreadyRunningError(f"Task {id} is already running") from e
```
##########
task-sdk/tests/task_sdk/api/test_client.py:
##########
@@ -333,6 +333,55 @@ def handle_request(request: httpx.Request) -> httpx.Response:
     assert resp == ti_context
     assert call_count == 3
+    def test_task_instance_start_already_running(self):
+        """Test that start() raises TaskAlreadyRunningError when TI is already running."""
+        from airflow.sdk.exceptions import TaskAlreadyRunningError
+
Review Comment:
This test introduces an import inside the test function body (`from
airflow.sdk.exceptions import TaskAlreadyRunningError`). The codebase generally
keeps imports at module scope; please move this import to the top of the file
unless there's a specific reason it must be local.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) -> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when)
-        resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already running") from e
+            raise
Review Comment:
PR description mentions that the Celery executor catches the "already
running" case and raises `Ignore()` to prevent state reporting, but there are
no corresponding changes in the celery provider/executor code (and no repo
references to `Ignore`/`TaskAlreadyRunningError` outside task-sdk). Either
update the PR description to match the actual scope, or add the missing
executor-side handling so the reported bug is actually fixed end-to-end.
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime) -> TIRunContext:
         """Tell the API server that this TI has started running."""
         body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when)
-        resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        try:
+            resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
+        except ServerResponseError as e:
+            if e.response.status_code == HTTPStatus.CONFLICT:
+                detail = e.detail
+                if (
+                    isinstance(detail, dict)
+                    and detail.get("reason") == "invalid_state"
+                    and detail.get("previous_state") == "running"
+                ):
+                    raise TaskAlreadyRunningError(f"Task {id} is already running") from e
Review Comment:
The new `TaskAlreadyRunningError` message says "Task {id} is already
running", but `id` here is a task *instance* id. Consider adjusting the wording
to avoid confusion with a DAG task id (e.g. "Task instance {id}...").
```suggestion
                    raise TaskAlreadyRunningError(f"Task instance {id} is already running") from e
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]