This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new ba656191 Core: Pass REST auth manager to S3 signer (#2846)
ba656191 is described below

commit ba65619113ebfd56515975b2e31a3f3df14bed55
Author: Soham <[email protected]>
AuthorDate: Sun Jan 18 01:16:08 2026 +0530

    Core: Pass REST auth manager to S3 signer (#2846)
    
    What does this change do?
    - RestCatalog now passes its AuthManager into FileIO (under the
    `auth.manager` property) so downstream components can reuse a live token.
    - S3V4RestSigner now calls the AuthManager’s auth_header() whenever an
    AuthManager is available, falling back to the static token only when no
    AuthManager is set, so the signer gets a fresh bearer token.
    - Added a unit test to verify the signer pulls the Authorization header
    from an AuthManager.
    
    Why is this needed?
    - After the AuthManager refactor, the signer no longer received a token,
    causing remote signing requests to fail with HTTP 401 for REST catalog
    users (e.g., Lakekeeper/MinIO). This change restores token propagation
    and refresh.
    
    How was this tested?
    - make lint
    - make test
    - uv run python -m pytest tests/io/test_fsspec.py -k auth_manager -v
    
    Closes #2544
    
    ---------
    
    Co-authored-by: Soham <[email protected]>
    Co-authored-by: Fokko Driesprong <[email protected]>
    Co-authored-by: Kevin Liu <[email protected]>
---
 pyiceberg/catalog/rest/__init__.py | 27 +++++++++++---------
 pyiceberg/catalog/rest/auth.py     |  2 ++
 pyiceberg/io/fsspec.py             | 15 ++++++++---
 tests/io/test_fsspec.py            | 51 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 80 insertions(+), 15 deletions(-)

diff --git a/pyiceberg/catalog/rest/__init__.py 
b/pyiceberg/catalog/rest/__init__.py
index 84188e58..d7ef6ec8 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -27,15 +27,8 @@ from requests import HTTPError, Session
 from tenacity import RetryCallState, retry, retry_if_exception_type, 
stop_after_attempt
 
 from pyiceberg import __version__
-from pyiceberg.catalog import (
-    BOTOCORE_SESSION,
-    TOKEN,
-    URI,
-    WAREHOUSE_LOCATION,
-    Catalog,
-    PropertiesUpdateSummary,
-)
-from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, 
AuthManagerFactory, LegacyOAuth2AuthManager
+from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, 
WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager, 
AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
 from pyiceberg.catalog.rest.response import _handle_non_200_response
 from pyiceberg.catalog.rest.scan_planning import (
     FetchScanTasksRequest,
@@ -61,7 +54,7 @@ from pyiceberg.exceptions import (
     TableAlreadyExistsError,
     UnauthorizedError,
 )
-from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionSpec, assign_fresh_partition_spec_ids
 from pyiceberg.schema import Schema, assign_fresh_schema_ids
 from pyiceberg.table import (
@@ -335,6 +328,7 @@ _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
 class RestCatalog(Catalog):
     uri: str
     _session: Session
+    _auth_manager: AuthManager | None
     _supported_endpoints: set[Endpoint]
 
     def __init__(self, name: str, **properties: str):
@@ -347,6 +341,7 @@ class RestCatalog(Catalog):
             properties: Properties that are passed along to the configuration.
         """
         super().__init__(name, **properties)
+        self._auth_manager: AuthManager | None = None
         self.uri = properties[URI]
         self._fetch_config()
         self._session = self._create_session()
@@ -381,9 +376,11 @@ class RestCatalog(Catalog):
             if auth_type != CUSTOM and auth_impl:
                 raise ValueError("auth.impl can only be specified when using 
custom auth.type")
 
-            session.auth = 
AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, 
auth_type_config))
+            self._auth_manager = AuthManagerFactory.create(auth_impl or 
auth_type, auth_type_config)
+            session.auth = AuthManagerAdapter(self._auth_manager)
         else:
-            session.auth = 
AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
+            self._auth_manager = 
self._create_legacy_oauth2_auth_manager(session)
+            session.auth = AuthManagerAdapter(self._auth_manager)
 
         # Configure SigV4 Request Signing
         if property_as_bool(self.properties, SIGV4, False):
@@ -391,6 +388,12 @@ class RestCatalog(Catalog):
 
         return session
 
+    def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str 
| None = None) -> FileIO:
+        merged_properties = {**self.properties, **properties}
+        if self._auth_manager:
+            merged_properties[AUTH_MANAGER] = self._auth_manager
+        return load_file_io(merged_properties, location)
+
     def supports_server_side_planning(self) -> bool:
         """Check if the catalog supports server-side scan planning."""
         return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in 
self._supported_endpoints and property_as_bool(
diff --git a/pyiceberg/catalog/rest/auth.py b/pyiceberg/catalog/rest/auth.py
index 3fdc837c..4f67ab90 100644
--- a/pyiceberg/catalog/rest/auth.py
+++ b/pyiceberg/catalog/rest/auth.py
@@ -31,6 +31,8 @@ from requests.auth import AuthBase
 from pyiceberg.catalog.rest.response import TokenResponse, 
_handle_non_200_response
 from pyiceberg.exceptions import OAuthError
 
+AUTH_MANAGER = "auth.manager"
+
 COLON = ":"
 logger = logging.getLogger(__name__)
 
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index 5898a226..eb5342c9 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -37,6 +37,7 @@ from fsspec.implementations.local import LocalFileSystem
 from requests import HTTPError
 
 from pyiceberg.catalog import TOKEN, URI
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER
 from pyiceberg.exceptions import SignError
 from pyiceberg.io import (
     ADLS_ACCOUNT_HOST,
@@ -121,9 +122,17 @@ class S3V4RestSigner(S3RequestSigner):
         signer_url = self.properties.get(S3_SIGNER_URI, 
self.properties[URI]).rstrip("/")  # type: ignore
         signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, 
S3_SIGNER_ENDPOINT_DEFAULT)
 
-        signer_headers = {}
-        if token := self.properties.get(TOKEN):
-            signer_headers = {"Authorization": f"Bearer {token}"}
+        signer_headers: dict[str, str] = {}
+
+        auth_header: str | None = None
+        if auth_manager := self.properties.get(AUTH_MANAGER):
+            auth_header = auth_manager.auth_header()
+        elif token := self.properties.get(TOKEN):
+            auth_header = f"Bearer {token}"
+
+        if auth_header:
+            signer_headers["Authorization"] = auth_header
+
         signer_headers.update(get_header_properties(self.properties))
 
         signer_body = {
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index c28eb071..9e4a1da5 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -28,6 +28,7 @@ from fsspec.implementations.local import LocalFileSystem
 from fsspec.spec import AbstractFileSystem
 from requests_mock import Mocker
 
+from pyiceberg.catalog.rest.auth import AUTH_MANAGER
 from pyiceberg.exceptions import SignError
 from pyiceberg.io import fsspec
 from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
@@ -948,3 +949,53 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) 
-> None:
         """Failed to sign request 401: {'method': 'HEAD', 'region': 
'us-west-2', 'uri': 
'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro',
 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""
         in str(exc_info.value)
     )
+
+
+def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None:
+    new_uri = "https://bucket/metadata/snap-signed.avro"
+    requests_mock.post(
+        f"{TEST_URI}/v1/aws/s3/sign",
+        json={
+            "uri": new_uri,
+            "headers": {
+                "Authorization": ["AWS4-HMAC-SHA256 
Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"],
+                "Host": ["bucket.s3.us-west-2.amazonaws.com"],
+            },
+            "extensions": {},
+        },
+        status_code=200,
+    )
+
+    request = AWSRequest(
+        method="HEAD",
+        url="https://bucket/metadata/snap-foo.avro",
+        headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
+        data=b"",
+        params={},
+        auth_path="/metadata/snap-foo.avro",
+    )
+    request.context = {
+        "client_region": "us-west-2",
+        "has_streaming_input": False,
+        "auth_type": None,
+        "signing": {"bucket": "bucket"},
+        "retries": {"attempt": 1, "invocation-id": 
"75d143fb-0219-439b-872c-18213d1c8d54"},
+    }
+
+    class DummyAuthManager:
+        def __init__(self) -> None:
+            self.calls = 0
+
+        def auth_header(self) -> str:
+            self.calls += 1
+            return "Bearer via-manager"
+
+    auth_manager = DummyAuthManager()
+
+    signer = S3V4RestSigner(properties={AUTH_MANAGER: auth_manager, "uri": 
TEST_URI})
+    signer(request)
+
+    assert auth_manager.calls == 1
+    assert requests_mock.last_request is not None
+    assert requests_mock.last_request.headers["Authorization"] == "Bearer 
via-manager"
+    assert request.url == new_uri

Reply via email to