This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new ba656191 Core: Pass REST auth manager to S3 signer (#2846)
ba656191 is described below
commit ba65619113ebfd56515975b2e31a3f3df14bed55
Author: Soham <[email protected]>
AuthorDate: Sun Jan 18 01:16:08 2026 +0530
Core: Pass REST auth manager to S3 signer (#2846)
What does this change do?
- RestCatalog now passes its AuthManager into FileIO so downstream
components can reuse a live token.
- S3V4RestSigner now calls the AuthManager’s auth_header() whenever an
AuthManager is provided, falling back to the static token only when no
AuthManager is present, ensuring the signer gets a fresh bearer token.
- Added a unit test to verify the signer pulls the Authorization header
from an AuthManager.
Why is this needed?
- After the AuthManager refactor, the signer no longer received a token,
causing remote signing requests to fail with HTTP 401 responses for REST
catalog users (e.g., Lakekeeper/MinIO). This restores token propagation
and refresh.
How was this tested?
- make lint
- make test
- uv run python -m pytest tests/io/test_fsspec.py -k auth_manager -v
Closes #2544
---------
Co-authored-by: Soham <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
---
pyiceberg/catalog/rest/__init__.py | 27 +++++++++++---------
pyiceberg/catalog/rest/auth.py | 2 ++
pyiceberg/io/fsspec.py | 15 ++++++++---
tests/io/test_fsspec.py | 51 ++++++++++++++++++++++++++++++++++++++
4 files changed, 80 insertions(+), 15 deletions(-)
diff --git a/pyiceberg/catalog/rest/__init__.py
b/pyiceberg/catalog/rest/__init__.py
index 84188e58..d7ef6ec8 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -27,15 +27,8 @@ from requests import HTTPError, Session
from tenacity import RetryCallState, retry, retry_if_exception_type,
stop_after_attempt
from pyiceberg import __version__
-from pyiceberg.catalog import (
- BOTOCORE_SESSION,
- TOKEN,
- URI,
- WAREHOUSE_LOCATION,
- Catalog,
- PropertiesUpdateSummary,
-)
-from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter,
AuthManagerFactory, LegacyOAuth2AuthManager
+from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI,
WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager,
AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.catalog.rest.scan_planning import (
FetchScanTasksRequest,
@@ -61,7 +54,7 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
UnauthorizedError,
)
-from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY,
AWS_SESSION_TOKEN, FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC,
PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
@@ -335,6 +328,7 @@ _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
class RestCatalog(Catalog):
uri: str
_session: Session
+ _auth_manager: AuthManager | None
_supported_endpoints: set[Endpoint]
def __init__(self, name: str, **properties: str):
@@ -347,6 +341,7 @@ class RestCatalog(Catalog):
properties: Properties that are passed along to the configuration.
"""
super().__init__(name, **properties)
+ self._auth_manager: AuthManager | None = None
self.uri = properties[URI]
self._fetch_config()
self._session = self._create_session()
@@ -381,9 +376,11 @@ class RestCatalog(Catalog):
if auth_type != CUSTOM and auth_impl:
raise ValueError("auth.impl can only be specified when using
custom auth.type")
- session.auth =
AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type,
auth_type_config))
+ self._auth_manager = AuthManagerFactory.create(auth_impl or
auth_type, auth_type_config)
+ session.auth = AuthManagerAdapter(self._auth_manager)
else:
- session.auth =
AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
+ self._auth_manager =
self._create_legacy_oauth2_auth_manager(session)
+ session.auth = AuthManagerAdapter(self._auth_manager)
# Configure SigV4 Request Signing
if property_as_bool(self.properties, SIGV4, False):
@@ -391,6 +388,12 @@ class RestCatalog(Catalog):
return session
+ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str
| None = None) -> FileIO:
+ merged_properties = {**self.properties, **properties}
+ if self._auth_manager:
+ merged_properties[AUTH_MANAGER] = self._auth_manager
+ return load_file_io(merged_properties, location)
+
def supports_server_side_planning(self) -> bool:
"""Check if the catalog supports server-side scan planning."""
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in
self._supported_endpoints and property_as_bool(
diff --git a/pyiceberg/catalog/rest/auth.py b/pyiceberg/catalog/rest/auth.py
index 3fdc837c..4f67ab90 100644
--- a/pyiceberg/catalog/rest/auth.py
+++ b/pyiceberg/catalog/rest/auth.py
@@ -31,6 +31,8 @@ from requests.auth import AuthBase
from pyiceberg.catalog.rest.response import TokenResponse,
_handle_non_200_response
from pyiceberg.exceptions import OAuthError
+AUTH_MANAGER = "auth.manager"
+
COLON = ":"
logger = logging.getLogger(__name__)
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 5898a226..eb5342c9 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -37,6 +37,7 @@ from fsspec.implementations.local import LocalFileSystem
from requests import HTTPError
from pyiceberg.catalog import TOKEN, URI
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER
from pyiceberg.exceptions import SignError
from pyiceberg.io import (
ADLS_ACCOUNT_HOST,
@@ -121,9 +122,17 @@ class S3V4RestSigner(S3RequestSigner):
signer_url = self.properties.get(S3_SIGNER_URI,
self.properties[URI]).rstrip("/") # type: ignore
signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT,
S3_SIGNER_ENDPOINT_DEFAULT)
- signer_headers = {}
- if token := self.properties.get(TOKEN):
- signer_headers = {"Authorization": f"Bearer {token}"}
+ signer_headers: dict[str, str] = {}
+
+ auth_header: str | None = None
+ if auth_manager := self.properties.get(AUTH_MANAGER):
+ auth_header = auth_manager.auth_header()
+ elif token := self.properties.get(TOKEN):
+ auth_header = f"Bearer {token}"
+
+ if auth_header:
+ signer_headers["Authorization"] = auth_header
+
signer_headers.update(get_header_properties(self.properties))
signer_body = {
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index c28eb071..9e4a1da5 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -28,6 +28,7 @@ from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractFileSystem
from requests_mock import Mocker
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER
from pyiceberg.exceptions import SignError
from pyiceberg.io import fsspec
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
@@ -948,3 +949,53 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker)
-> None:
"""Failed to sign request 401: {'method': 'HEAD', 'region':
'us-west-2', 'uri':
'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro',
'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""
in str(exc_info.value)
)
+
+
+def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None:
+ new_uri = "https://bucket/metadata/snap-signed.avro"
+ requests_mock.post(
+ f"{TEST_URI}/v1/aws/s3/sign",
+ json={
+ "uri": new_uri,
+ "headers": {
+ "Authorization": ["AWS4-HMAC-SHA256
Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"],
+ "Host": ["bucket.s3.us-west-2.amazonaws.com"],
+ },
+ "extensions": {},
+ },
+ status_code=200,
+ )
+
+ request = AWSRequest(
+ method="HEAD",
+ url="https://bucket/metadata/snap-foo.avro",
+ headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
+ data=b"",
+ params={},
+ auth_path="/metadata/snap-foo.avro",
+ )
+ request.context = {
+ "client_region": "us-west-2",
+ "has_streaming_input": False,
+ "auth_type": None,
+ "signing": {"bucket": "bucket"},
+ "retries": {"attempt": 1, "invocation-id":
"75d143fb-0219-439b-872c-18213d1c8d54"},
+ }
+
+ class DummyAuthManager:
+ def __init__(self) -> None:
+ self.calls = 0
+
+ def auth_header(self) -> str:
+ self.calls += 1
+ return "Bearer via-manager"
+
+ auth_manager = DummyAuthManager()
+
+ signer = S3V4RestSigner(properties={AUTH_MANAGER: auth_manager, "uri":
TEST_URI})
+ signer(request)
+
+ assert auth_manager.calls == 1
+ assert requests_mock.last_request is not None
+ assert requests_mock.last_request.headers["Authorization"] == "Bearer
via-manager"
+ assert request.url == new_uri