This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b50317c70 Python: Fix the caching (#6010)
1b50317c70 is described below

commit 1b50317c7090b67f915d712eaa1bb505efb112c3
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Fri Nov 4 21:10:01 2022 +0100

    Python: Fix the caching (#6010)
---
 python/pyiceberg/io/pyarrow.py  | 33 +++++++++++++++++++++------------
 python/tests/io/test_pyarrow.py |  8 ++++----
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index e70cec6349..bcc6dc94bf 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -37,6 +37,7 @@ from pyarrow.fs import (
     FileInfo,
     FileSystem,
     FileType,
+    LocalFileSystem,
     S3FileSystem,
 )
 
@@ -197,22 +198,27 @@ class PyArrowFile(InputFile, OutputFile):
 
 class PyArrowFileIO(FileIO):
     def __init__(self, properties: Properties = EMPTY_DICT):
-        self.get_fs_and_path: Callable = lru_cache(self._get_fs_and_path)
+        self.get_fs: Callable = lru_cache(self._get_fs)
         super().__init__(properties=properties)
 
-    def _get_fs_and_path(self, location: str) -> Tuple[FileSystem, str]:
-        uri = urlparse(location)  # Create a ParseResult from the URI
-        if not uri.scheme:  # If no scheme, assume the path is to a local file
-            return FileSystem.from_uri(os.path.abspath(location))
-        elif uri.scheme in {"s3", "s3a", "s3n"}:
+    @staticmethod
+    def parse_location(location: str) -> Tuple[str, str]:
+        """Returns the path without the scheme"""
+        uri = urlparse(location)
+        return uri.scheme or "file", os.path.abspath(location) if not uri.scheme else f"{uri.netloc}{uri.path}"
+
+    def _get_fs(self, scheme: str) -> FileSystem:
+        if scheme in {"s3", "s3a", "s3n"}:
             client_kwargs = {
                 "endpoint_override": self.properties.get("s3.endpoint"),
                 "access_key": self.properties.get("s3.access-key-id"),
                 "secret_key": self.properties.get("s3.secret-access-key"),
             }
-            return (S3FileSystem(**client_kwargs), uri.netloc + uri.path)
+            return S3FileSystem(**client_kwargs)
+        elif scheme == "file":
+            return LocalFileSystem()
         else:
-            return FileSystem.from_uri(location)  # Infer the proper filesystem
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
     def new_input(self, location: str) -> PyArrowFile:
         """Get a PyArrowFile instance to read bytes from the file at the given location
@@ -223,7 +229,8 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location
         """
-        fs, path = self.get_fs_and_path(location)
+        scheme, path = self.parse_location(location)
+        fs = self._get_fs(scheme)
         return PyArrowFile(fs=fs, location=location, path=path)
 
     def new_output(self, location: str) -> PyArrowFile:
@@ -235,7 +242,8 @@ class PyArrowFileIO(FileIO):
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location
         """
-        fs, path = self.get_fs_and_path(location)
+        scheme, path = self.parse_location(location)
+        fs = self._get_fs(scheme)
         return PyArrowFile(fs=fs, location=location, path=path)
 
     def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
@@ -251,8 +259,9 @@ class PyArrowFileIO(FileIO):
             PermissionError: If the file at the provided location cannot be accessed due to a permission error such as
                 an AWS error code 15
         """
-        str_path = location.location if isinstance(location, (InputFile, OutputFile)) else location
-        fs, path = self.get_fs_and_path(str_path)
+        str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
+        scheme, path = self.parse_location(str_location)
+        fs = self._get_fs(scheme)
 
         try:
             fs.delete_file(path)
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 5a08011c03..db25c0d134 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -266,8 +266,8 @@ def test_deleting_s3_file_no_permission():
     s3fs_mock = MagicMock()
     s3fs_mock.delete_file.side_effect = OSError("AWS Error [code 15]")
 
-    with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
-        submocked.return_value = (s3fs_mock, "bar/foo.txt")
+    with patch.object(PyArrowFileIO, "_get_fs") as submocked:
+        submocked.return_value = s3fs_mock
 
         with pytest.raises(PermissionError) as exc_info:
             PyArrowFileIO().delete("s3://foo/bar.txt")
@@ -281,8 +281,8 @@ def test_deleting_s3_file_not_found():
     s3fs_mock = MagicMock()
     s3fs_mock.delete_file.side_effect = OSError("Path does not exist")
 
-    with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked:
-        submocked.return_value = (s3fs_mock, "bar/foo.txt")
+    with patch.object(PyArrowFileIO, "_get_fs") as submocked:
+        submocked.return_value = s3fs_mock
 
         with pytest.raises(FileNotFoundError) as exc_info:
             PyArrowFileIO().delete("s3://foo/bar.txt")

Reply via email to