This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e5d79945ad Don't crash supervisor IPC loop on transient network
errors (#66572)
1e5d79945ad is described below
commit 1e5d79945ad5df1fca8f6d06c8f2cde8124a981a
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 13:21:18 2026 +0200
Don't crash supervisor IPC loop on transient network errors (#66572)
* Don't crash supervisor IPC loop on transient network errors
handle_requests in the supervisor only caught ServerResponseError. Any
non-HTTP exception (httpx.ConnectError, httpx.TimeoutException, socket
timeouts, etc.) would propagate, terminate the generator, and
permanently break the supervisor-to-task IPC channel. The task
subprocess would then get EOFError on every subsequent send, and the
worker would be stuck waiting for replies that never come.
Add a catch-all except Exception after the ServerResponseError handler.
It logs the unhandled exception with type info and sends a best-effort
ErrorResponse(API_SERVER_ERROR, ...) back to the task so the failure
surfaces in task logs. The send is wrapped in suppress(Exception): if we
can't reach the task subprocess via stdin, we shouldn't double-fault.
The request loop then continues to the next request.
Test added: a fake httpx.ConnectError on the first call produces an
ErrorResponse, the generator stays alive, and a second request is
processed normally (the loop is not dead).
Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-005).
* Address review comments: shorten comment and use exc_info
- Shorten the catch-all comment per amoghrajesh's suggestion.
- Use exc_info=e in log.exception instead of an exception_type field,
per jason810496's suggestion (the exception type is redundant since
the exception itself is logged with full type info and traceback).
---
.../src/airflow/sdk/execution_time/supervisor.py | 22 ++++++++++++
.../task_sdk/execution_time/test_supervisor.py | 40 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 27dcbb44346..7c9ecbeab9e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -793,6 +793,28 @@ class WatchedSubprocess:
),
request_id=request.id,
)
+ except Exception as e:
+ # Generic exception handling so a transient network error
(httpx.ConnectError /
+ # httpx.TimeoutException) or any other exception
+ # doesn't crash this generator and break the IPC communication
between supervisor and task.
+ log.exception(
+ "Unhandled exception while handling task request",
+ request_id=request.id,
+ exc_info=e,
+ )
+ with suppress(Exception):
+ self.send_msg(
+ msg=None,
+ error=ErrorResponse(
+ error=ErrorType.API_SERVER_ERROR,
+ detail={
+ "status_code": None,
+ "message": str(e),
+ "exception_type": type(e).__name__,
+ },
+ ),
+ request_id=request.id,
+ )
finally:
if token is not None:
otel_context.detach(token)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 881f5ddaad1..0b3cd64a21e 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -3039,6 +3039,46 @@ class TestHandleRequest:
"detail": error.response.json(),
}
+ def test_handle_requests_network_exception_does_not_crash_loop(self,
watched_subprocess, mocker):
+ """A transient network error must not crash the IPC generator.
+
+ Without the catch-all in handle_requests, an httpx.ConnectError would
+ propagate, the generator would terminate, the task subprocess would
+ get EOFError on every subsequent send, and the worker would be stuck.
+ Verify that the error is reported back to the task as an
+ API_SERVER_ERROR ErrorResponse and that the loop stays alive for the
+ next request.
+ """
+ watched_subprocess, read_socket = watched_subprocess
+
+ # First request raises a network exception, second succeeds.
+ first_call = httpx.ConnectError("connection refused")
+ watched_subprocess.client.task_instances.succeed =
mocker.Mock(side_effect=[first_call, None])
+
+ generator = watched_subprocess.handle_requests(log=mocker.Mock())
+ next(generator)
+
+ # First request — should produce an ErrorResponse, not crash the
generator.
+ msg1 = SucceedTask(end_date=timezone.parse("2024-10-31T12:00:00Z"))
+ req1 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg1.model_dump())
+ generator.send(req1)
+
+ read_socket.settimeout(0.5)
+ frame_len = int.from_bytes(read_socket.recv(4), "big")
+ bytes_ = read_socket.recv(frame_len)
+ frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(bytes_)
+
+ assert frame.id == req1.id
+ assert frame.error is not None
+ assert frame.error["error"] == "API_SERVER_ERROR"
+ assert frame.error["detail"]["exception_type"] == "ConnectError"
+
+ # Second request — generator must still be alive and process it
normally.
+ msg2 = SucceedTask(end_date=timezone.parse("2024-10-31T12:01:00Z"))
+ req2 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg2.model_dump())
+ # Should not raise StopIteration (which would mean the loop crashed).
+ generator.send(req2)
+
class TestSetSupervisorComms:
class DummyComms: