[ https://issues.apache.org/jira/browse/ARROW-1643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396962#comment-16396962 ]

ASF GitHub Bot commented on ARROW-1643:
---------------------------------------

wesm closed pull request #1668: ARROW-1643: [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
URL: https://github.com/apache/arrow/pull/1668

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
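
For context, a minimal usage sketch of what this change enables (the
namenode host, port, and paths below are hypothetical; a real run needs a
reachable HDFS cluster and the libhdfs bindings):

    import pyarrow.parquet as pq

    # read_table now accepts an hdfs:// URI and connects through
    # pa.hdfs.connect; previously only local paths and file-like
    # objects were handled.
    table = pq.read_table('hdfs://namenode:8020/data/example.parquet')

    # write_table resolves the filesystem the same way; the handle opened
    # through the implied filesystem is closed in ParquetWriter.close().
    pq.write_table(table, 'hdfs://namenode:8020/data/copy.parquet')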

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 42c558b0b..fd9c740f1 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -21,6 +21,13 @@
 import json
 import re
 import six
+from six.moves.urllib.parse import urlparse
+# pathlib might not be available in Python 2
+try:
+    import pathlib
+    _has_pathlib = True
+except ImportError:
+    _has_pathlib = False
 
 import numpy as np
 
@@ -53,6 +60,7 @@ class ParquetFile(object):
     """
     def __init__(self, source, metadata=None, common_metadata=None):
         self.reader = ParquetReader()
+        source = _ensure_file(source)
         self.reader.open(source, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -279,8 +287,20 @@ def __init__(self, where, schema, flavor=None,
             self.schema_changed = False
 
         self.schema = schema
+        self.where = where
+
+        # If we open a file using an implied filesystem, keep a reference
+        # to the handle so it is guaranteed to be closed
+        self.file_handle = None
+
+        if is_path(where):
+            fs = _get_fs_from_path(where)
+            sink = self.file_handle = fs.open(where, 'wb')
+        else:
+            sink = where
+
         self.writer = _parquet.ParquetWriter(
-            where, schema,
+            sink, schema,
             version=version,
             compression=compression,
             use_dictionary=use_dictionary,
@@ -310,6 +330,8 @@ def close(self):
         if self.is_open:
             self.writer.close()
             self.is_open = False
+        if self.file_handle is not None:
+            self.file_handle.close()
 
 
 def _get_pandas_index_columns(keyvalues):
@@ -559,8 +581,9 @@ def get_index(self, level, name, key):
         return self.levels[level].get_index(key)
 
 
-def is_string(x):
-    return isinstance(x, six.string_types)
+def is_path(x):
+    return (isinstance(x, six.string_types)
+            or (_has_pathlib and isinstance(x, pathlib.Path)))
 
 
 class ParquetManifest(object):
@@ -569,7 +592,7 @@ class ParquetManifest(object):
     """
     def __init__(self, dirpath, filesystem=None, pathsep='/',
                  partition_scheme='hive'):
-        self.filesystem = filesystem or LocalFileSystem.get_instance()
+        self.filesystem = filesystem or _get_fs_from_path(dirpath)
         self.pathsep = pathsep
         self.dirpath = dirpath
         self.partition_scheme = partition_scheme
@@ -692,7 +715,10 @@ class ParquetDataset(object):
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True):
         if filesystem is None:
-            self.fs = LocalFileSystem.get_instance()
+            a_path = path_or_paths
+            if isinstance(a_path, list):
+                a_path = a_path[0]
+            self.fs = _get_fs_from_path(a_path)
         else:
             self.fs = _ensure_filesystem(filesystem)
 
@@ -851,7 +877,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
         # Dask passes a directory as a list of length 1
         path_or_paths = path_or_paths[0]
 
-    if is_string(path_or_paths) and fs.isdir(path_or_paths):
+    if is_path(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=fs.pathsep)
         common_metadata_path = manifest.common_metadata_path
@@ -904,11 +930,11 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
 
 def read_table(source, columns=None, nthreads=1, metadata=None,
                use_pandas_metadata=False):
-    if is_string(source):
-        fs = LocalFileSystem.get_instance()
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+
         if fs.isdir(source):
-            return fs.read_parquet(source, columns=columns,
-                                   metadata=metadata)
+            return fs.read_parquet(source, columns=columns, metadata=metadata)
 
     pf = ParquetFile(source, metadata=metadata)
     return pf.read(columns=columns, nthreads=nthreads,
@@ -957,7 +983,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
-        if isinstance(where, six.string_types):
+        if is_path(where):
             try:
                 os.remove(where)
             except os.error:
@@ -1026,7 +1052,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     )
 
     if filesystem is None:
-        fs = LocalFileSystem.get_instance()
+        fs = _get_fs_from_path(root_path)
     else:
         fs = _ensure_filesystem(filesystem)
 
@@ -1113,3 +1139,40 @@ def read_schema(where):
     schema : pyarrow.Schema
     """
     return ParquetFile(where).schema.to_arrow_schema()
+
+
+def _ensure_file(source):
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+        try:
+            return fs.open(source)
+        except IOError as e:
+            raise lib.ArrowIOError("failed to open file {}, {}"
+                                   .format(source, e))
+    elif not hasattr(source, 'seek'):
+        raise ValueError('Source does not appear file-like')
+    else:
+        return source
+
+
+def _get_fs_from_path(path):
+    """
+    Return the filesystem inferred from the path, which may be an HDFS URI.
+    """
+    # input can be hdfs URI such as hdfs://host:port/myfile.parquet
+    if _has_pathlib and isinstance(path, pathlib.Path):
+        path = str(path)
+    parsed_uri = urlparse(path)
+    if parsed_uri.scheme == 'hdfs':
+        netloc_split = parsed_uri.netloc.split(':')
+        host = netloc_split[0]
+        if host == '':
+            host = 'default'
+        port = 0
+        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
+            port = int(netloc_split[1])
+        fs = pa.hdfs.connect(host=host, port=port)
+    else:
+        fs = LocalFileSystem.get_instance()
+
+    return fs
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 885272ba8..4840aee48 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -272,19 +272,11 @@ def test_read_whole_file(self):
 
         assert result == data
 
-    @test_parquet.parquet
-    def test_read_multiple_parquet_files(self):
+    def _write_multiple_hdfs_pq_files(self, tmpdir):
         import pyarrow.parquet as pq
-
         nfiles = 10
         size = 5
-
-        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
-
-        self.hdfs.mkdir(tmpdir)
-
         test_data = []
-        paths = []
         for i in range(nfiles):
             df = test_parquet._test_dataframe(size, seed=i)
 
@@ -300,15 +292,60 @@ def test_read_multiple_parquet_files(self):
                 pq.write_table(table, f)
 
             test_data.append(table)
-            paths.append(path)
 
-        result = self.hdfs.read_parquet(tmpdir)
         expected = pa.concat_tables(test_data)
+        return expected
+
+    @test_parquet.parquet
+    def test_read_multiple_parquet_files(self):
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        result = self.hdfs.read_parquet(tmpdir)
 
         pdt.assert_frame_equal(result.to_pandas()
                                .sort_values(by='index').reset_index(drop=True),
                                expected.to_pandas())
 
+    @test_parquet.parquet
+    def test_read_multiple_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        path = _get_hdfs_uri(tmpdir)
+        result = pq.read_table(path)
+
+        pdt.assert_frame_equal(result.to_pandas()
+                               .sort_values(by='index').reset_index(drop=True),
+                               expected.to_pandas())
+
+    @test_parquet.parquet
+    def test_read_write_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid())
+        self.hdfs.mkdir(tmpdir)
+        path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet'))
+
+        size = 5
+        df = test_parquet._test_dataframe(size, seed=0)
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+        table = pa.Table.from_pandas(df, preserve_index=False)
+
+        pq.write_table(table, path)
+
+        result = pq.read_table(path).to_pandas()
+
+        pdt.assert_frame_equal(result, df)
+
     @test_parquet.parquet
     def test_read_common_metadata_files(self):
         tmpdir = pjoin(self.tmp_path, 'common-metadata-' + guid())
@@ -357,3 +394,15 @@ class TestLibHdfs3(HdfsTestCases, unittest.TestCase):
     def check_driver(cls):
         if not pa.have_libhdfs3():
             pytest.skip('No libhdfs3 available on system')
+
+
+def _get_hdfs_uri(path):
+    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
+    try:
+        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
+    except ValueError:
+        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
+                         'an integer')
+    uri = "hdfs://{}:{}{}".format(host, port, path)
+
+    return uri
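
As a quick illustration of the URI handling in _get_fs_from_path above
(host and port values are hypothetical), the scheme and netloc determine
which filesystem is returned:

    from six.moves.urllib.parse import urlparse

    # 'hdfs://namenode:8020/data/x.parquet' -> host 'namenode', port 8020
    # 'hdfs:///data/x.parquet'              -> host 'default',  port 0
    # '/local/data/x.parquet'               -> no scheme, LocalFileSystem
    parsed = urlparse('hdfs://namenode:8020/data/x.parquet')
    host, _, port = parsed.netloc.partition(':')
    print(host or 'default', int(port) if port else 0)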


 


> [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
> -------------------------------------------------------------------------------------
>
>                 Key: ARROW-1643
>                 URL: https://issues.apache.org/jira/browse/ARROW-1643
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Python
>            Reporter: Wes McKinney
>            Assignee: Ehsan Totoni
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>



