This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d28e8de5f9 Python: Fix PyArrow HDFS support (#8524)
d28e8de5f9 is described below
commit d28e8de5f9fb50e4c77da62efa29fe70acaf3d8d
Author: frankliee <[email protected]>
AuthorDate: Sun Sep 10 05:43:09 2023 +0800
Python: Fix PyArrow HDFS support (#8524)
---
python/pyiceberg/io/pyarrow.py | 25 +++++++++++++++++--------
python/tests/io/test_pyarrow.py | 16 ++++++++++++++++
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index 83bd79d2fe..3453b18e44 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -297,7 +297,12 @@ class PyArrowFileIO(FileIO):
def parse_location(location: str) -> Tuple[str, str]:
"""Return the path without the scheme."""
uri = urlparse(location)
- return uri.scheme or "file", os.path.abspath(location) if not
uri.scheme else f"{uri.netloc}{uri.path}"
+ if not uri.scheme:
+ return "file", os.path.abspath(location)
+ elif uri.scheme == "hdfs":
+ return uri.scheme, location
+ else:
+ return uri.scheme, f"{uri.netloc}{uri.path}"
def _get_fs(self, scheme: str) -> FileSystem:
if scheme in {"s3", "s3a", "s3n"}:
@@ -314,13 +319,17 @@ class PyArrowFileIO(FileIO):
return S3FileSystem(**client_kwargs)
elif scheme == "hdfs":
- client_kwargs = {
- "host": self.properties.get(HDFS_HOST),
- "port": self.properties.get(HDFS_PORT),
- "user": self.properties.get(HDFS_USER),
- "kerb_ticket": self.properties.get(HDFS_KERB_TICKET),
- }
- return HadoopFileSystem(**client_kwargs)
+ hdfs_kwargs: Dict[str, Any] = {}
+ if host := self.properties.get(HDFS_HOST):
+ hdfs_kwargs["host"] = host
+ if port := self.properties.get(HDFS_PORT):
+ # port should be an integer type
+ hdfs_kwargs["port"] = int(port)
+ if user := self.properties.get(HDFS_USER):
+ hdfs_kwargs["user"] = user
+ if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
+ hdfs_kwargs["kerb_ticket"] = kerb_ticket
+ return HadoopFileSystem(**hdfs_kwargs)
elif scheme in {"gs", "gcs"}:
gcs_kwargs: Dict[str, Any] = {}
if access_token := self.properties.get(GCS_TOKEN):
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 366eda53f5..92300d2337 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -1527,3 +1527,19 @@ def
test_writing_avro_file_gcs(generated_manifest_entry_file: str, pyarrow_filei
assert b1 == b2 # Check that bytes of read from local avro file
match bytes written to s3
pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}")
+
+
+def test_parse_hdfs_location() -> None:
+ locations = ["hdfs://127.0.0.1:9000/root/foo.txt",
"hdfs://127.0.0.1/root/foo.txt"]
+ for location in locations:
+ schema, path = PyArrowFileIO.parse_location(location)
+ assert schema == "hdfs"
+ assert location == path
+
+
+def test_parse_local_location() -> None:
+ locations = ["/root/foo.txt", "/root/tmp/foo.txt"]
+ for location in locations:
+ schema, path = PyArrowFileIO.parse_location(location)
+ assert schema == "file"
+ assert location == path