rahulsmahadev commented on code in PR #3418:
URL: https://github.com/apache/iceberg-python/pull/3418#discussion_r3399417048
##########
mkdocs/docs/configuration.md:
##########
@@ -348,6 +348,32 @@ catalog:
| snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. |
| `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) |
+#### Retry and timeout
+
+The REST Catalog uses `requests` with no retries and no timeout by default, so transient
+5xx / network failures bubble up immediately and slow servers can hang the client indefinitely.
+Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy.
+Every key is optional; when none are set, the default `requests` behavior is preserved.
Review Comment:
Dropped that sentence in 47a5382.
##########
pyiceberg/catalog/rest/__init__.py:
##########
@@ -396,6 +403,89 @@ class ListViewsResponse(IcebergBaseModel):
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
+class _RetryTimeoutHTTPAdapter(HTTPAdapter):
+ """HTTPAdapter that applies a default per-request timeout.
+
+ requests does not provide a way to set a default timeout on a Session;
+ without this adapter, every call would have to thread `timeout=` through.
+ The adapter applies `self._timeout` whenever a per-call timeout is not set.
+ """
+
+ def __init__(self, timeout: float | None = None, max_retries: Retry | int
| None = None) -> None:
+ self._timeout = timeout
+ if max_retries is not None:
+ super().__init__(max_retries=max_retries)
+ else:
+ super().__init__()
+
+ def send(
+ self,
+ request: PreparedRequest,
+ stream: bool = False,
+ timeout: None | float | tuple[float, float] | tuple[float, None] =
None,
+ verify: bool | str = True,
+ cert: None | bytes | str | tuple[bytes | str, bytes | str] = None,
+ proxies: Mapping[str, str] | None = None,
+ ) -> Response:
+ if timeout is None:
+ timeout = self._timeout
+ return super().send(request, stream=stream, timeout=timeout,
verify=verify, cert=cert, proxies=proxies)
+
+
+def _create_connection_adapter(properties: Properties) ->
_RetryTimeoutHTTPAdapter | None:
+ """Build a connection adapter from the optional `connection.*` properties.
+
+ Returns None when no `connection` block is supplied, leaving the default
+ Session behavior unchanged. Raises ValueError on invalid input.
+ """
+ connection_config = properties.get(CONNECTION)
+ if not connection_config:
+ return None
+ if not isinstance(connection_config, dict):
+ raise ValueError(f"`{CONNECTION}` must be a mapping, got:
{type(connection_config).__name__}")
+
+ timeout: float | None = None
+ if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None:
+ try:
+ timeout = float(raw_timeout)
+ except (TypeError, ValueError) as e:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a
number, got: {raw_timeout!r}") from e
+ if timeout <= 0:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a
positive number, got: {timeout}")
+
+ retries: int | None = None
+ if (raw_retries := connection_config.get(CONNECTION_RETRIES)) is not None:
+ try:
+ retries = int(raw_retries)
+ except (TypeError, ValueError) as e:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an
integer, got: {raw_retries!r}") from e
+ if retries < 0:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be
non-negative, got: {retries}")
+
+ backoff_factor: float | None = None
+ if (raw_backoff := connection_config.get(CONNECTION_BACKOFF_FACTOR)) is
not None:
+ try:
+ backoff_factor = float(raw_backoff)
+ except (TypeError, ValueError) as e:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must
be a number, got: {raw_backoff!r}") from e
+ if backoff_factor < 0:
+ raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must
be non-negative, got: {backoff_factor}")
+
+ max_retries: Retry | None = None
+ if retries is not None or backoff_factor is not None:
+ max_retries = Retry(
+ total=retries if retries is not None else 0,
Review Comment:
Good call — initialized `retries` and `backoff_factor` to 0 in 47a5382, so
the inline conditional defaults are gone. A `Retry(total=0, backoff_factor=0)`
is a no-op, functionally equivalent to no Retry at all.
##########
tests/catalog/test_rest.py:
##########
@@ -2287,6 +2024,131 @@ def test_request_session_with_ssl_client_cert() -> None:
assert "Could not find the TLS certificate file, invalid path:
path_to_client_cert" in str(e.value)
+def test_session_without_connection_config_uses_default_adapter(rest_mock:
Mocker) -> None:
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ for adapter in catalog._session.adapters.values():
+ assert not isinstance(adapter, _RetryTimeoutHTTPAdapter)
+
+
+def test_session_with_connection_timeout_and_retries(rest_mock: Mocker) ->
None:
+ catalog_properties = {
+ "uri": TEST_URI,
+ "token": TEST_TOKEN,
+ CONNECTION: {
+ CONNECTION_TIMEOUT: 60,
+ CONNECTION_RETRIES: 5,
+ CONNECTION_BACKOFF_FACTOR: 1.0,
+ },
+ }
+ catalog = RestCatalog("rest", **catalog_properties) # type: ignore
+
+ https_adapter = catalog._session.adapters["https://"]
+ http_adapter = catalog._session.adapters["http://"]
+ assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter)
+ assert https_adapter is http_adapter
+ assert https_adapter._timeout == 60.0
+ assert https_adapter.max_retries.total == 5
+ assert https_adapter.max_retries.backoff_factor == 1.0
+ # Internal retry policy: transient codes and idempotent methods only.
+ assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503,
504]
+ allowed_methods = https_adapter.max_retries.allowed_methods or frozenset()
+ assert set(allowed_methods) == {"GET", "HEAD", "OPTIONS"}
+
+
+def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
+ catalog_properties = {
+ "uri": TEST_URI,
+ "token": TEST_TOKEN,
+ CONNECTION: {CONNECTION_TIMEOUT: "30"},
+ }
+ catalog = RestCatalog("rest", **catalog_properties) # type: ignore
+ adapter = catalog._session.adapters["https://"]
+ assert isinstance(adapter, _RetryTimeoutHTTPAdapter)
+ assert adapter._timeout == 30.0
+ # No retry options set, so no Retry object is configured.
+ assert adapter.max_retries.total == 0
+
+
+def test_session_retries_on_transient_5xx_then_succeeds() -> None:
+ """Three real 503 responses followed by a 200; the catalog should make all
four attempts.
+
+ `requests_mock` would replace our HTTPAdapter, bypassing the retry logic
we want to exercise,
+ so this test stands up an actual `http.server` on a loopback port instead.
+ """
+ import json
+ import threading
+ from http.server import BaseHTTPRequestHandler, HTTPServer
+
+ state = {"namespace_calls": 0}
+ config_body = json.dumps(
+ {"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for
endpoint in TEST_SUPPORTED_ENDPOINTS]}
+ ).encode()
+
+ class _Handler(BaseHTTPRequestHandler):
Review Comment:
Done in 47a5382 — extracted the handler + threading setup into a
`_local_rest_server_503_then_200(num_failures)` context manager. The test body
is now just the catalog construction, `list_namespaces()`, and the two
assertions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]