This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.2 by this push:
     new 50051ad11e fix(amber): add timeout and retry to dataset file-service requests (#5667)
50051ad11e is described below

commit 50051ad11e30f12dfb31db548c97bb1919bf90ed
Author: Matthew B. <[email protected]>
AuthorDate: Sun Jun 21 18:23:16 2026 +0000

    fix(amber): add timeout and retry to dataset file-service requests (#5667)
    
    ### What changes were proposed in this PR?
    - Route `DatasetFileDocument`'s presigned-URL fetch and file download
    through a `requests.Session`, passing a `(5s connect, 10s read)`
    timeout on each request, so a hung or unreachable endpoint fails in
    bounded time instead of blocking the worker thread indefinitely. The
    read timeout bounds inactivity *between bytes*, not the total download
    time, so large dataset files that stream steadily are unaffected; it
    only trips when the connection stalls for 10s with no data.
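    - Mount a `urllib3` `Retry` policy on the session (3 retries,
    exponential backoff with factor 0.5, retrying on connection errors,
    read errors, and 500/502/503/504 responses). Both calls are idempotent
    GETs, so the retry set is restricted to `GET`. With
    `raise_on_status=False`, once status retries are exhausted the last
    5xx response is returned and rejected by the existing non-200 check.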
    - Translate any `requests.exceptions.RequestException` (connect/read
    timeouts, connection errors, and other request failures, including
    those surfaced after retries are exhausted) into a chained
    `RuntimeError` (`raise ... from e`), consistent with the module's
    existing failure handling, so callers get a uniform error contract
    instead of a raw `requests`/`urllib3` exception.
    
    ### Any related issues, documentation, discussions?
    Closes: #5666
    
    ### How was this PR tested?
    - Added `pytest` coverage in `test_dataset_file_document.py` (26 tests):
      - existing tests now patch `requests.Session.get` instead of
        `requests.get`;
      - a new `TestTimeoutsAndRetries` class (5 tests):
        - asserts the `(connect, read)` timeout tuple is passed on both the
          presigned-URL request and the file download;
        - asserts the retry adapter is mounted for `http://` and `https://`
          with the expected policy (`total=3`, `connect=3`, `read=3`,
          `status_forcelist={500,502,503,504}`, GET-only);
        - asserts a `ReadTimeout` on the presigned-URL request and a
          `ConnectionError` on the file download are each wrapped in
          `RuntimeError`.
    - `ruff check` and `ruff format --check` pass on the modified files.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.8 in compliance with the ASF Generative Tooling Guidance.
    
    (backported from commit 86f865c7820e418d7a641957b3ab4b0ed6edfdb7)
---
 amber/LICENSE-binary-python                        |   2 +-
 amber/requirements.txt                             |   2 +
 .../pytexera/storage/dataset_file_document.py      |  50 ++++++++-
 .../pytexera/storage/test_dataset_file_document.py | 123 ++++++++++++++++++---
 4 files changed, 160 insertions(+), 17 deletions(-)

diff --git a/amber/LICENSE-binary-python b/amber/LICENSE-binary-python
index c4f8d10d47..fc1514110c 100644
--- a/amber/LICENSE-binary-python
+++ b/amber/LICENSE-binary-python
@@ -228,7 +228,7 @@ Python packages:
   - pympler==1.1
   - python-dateutil==2.8.2
   - regex==2026.5.9
-  - requests==2.34.2
+  - requests==2.34.0
   - s3transfer==0.14.0
   - safetensors==0.8.0
   - tenacity==8.5.0
diff --git a/amber/requirements.txt b/amber/requirements.txt
index 726310934d..b7c7c08a62 100644
--- a/amber/requirements.txt
+++ b/amber/requirements.txt
@@ -48,3 +48,5 @@ SQLAlchemy==2.0.37
 pg8000==1.31.5
 pympler==1.1
 boto3==1.40.53
+requests==2.34.0
+urllib3==2.7.0
diff --git a/amber/src/main/python/pytexera/storage/dataset_file_document.py 
b/amber/src/main/python/pytexera/storage/dataset_file_document.py
index 31f95d3fc7..5a063f6047 100644
--- a/amber/src/main/python/pytexera/storage/dataset_file_document.py
+++ b/amber/src/main/python/pytexera/storage/dataset_file_document.py
@@ -19,9 +19,38 @@ import io
 import os
 import requests
 import urllib.parse
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
 
 
 class DatasetFileDocument:
+    # (connect, read) timeout and retry settings for the file-service GETs 
below.
+    # Read timeout bounds inactivity between bytes, not total download time.
+    _CONNECT_TIMEOUT_SECONDS = 5
+    _READ_TIMEOUT_SECONDS = 10
+    _REQUEST_TIMEOUT = (_CONNECT_TIMEOUT_SECONDS, _READ_TIMEOUT_SECONDS)
+    _MAX_RETRIES = 3
+    _RETRY_BACKOFF_FACTOR = 0.5
+    _RETRY_STATUS_FORCELIST = (500, 502, 503, 504)
+
+    @classmethod
+    def _retry_session(cls) -> requests.Session:
+        """Returns a Session that retries GETs on connection errors and 5xx."""
+        retry = Retry(
+            total=cls._MAX_RETRIES,
+            connect=cls._MAX_RETRIES,
+            read=cls._MAX_RETRIES,
+            backoff_factor=cls._RETRY_BACKOFF_FACTOR,
+            status_forcelist=cls._RETRY_STATUS_FORCELIST,
+            allowed_methods=frozenset({"GET"}),
+            raise_on_status=False,
+        )
+        adapter = HTTPAdapter(max_retries=retry)
+        session = requests.Session()
+        session.mount("http://";, adapter)
+        session.mount("https://";, adapter)
+        return session
+
     def __init__(self, file_path: str):
         """
         Parses the file path into dataset metadata.
@@ -69,7 +98,18 @@ class DatasetFileDocument:
 
         params = {"filePath": encoded_file_path}
 
-        response = requests.get(self.presign_endpoint, headers=headers, 
params=params)
+        try:
+            with self._retry_session() as session:
+                response = session.get(
+                    self.presign_endpoint,
+                    headers=headers,
+                    params=params,
+                    timeout=self._REQUEST_TIMEOUT,
+                )
+        except requests.exceptions.RequestException as e:
+            raise RuntimeError(
+                f"Failed to get presigned URL: request failed: {e}"
+            ) from e
 
         if response.status_code != 200:
             raise RuntimeError(
@@ -100,7 +140,13 @@ class DatasetFileDocument:
         :raises: RuntimeError if the retrieval fails.
         """
         presigned_url = self.get_presigned_url()
-        response = requests.get(presigned_url)
+        try:
+            with self._retry_session() as session:
+                response = session.get(presigned_url, 
timeout=self._REQUEST_TIMEOUT)
+        except requests.exceptions.RequestException as e:
+            raise RuntimeError(
+                f"Failed to retrieve file content: request failed: {e}"
+            ) from e
 
         if response.status_code != 200:
             raise RuntimeError(
diff --git 
a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py 
b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
index 680f512072..36882fe27e 100644
--- a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
+++ b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py
@@ -18,11 +18,11 @@
 import io
 
 import pytest
+import requests
 from unittest.mock import patch, MagicMock
 
 from pytexera.storage.dataset_file_document import DatasetFileDocument
 
-
 DEFAULT_ENDPOINT = "http://localhost:9092/api/dataset/presign-download";
 CUSTOM_ENDPOINT = "https://example.test/api/presign";
 
@@ -95,7 +95,9 @@ class TestGetPresignedUrl:
 
     def test_returns_presigned_url_field_from_json_body(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(
                 200, body={"presignedUrl": "https://signed.test/x"}
             )
@@ -103,7 +105,9 @@ class TestGetPresignedUrl:
 
     def test_sends_bearer_authorization_header_with_jwt(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"presignedUrl": 
"u"})
             doc.get_presigned_url()
             _, kwargs = mock_get.call_args
@@ -113,7 +117,9 @@ class TestGetPresignedUrl:
         # urllib.parse.quote keeps "/" as safe by default, but encodes "@"
         # and " " — pin both pieces so the contract is explicit.
         doc = self._make_doc(monkeypatch, path="/[email protected]/ds/v1/data 
file.csv")
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"presignedUrl": 
"u"})
             doc.get_presigned_url()
             _, kwargs = mock_get.call_args
@@ -124,7 +130,9 @@ class TestGetPresignedUrl:
 
     def test_calls_configured_endpoint(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"presignedUrl": 
"u"})
             doc.get_presigned_url()
             args, _ = mock_get.call_args
@@ -132,21 +140,27 @@ class TestGetPresignedUrl:
 
     def test_raises_runtime_error_with_status_and_body_on_failure(self, 
monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(403, body="forbidden")
             with pytest.raises(RuntimeError, match=r"403.*forbidden"):
                 doc.get_presigned_url()
 
     def test_raises_when_response_body_lacks_presigned_url_key(self, 
monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"other": "value"})
             with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
                 doc.get_presigned_url()
 
     def test_raises_when_response_body_is_not_valid_json(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             response = MagicMock()
             response.status_code = 200
             response.json.side_effect = ValueError("Expecting value")
@@ -157,14 +171,18 @@ class TestGetPresignedUrl:
 
     def test_raises_when_presigned_url_is_empty_string(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"presignedUrl": 
""})
             with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
                 doc.get_presigned_url()
 
     def test_raises_when_presigned_url_is_not_a_string(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(200, body={"presignedUrl": 
None})
             with pytest.raises(RuntimeError, match="'presignedUrl' missing"):
                 doc.get_presigned_url()
@@ -178,7 +196,9 @@ class TestReadFile:
 
     def test_returns_bytesio_with_downloaded_content(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.side_effect = [
                 make_response(200, body={"presignedUrl": 
"https://signed.test/x"}),
                 make_response(200, content=b"hello-bytes"),
@@ -189,14 +209,18 @@ class TestReadFile:
 
     def test_propagates_presigned_url_failure(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.return_value = make_response(500, body="upstream down")
             with pytest.raises(RuntimeError, match=r"500.*upstream down"):
                 doc.read_file()
 
     def test_raises_runtime_error_when_download_fails(self, monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.side_effect = [
                 make_response(200, body={"presignedUrl": 
"https://signed.test/x"}),
                 make_response(404, body="missing"),
@@ -206,7 +230,9 @@ class TestReadFile:
 
     def test_downloads_from_presigned_url_returned_by_first_call(self, 
monkeypatch):
         doc = self._make_doc(monkeypatch)
-        with patch("pytexera.storage.dataset_file_document.requests.get") as 
mock_get:
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
             mock_get.side_effect = [
                 make_response(200, body={"presignedUrl": 
"https://signed.test/x"}),
                 make_response(200, content=b""),
@@ -214,3 +240,72 @@ class TestReadFile:
             doc.read_file()
             second_call_args, _ = mock_get.call_args_list[1]
             assert second_call_args[0] == "https://signed.test/x";
+
+
+class TestTimeoutsAndRetries:
+    def _make_doc(self, monkeypatch):
+        monkeypatch.setenv("USER_JWT_TOKEN", "test-jwt-token")
+        monkeypatch.setenv("FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT", 
CUSTOM_ENDPOINT)
+        return DatasetFileDocument("/[email protected]/ds/v1/file.csv")
+
+    def test_presigned_url_request_passes_request_timeout(self, monkeypatch):
+        doc = self._make_doc(monkeypatch)
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
+            mock_get.return_value = make_response(200, body={"presignedUrl": 
"u"})
+            doc.get_presigned_url()
+            _, kwargs = mock_get.call_args
+            assert kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT
+
+    def test_download_request_passes_request_timeout(self, monkeypatch):
+        doc = self._make_doc(monkeypatch)
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
+            mock_get.side_effect = [
+                make_response(200, body={"presignedUrl": 
"https://signed.test/x"}),
+                make_response(200, content=b"data"),
+            ]
+            doc.read_file()
+            _, download_kwargs = mock_get.call_args_list[1]
+            assert download_kwargs["timeout"] == 
DatasetFileDocument._REQUEST_TIMEOUT
+
+    def test_session_mounts_retry_adapter_for_http_and_https(self):
+        session = DatasetFileDocument._retry_session()
+        try:
+            for prefix in ("http://";, "https://";):
+                retry = session.get_adapter(prefix).max_retries
+                assert retry.total == DatasetFileDocument._MAX_RETRIES
+                assert retry.connect == DatasetFileDocument._MAX_RETRIES
+                assert retry.read == DatasetFileDocument._MAX_RETRIES
+                assert set(retry.status_forcelist) == set(
+                    DatasetFileDocument._RETRY_STATUS_FORCELIST
+                )
+                # Only idempotent GETs should be retried.
+                assert retry.allowed_methods == frozenset({"GET"})
+        finally:
+            session.close()
+
+    def test_presigned_url_request_timeout_is_wrapped_in_runtime_error(
+        self, monkeypatch
+    ):
+        doc = self._make_doc(monkeypatch)
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
+            mock_get.side_effect = requests.exceptions.ReadTimeout("timed out")
+            with pytest.raises(RuntimeError, match="request failed"):
+                doc.get_presigned_url()
+
+    def test_download_request_timeout_is_wrapped_in_runtime_error(self, 
monkeypatch):
+        doc = self._make_doc(monkeypatch)
+        with patch(
+            "pytexera.storage.dataset_file_document.requests.Session.get"
+        ) as mock_get:
+            mock_get.side_effect = [
+                make_response(200, body={"presignedUrl": 
"https://signed.test/x"}),
+                requests.exceptions.ConnectionError("connection reset"),
+            ]
+            with pytest.raises(RuntimeError, match="Failed to retrieve file 
content"):
+                doc.read_file()

Reply via email to