This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push: new 1b50317c70 Python: Fix the caching (#6010) 1b50317c70 is described below commit 1b50317c7090b67f915d712eaa1bb505efb112c3 Author: Fokko Driesprong <fo...@apache.org> AuthorDate: Fri Nov 4 21:10:01 2022 +0100 Python: Fix the caching (#6010) --- python/pyiceberg/io/pyarrow.py | 33 +++++++++++++++++++++------------ python/tests/io/test_pyarrow.py | 8 ++++---- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py index e70cec6349..bcc6dc94bf 100644 --- a/python/pyiceberg/io/pyarrow.py +++ b/python/pyiceberg/io/pyarrow.py @@ -37,6 +37,7 @@ from pyarrow.fs import ( FileInfo, FileSystem, FileType, + LocalFileSystem, S3FileSystem, ) @@ -197,22 +198,27 @@ class PyArrowFile(InputFile, OutputFile): class PyArrowFileIO(FileIO): def __init__(self, properties: Properties = EMPTY_DICT): - self.get_fs_and_path: Callable = lru_cache(self._get_fs_and_path) + self.get_fs: Callable = lru_cache(self._get_fs) super().__init__(properties=properties) - def _get_fs_and_path(self, location: str) -> Tuple[FileSystem, str]: - uri = urlparse(location) # Create a ParseResult from the URI - if not uri.scheme: # If no scheme, assume the path is to a local file - return FileSystem.from_uri(os.path.abspath(location)) - elif uri.scheme in {"s3", "s3a", "s3n"}: + @staticmethod + def parse_location(location: str) -> Tuple[str, str]: + """Returns the path without the scheme""" + uri = urlparse(location) + return uri.scheme or "file", os.path.abspath(location) if not uri.scheme else f"{uri.netloc}{uri.path}" + + def _get_fs(self, scheme: str) -> FileSystem: + if scheme in {"s3", "s3a", "s3n"}: client_kwargs = { "endpoint_override": self.properties.get("s3.endpoint"), "access_key": self.properties.get("s3.access-key-id"), "secret_key": self.properties.get("s3.secret-access-key"), } - return (S3FileSystem(**client_kwargs), uri.netloc + uri.path) + return 
S3FileSystem(**client_kwargs) + elif scheme == "file": + return LocalFileSystem() else: - return FileSystem.from_uri(location) # Infer the proper filesystem + raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") def new_input(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location @@ -223,7 +229,8 @@ class PyArrowFileIO(FileIO): Returns: PyArrowFile: A PyArrowFile instance for the given location """ - fs, path = self.get_fs_and_path(location) + scheme, path = self.parse_location(location) + fs = self._get_fs(scheme) return PyArrowFile(fs=fs, location=location, path=path) def new_output(self, location: str) -> PyArrowFile: @@ -235,7 +242,8 @@ class PyArrowFileIO(FileIO): Returns: PyArrowFile: A PyArrowFile instance for the given location """ - fs, path = self.get_fs_and_path(location) + scheme, path = self.parse_location(location) + fs = self._get_fs(scheme) return PyArrowFile(fs=fs, location=location, path=path) def delete(self, location: Union[str, InputFile, OutputFile]) -> None: @@ -251,8 +259,9 @@ class PyArrowFileIO(FileIO): PermissionError: If the file at the provided location cannot be accessed due to a permission error such as an AWS error code 15 """ - str_path = location.location if isinstance(location, (InputFile, OutputFile)) else location - fs, path = self.get_fs_and_path(str_path) + str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location + scheme, path = self.parse_location(str_location) + fs = self._get_fs(scheme) try: fs.delete_file(path) diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py index 5a08011c03..db25c0d134 100644 --- a/python/tests/io/test_pyarrow.py +++ b/python/tests/io/test_pyarrow.py @@ -266,8 +266,8 @@ def test_deleting_s3_file_no_permission(): s3fs_mock = MagicMock() s3fs_mock.delete_file.side_effect = OSError("AWS Error [code 15]") - with patch.object(PyArrowFileIO, "_get_fs_and_path") as 
submocked: - submocked.return_value = (s3fs_mock, "bar/foo.txt") + with patch.object(PyArrowFileIO, "_get_fs") as submocked: + submocked.return_value = s3fs_mock with pytest.raises(PermissionError) as exc_info: PyArrowFileIO().delete("s3://foo/bar.txt") @@ -281,8 +281,8 @@ def test_deleting_s3_file_not_found(): s3fs_mock = MagicMock() s3fs_mock.delete_file.side_effect = OSError("Path does not exist") - with patch.object(PyArrowFileIO, "_get_fs_and_path") as submocked: - submocked.return_value = (s3fs_mock, "bar/foo.txt") + with patch.object(PyArrowFileIO, "_get_fs") as submocked: + submocked.return_value = s3fs_mock with pytest.raises(FileNotFoundError) as exc_info: PyArrowFileIO().delete("s3://foo/bar.txt")