This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 50051ad11e fix(amber): add timeout and retry to dataset file-service
requests (#5667)
50051ad11e is described below
commit 50051ad11e30f12dfb31db548c97bb1919bf90ed
Author: Matthew B. <[email protected]>
AuthorDate: Sun Jun 21 18:23:16 2026 +0000
fix(amber): add timeout and retry to dataset file-service requests (#5667)
### What changes were proposed in this PR?
- Route `DatasetFileDocument`'s presigned-URL fetch and file download
through a `requests.Session` configured with a `(5s connect, 10s read)`
timeout, so a hung or unreachable file-service fails in bounded time
instead of blocking the worker thread forever. The read timeout bounds
inactivity *between bytes*, not the total download time, so large
dataset files that stream steadily are unaffected; it only trips when
the connection stalls for 10s with no data.
- Mount a `urllib3` `Retry` policy on the session (3 retries,
exponential backoff, retrying on connection errors and 5xx). Both calls
are idempotent GETs, so the retry set is restricted to `GET`.
- Translate network failures (connect/read timeouts and connection
errors, including those surfaced after retries are exhausted) into
`RuntimeError`, consistent with the module's existing failure handling,
so callers get a uniform error contract instead of a raw
`requests`/`urllib3` exception.
### Any related issues, documentation, discussions?
Closes: #5666
### How was this PR tested?
- Added `pytest` coverage in `test_dataset_file_document.py` (26 tests):
- asserts the `(connect, read)` timeout tuple is passed on both the
presigned-URL request and the file download;
- asserts the retry adapter is mounted for `http://` and `https://` with
the expected policy (`total=3`, `connect=3`, `read=3`,
`backoff_factor=0.5`, `status_forcelist={500,502,503,504}`, GET-only);
- asserts a `ReadTimeout` / `ConnectionError` is wrapped in
`RuntimeError` on both code paths.
- `ruff check` and `ruff format --check` pass on the modified files.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF
(backported from commit 86f865c7820e418d7a641957b3ab4b0ed6edfdb7)
---
amber/LICENSE-binary-python | 2 +-
amber/requirements.txt | 2 +
.../pytexera/storage/dataset_file_document.py | 50 ++++++++-
.../pytexera/storage/test_dataset_file_document.py | 123 ++++++++++++++++++---
4 files changed, 160 insertions(+), 17 deletions(-)
diff --git a/amber/LICENSE-binary-python b/amber/LICENSE-binary-python
index c4f8d10d47..fc1514110c 100644
--- a/amber/LICENSE-binary-python
+++ b/amber/LICENSE-binary-python
@@ -228,7 +228,7 @@ Python packages:
- pympler==1.1
- python-dateutil==2.8.2
- regex==2026.5.9
- - requests==2.34.2
+ - requests==2.34.0
- s3transfer==0.14.0
- safetensors==0.8.0
- tenacity==8.5.0
diff --git a/amber/requirements.txt b/amber/requirements.txt
index 726310934d..b7c7c08a62 100644
--- a/amber/requirements.txt
+++ b/amber/requirements.txt
@@ -48,3 +48,5 @@ SQLAlchemy==2.0.37
pg8000==1.31.5
pympler==1.1
boto3==1.40.53
+requests==2.34.0
+urllib3==2.7.0
diff --git a/amber/src/main/python/pytexera/storage/dataset_file_document.py
b/amber/src/main/python/pytexera/storage/dataset_file_document.py
index 31f95d3fc7..5a063f6047 100644
--- a/amber/src/main/python/pytexera/storage/dataset_file_document.py
+++ b/amber/src/main/python/pytexera/storage/dataset_file_document.py
@@ -19,9 +19,38 @@ import io
import os
import requests
import urllib.parse
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
class DatasetFileDocument:
+ # (connect, read) timeout and retry settings for the file-service GETs
below.
+ # Read timeout bounds inactivity between bytes, not total download time.
+ _CONNECT_TIMEOUT_SECONDS = 5
+ _READ_TIMEOUT_SECONDS = 10
+ _REQUEST_TIMEOUT = (_CONNECT_TIMEOUT_SECONDS, _READ_TIMEOUT_SECONDS)
+ _MAX_RETRIES = 3
+ _RETRY_BACKOFF_FACTOR = 0.5
+ _RETRY_STATUS_FORCELIST = (500, 502, 503, 504)
+
+ @classmethod
+ def _retry_session(cls) -> requests.Session:
+ """Returns a Session that retries GETs on connection errors and 5xx."""
+ retry = Retry(
+ total=cls._MAX_RETRIES,
+ connect=cls._MAX_RETRIES,
+ read=cls._MAX_RETRIES,
+ backoff_factor=cls._RETRY_BACKOFF_FACTOR,
+ status_forcelist=cls._RETRY_STATUS_FORCELIST,
+ allowed_methods=frozenset({"GET"}),
+ raise_on_status=False,
+ )
+ adapter = HTTPAdapter(max_retries=retry)
+ session = requests.Session()
+ session.mount("http://", adapter)
+ session.mount("https://", adapter)
+ return session
+
def __init__(self, file_path: str):
"""
Parses the file path into dataset metadata.
@@ -69,7 +98,18 @@ class DatasetFileDocument:
params = {"filePath": encoded_file_path}
- response = requests.get(self.presign_endpoint, headers=headers,
params=params)
+ try:
+ with self._retry_session() as session:
+ response = session.get(
+ self.presign_endpoint,
+ headers=headers,
+ params=params,
+ timeout=self._REQUEST_TIMEOUT,
+ )
+ except requests.exceptions.RequestException as e:
+ raise RuntimeError(
+ f"Failed to get presigned URL: request failed: {e}"
+ ) from e
if response.status_code != 200:
raise RuntimeError(
@@ -100,7 +140,13 @@ class DatasetFileDocument:
:raises: RuntimeError if the retrieval fails.
"""
presigned_url = self.get_presigned_url()
- response = requests.get(presigned_url)
+ try:
+ with self._retry_session() as session:
+ response = session.get(presigned_url,
timeout=self._REQUEST_TIMEOUT)
+ except requests.exceptions.RequestException as e:
+ raise RuntimeError(
+ f"Failed to retrieve file content: request failed: {e}"
+ ) from e
if response.status_code != 200:
raise RuntimeError(
diff --git
a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
index 680f512072..36882fe27e 100644
--- a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
+++ b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
@@ -18,11 +18,11 @@
import io
import pytest
+import requests
from unittest.mock import patch, MagicMock
from pytexera.storage.dataset_file_document import DatasetFileDocument
-
DEFAULT_ENDPOINT = "http://localhost:9092/api/dataset/presign-download"
CUSTOM_ENDPOINT = "https://example.test/api/presign"
@@ -95,7 +95,9 @@ class TestGetPresignedUrl:
def test_returns_presigned_url_field_from_json_body(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(
200, body={"presignedUrl": "https://signed.test/x"}
)
@@ -103,7 +105,9 @@ class TestGetPresignedUrl:
def test_sends_bearer_authorization_header_with_jwt(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"presignedUrl":
"u"})
doc.get_presigned_url()
_, kwargs = mock_get.call_args
@@ -113,7 +117,9 @@ class TestGetPresignedUrl:
# urllib.parse.quote keeps "/" as safe by default, but encodes "@"
# and " " — pin both pieces so the contract is explicit.
doc = self._make_doc(monkeypatch, path="/[email protected]/ds/v1/data
file.csv")
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"presignedUrl":
"u"})
doc.get_presigned_url()
_, kwargs = mock_get.call_args
@@ -124,7 +130,9 @@ class TestGetPresignedUrl:
def test_calls_configured_endpoint(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"presignedUrl":
"u"})
doc.get_presigned_url()
args, _ = mock_get.call_args
@@ -132,21 +140,27 @@ class TestGetPresignedUrl:
def test_raises_runtime_error_with_status_and_body_on_failure(self,
monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(403, body="forbidden")
with pytest.raises(RuntimeError, match=r"403.*forbidden"):
doc.get_presigned_url()
def test_raises_when_response_body_lacks_presigned_url_key(self,
monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"other": "value"})
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
doc.get_presigned_url()
def test_raises_when_response_body_is_not_valid_json(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
response = MagicMock()
response.status_code = 200
response.json.side_effect = ValueError("Expecting value")
@@ -157,14 +171,18 @@ class TestGetPresignedUrl:
def test_raises_when_presigned_url_is_empty_string(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"presignedUrl":
""})
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
doc.get_presigned_url()
def test_raises_when_presigned_url_is_not_a_string(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(200, body={"presignedUrl":
None})
with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
doc.get_presigned_url()
@@ -178,7 +196,9 @@ class TestReadFile:
def test_returns_bytesio_with_downloaded_content(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.side_effect = [
make_response(200, body={"presignedUrl":
"https://signed.test/x"}),
make_response(200, content=b"hello-bytes"),
@@ -189,14 +209,18 @@ class TestReadFile:
def test_propagates_presigned_url_failure(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.return_value = make_response(500, body="upstream down")
with pytest.raises(RuntimeError, match=r"500.*upstream down"):
doc.read_file()
def test_raises_runtime_error_when_download_fails(self, monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.side_effect = [
make_response(200, body={"presignedUrl":
"https://signed.test/x"}),
make_response(404, body="missing"),
@@ -206,7 +230,9 @@ class TestReadFile:
def test_downloads_from_presigned_url_returned_by_first_call(self,
monkeypatch):
doc = self._make_doc(monkeypatch)
- with patch("pytexera.storage.dataset_file_document.requests.get") as
mock_get:
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
mock_get.side_effect = [
make_response(200, body={"presignedUrl":
"https://signed.test/x"}),
make_response(200, content=b""),
@@ -214,3 +240,72 @@ class TestReadFile:
doc.read_file()
second_call_args, _ = mock_get.call_args_list[1]
assert second_call_args[0] == "https://signed.test/x"
+
+
+class TestTimeoutsAndRetries:
+ def _make_doc(self, monkeypatch):
+ monkeypatch.setenv("USER_JWT_TOKEN", "test-jwt-token")
+ monkeypatch.setenv("FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT",
CUSTOM_ENDPOINT)
+ return DatasetFileDocument("/[email protected]/ds/v1/file.csv")
+
+ def test_presigned_url_request_passes_request_timeout(self, monkeypatch):
+ doc = self._make_doc(monkeypatch)
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
+ mock_get.return_value = make_response(200, body={"presignedUrl":
"u"})
+ doc.get_presigned_url()
+ _, kwargs = mock_get.call_args
+ assert kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT
+
+ def test_download_request_passes_request_timeout(self, monkeypatch):
+ doc = self._make_doc(monkeypatch)
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
+ mock_get.side_effect = [
+ make_response(200, body={"presignedUrl":
"https://signed.test/x"}),
+ make_response(200, content=b"data"),
+ ]
+ doc.read_file()
+ _, download_kwargs = mock_get.call_args_list[1]
+ assert download_kwargs["timeout"] ==
DatasetFileDocument._REQUEST_TIMEOUT
+
+ def test_session_mounts_retry_adapter_for_http_and_https(self):
+ session = DatasetFileDocument._retry_session()
+ try:
+ for prefix in ("http://", "https://"):
+ retry = session.get_adapter(prefix).max_retries
+ assert retry.total == DatasetFileDocument._MAX_RETRIES
+ assert retry.connect == DatasetFileDocument._MAX_RETRIES
+ assert retry.read == DatasetFileDocument._MAX_RETRIES
+ assert set(retry.status_forcelist) == set(
+ DatasetFileDocument._RETRY_STATUS_FORCELIST
+ )
+ # Only idempotent GETs should be retried.
+ assert retry.allowed_methods == frozenset({"GET"})
+ finally:
+ session.close()
+
+ def test_presigned_url_request_timeout_is_wrapped_in_runtime_error(
+ self, monkeypatch
+ ):
+ doc = self._make_doc(monkeypatch)
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
+ mock_get.side_effect = requests.exceptions.ReadTimeout("timed out")
+ with pytest.raises(RuntimeError, match="request failed"):
+ doc.get_presigned_url()
+
+ def test_download_request_timeout_is_wrapped_in_runtime_error(self,
monkeypatch):
+ doc = self._make_doc(monkeypatch)
+ with patch(
+ "pytexera.storage.dataset_file_document.requests.Session.get"
+ ) as mock_get:
+ mock_get.side_effect = [
+ make_response(200, body={"presignedUrl":
"https://signed.test/x"}),
+ requests.exceptions.ConnectionError("connection reset"),
+ ]
+ with pytest.raises(RuntimeError, match="Failed to retrieve file
content"):
+ doc.read_file()