anishgirianish commented on code in PR #60855:
URL: https://github.com/apache/airflow/pull/60855#discussion_r2985568419
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -216,7 +216,18 @@ def start(self, id: uuid.UUID, pid: int, when: datetime)
-> TIRunContext:
"""Tell the API server that this TI has started running."""
body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(),
unixname=getuser(), start_date=when)
- resp = self.client.patch(f"task-instances/{id}/run",
content=body.model_dump_json())
+ try:
+ resp = self.client.patch(f"task-instances/{id}/run",
content=body.model_dump_json())
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT:
+ detail = e.detail
+ if (
+ isinstance(detail, dict)
+ and detail.get("reason") == "invalid_state"
+ and detail.get("previous_state") == "running"
+ ):
+ raise TaskAlreadyRunningError(f"Task {id} is already
running") from e
Review Comment:
updated
##########
task-sdk/tests/task_sdk/api/test_client.py:
##########
@@ -333,6 +333,55 @@ def handle_request(request: httpx.Request) ->
httpx.Response:
assert resp == ti_context
assert call_count == 3
+ def test_task_instance_start_already_running(self):
+ """Test that start() raises TaskAlreadyRunningError when TI is already
running."""
+ from airflow.sdk.exceptions import TaskAlreadyRunningError
Review Comment:
updated thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]