This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 95f6273b fix: Extract ADLS account_name from URI hostname in FsspecFileIO (#3005)
95f6273b is described below
commit 95f6273b23524c6238aafb57fa06e693ef83d6ef
Author: antonlin1 <[email protected]>
AuthorDate: Fri Feb 20 21:47:08 2026 +0100
fix: Extract ADLS account_name from URI hostname in FsspecFileIO (#3005)
---
pyiceberg/io/fsspec.py | 37 +++++++++++++++++++++----------
tests/io/test_fsspec.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 84 insertions(+), 11 deletions(-)
diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py
index ac108c80..63ec55ba 100644
--- a/pyiceberg/io/fsspec.py
+++ b/pyiceberg/io/fsspec.py
@@ -29,7 +29,7 @@ from typing import (
TYPE_CHECKING,
Any,
)
-from urllib.parse import urlparse
+from urllib.parse import ParseResult, urlparse
import requests
from fsspec import AbstractFileSystem
@@ -244,7 +244,7 @@ def _gs(properties: Properties) -> AbstractFileSystem:
)
-def _adls(properties: Properties) -> AbstractFileSystem:
+def _adls(properties: Properties, hostname: str | None = None) ->
AbstractFileSystem:
# https://fsspec.github.io/adlfs/api/
from adlfs import AzureBlobFileSystem
@@ -259,6 +259,10 @@ def _adls(properties: Properties) -> AbstractFileSystem:
if ADLS_SAS_TOKEN not in properties:
properties[ADLS_SAS_TOKEN] = sas_token
+ # Fallback: extract account_name from URI hostname (e.g.
"account.dfs.core.windows.net" -> "account")
+ if hostname and ADLS_ACCOUNT_NAME not in properties:
+ properties[ADLS_ACCOUNT_NAME] = hostname.split(".")[0]
+
class StaticTokenCredential(AsyncTokenCredential):
_DEFAULT_EXPIRY_SECONDS = 3600
@@ -300,7 +304,7 @@ def _hf(properties: Properties) -> AbstractFileSystem:
)
-SCHEME_TO_FS = {
+SCHEME_TO_FS: dict[str, Callable[..., AbstractFileSystem]] = {
"": _file,
"file": _file,
"s3": _s3,
@@ -313,6 +317,8 @@ SCHEME_TO_FS = {
"hf": _hf,
}
+_ADLS_SCHEMES = frozenset({"abfs", "abfss", "wasb", "wasbs"})
+
class FsspecInputFile(InputFile):
"""An input file implementation for the FsspecFileIO.
@@ -414,8 +420,7 @@ class FsspecFileIO(FileIO):
"""A FileIO implementation that uses fsspec."""
def __init__(self, properties: Properties):
- self._scheme_to_fs = {}
- self._scheme_to_fs.update(SCHEME_TO_FS)
+ self._scheme_to_fs: dict[str, Callable[..., AbstractFileSystem]] =
dict(SCHEME_TO_FS)
self._thread_locals = threading.local()
super().__init__(properties=properties)
@@ -429,7 +434,7 @@ class FsspecFileIO(FileIO):
FsspecInputFile: An FsspecInputFile instance for the given
location.
"""
uri = urlparse(location)
- fs = self.get_fs(uri.scheme)
+ fs = self._get_fs_from_uri(uri)
return FsspecInputFile(location=location, fs=fs)
def new_output(self, location: str) -> FsspecOutputFile:
@@ -442,7 +447,7 @@ class FsspecFileIO(FileIO):
FsspecOutputFile: An FsspecOutputFile instance for the given
location.
"""
uri = urlparse(location)
- fs = self.get_fs(uri.scheme)
+ fs = self._get_fs_from_uri(uri)
return FsspecOutputFile(location=location, fs=fs)
def delete(self, location: str | InputFile | OutputFile) -> None:
@@ -459,20 +464,30 @@ class FsspecFileIO(FileIO):
str_location = location
uri = urlparse(str_location)
- fs = self.get_fs(uri.scheme)
+ fs = self._get_fs_from_uri(uri)
fs.rm(str_location)
- def get_fs(self, scheme: str) -> AbstractFileSystem:
+ def _get_fs_from_uri(self, uri: "ParseResult") -> AbstractFileSystem:
+ """Get a filesystem from a parsed URI, using hostname for ADLS account
resolution."""
+ if uri.scheme in _ADLS_SCHEMES:
+ return self.get_fs(uri.scheme, uri.hostname)
+ return self.get_fs(uri.scheme)
+
+ def get_fs(self, scheme: str, hostname: str | None = None) ->
AbstractFileSystem:
"""Get a filesystem for a specific scheme, cached per thread."""
if not hasattr(self._thread_locals, "get_fs_cached"):
self._thread_locals.get_fs_cached = lru_cache(self._get_fs)
- return self._thread_locals.get_fs_cached(scheme)
+ return self._thread_locals.get_fs_cached(scheme, hostname)
- def _get_fs(self, scheme: str) -> AbstractFileSystem:
+ def _get_fs(self, scheme: str, hostname: str | None = None) ->
AbstractFileSystem:
"""Get a filesystem for a specific scheme."""
if scheme not in self._scheme_to_fs:
raise ValueError(f"No registered filesystem for scheme: {scheme}")
+
+ if scheme in _ADLS_SCHEMES:
+ return _adls(self.properties, hostname)
+
return self._scheme_to_fs[scheme](self.properties)
def __getstate__(self) -> dict[str, Any]:
diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py
index 392fa60a..bb11fdd7 100644
--- a/tests/io/test_fsspec.py
+++ b/tests/io/test_fsspec.py
@@ -606,6 +606,64 @@ def test_adls_account_name_sas_token_extraction() -> None:
)
+def test_adls_account_name_extracted_from_uri_hostname() -> None:
+ """Test that account_name is extracted from the ABFSS URI hostname when
not in properties."""
+ session_properties: Properties = {
+ "adls.tenant-id": "test-tenant-id",
+ "adls.client-id": "test-client-id",
+ "adls.client-secret": "test-client-secret",
+ }
+
+ with mock.patch("adlfs.AzureBlobFileSystem") as mock_adlfs:
+ adls_fileio = FsspecFileIO(properties=session_properties)
+
+ adls_fileio.new_input(
+
location="abfss://[email protected]"
+ "/unified_datasets/aggregated/data/file.parquet"
+ )
+
+ mock_adlfs.assert_called_with(
+ connection_string=None,
+ credential=None,
+ account_name="usagestorageprod",
+ account_key=None,
+ sas_token=None,
+ tenant_id="test-tenant-id",
+ client_id="test-client-id",
+ client_secret="test-client-secret",
+ account_host=None,
+ anon=None,
+ )
+
+
+def test_adls_account_name_not_overridden_when_in_properties() -> None:
+ """Test that explicit adls.account-name in properties is not overridden by
URI hostname."""
+ session_properties: Properties = {
+ "adls.account-name": "explicitly-configured-account",
+ "adls.tenant-id": "test-tenant-id",
+ "adls.client-id": "test-client-id",
+ "adls.client-secret": "test-client-secret",
+ }
+
+ with mock.patch("adlfs.AzureBlobFileSystem") as mock_adlfs:
+ adls_fileio = FsspecFileIO(properties=session_properties)
+
+
adls_fileio.new_input(location="abfss://[email protected]/path/file.parquet")
+
+ mock_adlfs.assert_called_with(
+ connection_string=None,
+ credential=None,
+ account_name="explicitly-configured-account",
+ account_key=None,
+ sas_token=None,
+ tenant_id="test-tenant-id",
+ client_id="test-client-id",
+ client_secret="test-client-secret",
+ account_host=None,
+ anon=None,
+ )
+
+
@pytest.mark.gcs
def test_fsspec_new_input_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
"""Test creating a new input file from a fsspec file-io"""