This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d28e8de5f9 Python: Fix PyArrow HDFS support (#8524)
d28e8de5f9 is described below

commit d28e8de5f9fb50e4c77da62efa29fe70acaf3d8d
Author: frankliee <[email protected]>
AuthorDate: Sun Sep 10 05:43:09 2023 +0800

    Python: Fix PyArrow HDFS support (#8524)
---
 python/pyiceberg/io/pyarrow.py  | 25 +++++++++++++++++--------
 python/tests/io/test_pyarrow.py | 16 ++++++++++++++++
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index 83bd79d2fe..3453b18e44 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -297,7 +297,12 @@ class PyArrowFileIO(FileIO):
     def parse_location(location: str) -> Tuple[str, str]:
         """Return the path without the scheme."""
         uri = urlparse(location)
-        return uri.scheme or "file", os.path.abspath(location) if not uri.scheme else f"{uri.netloc}{uri.path}"
+        if not uri.scheme:
+            return "file", os.path.abspath(location)
+        elif uri.scheme == "hdfs":
+            return uri.scheme, location
+        else:
+            return uri.scheme, f"{uri.netloc}{uri.path}"
 
     def _get_fs(self, scheme: str) -> FileSystem:
         if scheme in {"s3", "s3a", "s3n"}:
@@ -314,13 +319,17 @@ class PyArrowFileIO(FileIO):
 
             return S3FileSystem(**client_kwargs)
         elif scheme == "hdfs":
-            client_kwargs = {
-                "host": self.properties.get(HDFS_HOST),
-                "port": self.properties.get(HDFS_PORT),
-                "user": self.properties.get(HDFS_USER),
-                "kerb_ticket": self.properties.get(HDFS_KERB_TICKET),
-            }
-            return HadoopFileSystem(**client_kwargs)
+            hdfs_kwargs: Dict[str, Any] = {}
+            if host := self.properties.get(HDFS_HOST):
+                hdfs_kwargs["host"] = host
+            if port := self.properties.get(HDFS_PORT):
+                # port should be an integer type
+                hdfs_kwargs["port"] = int(port)
+            if user := self.properties.get(HDFS_USER):
+                hdfs_kwargs["user"] = user
+            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
+                hdfs_kwargs["kerb_ticket"] = kerb_ticket
+            return HadoopFileSystem(**hdfs_kwargs)
         elif scheme in {"gs", "gcs"}:
             gcs_kwargs: Dict[str, Any] = {}
             if access_token := self.properties.get(GCS_TOKEN):
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 366eda53f5..92300d2337 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -1527,3 +1527,19 @@ def test_writing_avro_file_gcs(generated_manifest_entry_file: str, pyarrow_filei
             assert b1 == b2  # Check that bytes of read from local avro file match bytes written to s3
 
     pyarrow_fileio_gcs.delete(f"gs://warehouse/{filename}")
+
+
+def test_parse_hdfs_location() -> None:
+    locations = ["hdfs://127.0.0.1:9000/root/foo.txt", "hdfs://127.0.0.1/root/foo.txt"]
+    for location in locations:
+        schema, path = PyArrowFileIO.parse_location(location)
+        assert schema == "hdfs"
+        assert location == path
+
+
+def test_parse_local_location() -> None:
+    locations = ["/root/foo.txt", "/root/tmp/foo.txt"]
+    for location in locations:
+        schema, path = PyArrowFileIO.parse_location(location)
+        assert schema == "file"
+        assert location == path

Reply via email to