Repository: arrow
Updated Branches:
  refs/heads/master 5a161ebc1 -> 53a478dfb
ARROW-475: [Python] Add support for reading multiple Parquet files as a single pyarrow.Table Also fixes a serious bug in which the data source passed to the ParquetReader gets garbage collected prematurely Also implements ARROW-470 Author: Wes McKinney <[email protected]> Closes #296 from wesm/ARROW-475 and squashes the following commits: 894d2a2 [Wes McKinney] Implement Filesystem abstraction, add Filesystem.read_parquet. Implement rudimentary shim on local filesystem 3927c2c [Wes McKinney] Test read multiple Parquet from HDFS, fix premature garbage collection error 4904b3b [Wes McKinney] Implement read_multiple_files function for multiple Parquet files as a single Arrow table Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/53a478df Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/53a478df Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/53a478df Branch: refs/heads/master Commit: 53a478dfb278dcae5ca7f300b70857662553d118 Parents: 5a161eb Author: Wes McKinney <[email protected]> Authored: Mon Jan 23 06:41:35 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Mon Jan 23 06:41:35 2017 -0500 ---------------------------------------------------------------------- python/pyarrow/__init__.py | 6 +- python/pyarrow/_parquet.pyx | 3 + python/pyarrow/filesystem.py | 186 ++++++++++++++++++++++ python/pyarrow/includes/libarrow_io.pxd | 2 + python/pyarrow/io.pyx | 62 +++----- python/pyarrow/parquet.py | 88 ++++++++-- python/pyarrow/table.pyx | 60 ++++--- python/pyarrow/tests/test_column.py | 49 ------ python/pyarrow/tests/test_convert_builtin.py | 3 +- python/pyarrow/tests/test_convert_pandas.py | 8 +- python/pyarrow/tests/test_hdfs.py | 46 +++++- python/pyarrow/tests/test_parquet.py | 155 +++++++++++++----- python/pyarrow/tests/test_scalars.py | 2 +- python/pyarrow/tests/test_schema.py | 1 - python/pyarrow/tests/test_table.py | 50 ++++-- python/pyarrow/util.py | 25 +++ 16 files changed, 568 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index efffbf2..d563c7a 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -42,7 +42,8 @@ from pyarrow.array import (Array, from pyarrow.error import ArrowException -from pyarrow.io import (HdfsClient, HdfsFile, NativeFile, PythonFileInterface, +from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem +from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface, Buffer, InMemoryOutputStream, BufferReader) from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType, @@ -61,3 +62,6 @@ from pyarrow.schema import (null, bool_, DataType, Field, Schema, schema) from pyarrow.table import Column, RecordBatch, Table, concat_tables + + +localfs = LocalFilesystem() http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/_parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 867fc4c..b11cee3 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -341,6 +341,7 @@ cdef logical_type_name_from_enum(ParquetLogicalType type_): cdef class ParquetReader: cdef: + object source MemoryPool* allocator unique_ptr[FileReader] reader 
column_idx_map @@ -360,6 +361,8 @@ cdef class ParquetReader: if metadata is not None: c_metadata = metadata.sp_metadata + self.source = source + get_reader(source, &rd_handle) with nogil: check_status(OpenFile(rd_handle, self.allocator, properties, http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/filesystem.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py new file mode 100644 index 0000000..82409b7 --- /dev/null +++ b/python/pyarrow/filesystem.py @@ -0,0 +1,186 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from os.path import join as pjoin +import os + +from pyarrow.util import implements +import pyarrow.io as io + + +class Filesystem(object): + """ + Abstract filesystem interface + """ + def ls(self, path): + """ + Return list of file paths + """ + raise NotImplementedError + + def delete(self, path, recursive=False): + """ + Delete the indicated file or directory + + Parameters + ---------- + path : string + recursive : boolean, default False + If True, also delete child paths for directories + """ + raise NotImplementedError + + def mkdir(self, path, create_parents=True): + raise NotImplementedError + + def exists(self, path): + raise NotImplementedError + + def isdir(self, path): + """ + Return True if path is a directory + """ + raise NotImplementedError + + def isfile(self, path): + """ + Return True if path is a file + """ + raise NotImplementedError + + def read_parquet(self, path, columns=None, schema=None): + """ + Read Parquet data from path in file system. 
Can read from a single file + or a directory of files + + Parameters + ---------- + path : str + Single file path or directory + columns : List[str], optional + Subset of columns to read + schema : pyarrow.parquet.Schema + Known schema to validate files against + + Returns + ------- + table : pyarrow.Table + """ + from pyarrow.parquet import read_multiple_files + + if self.isdir(path): + paths_to_read = [] + for path in self.ls(path): + if path == '_metadata' or path == '_common_metadata': + raise ValueError('No support yet for common metadata file') + paths_to_read.append(path) + else: + paths_to_read = [path] + + return read_multiple_files(paths_to_read, columns=columns, + filesystem=self, schema=schema) + + +class LocalFilesystem(Filesystem): + + @implements(Filesystem.ls) + def ls(self, path): + return sorted(pjoin(path, x) for x in os.listdir(path)) + + @implements(Filesystem.isdir) + def isdir(self, path): + return os.path.isdir(path) + + @implements(Filesystem.isfile) + def isfile(self, path): + return os.path.isfile(path) + + @implements(Filesystem.exists) + def exists(self, path): + return os.path.exists(path) + + def open(self, path, mode='rb'): + """ + Open file for reading or writing + """ + return open(path, mode=mode) + + +class HdfsClient(io._HdfsClient, Filesystem): + """ + Connect to an HDFS cluster. All parameters are optional and should + only be set if the defaults need to be overridden. + + Authentication should be automatic if the HDFS cluster uses Kerberos. + However, if a username is specified, then the ticket cache will likely + be required. + + Parameters + ---------- + host : NameNode. Set to "default" for fs.defaultFS from core-site.xml. + port : NameNode's port. Set to 0 for default or logical (HA) nodes. + user : Username when connecting to HDFS; None implies login user. + kerb_ticket : Path to Kerberos ticket cache. + driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs' + Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++ + library from Pivotal Labs) + + Notes + ----- + The first time you call this method, it will take longer than usual due + to JNI spin-up time. + + Returns + ------- + client : HDFSClient + """ + + def __init__(self, host="default", port=0, user=None, kerb_ticket=None, + driver='libhdfs'): + self._connect(host, port, user, kerb_ticket, driver) + + @implements(Filesystem.isdir) + def isdir(self, path): + return io._HdfsClient.isdir(self, path) + + @implements(Filesystem.isfile) + def isfile(self, path): + return io._HdfsClient.isfile(self, path) + + @implements(Filesystem.delete) + def delete(self, path, recursive=False): + return io._HdfsClient.delete(self, path, recursive) + + @implements(Filesystem.mkdir) + def mkdir(self, path, create_parents=True): + return io._HdfsClient.mkdir(self, path) + + def ls(self, path, full_info=False): + """ + Retrieve directory contents and metadata, if requested. 
+ + Parameters + ---------- + path : HDFS path + full_info : boolean, default False + If False, only return list of paths + + Returns + ------- + result : list of dicts (full_info=True) or strings (full_info=False) + """ + return io._HdfsClient.ls(self, path, full_info) http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/includes/libarrow_io.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index 417af7d..3137938 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -148,6 +148,8 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: CStatus ListDirectory(const c_string& path, vector[HdfsPathInfo]* listing) + CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info) + CStatus Rename(const c_string& src, const c_string& dst) CStatus OpenReadable(const c_string& path, http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 0f626f1..2621512 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -463,42 +463,17 @@ def strip_hdfs_abspath(path): return path -cdef class HdfsClient: +cdef class _HdfsClient: cdef: shared_ptr[CHdfsClient] client cdef readonly: bint is_open - def __cinit__(self, host="default", port=0, user=None, kerb_ticket=None, - driver='libhdfs'): - """ - Connect to an HDFS cluster. All parameters are optional and should - only be set if the defaults need to be overridden. - - Authentication should be automatic if the HDFS cluster uses Kerberos. - However, if a username is specified, then the ticket cache will likely - be required. + def __cinit__(self): + pass - Parameters - ---------- - host : NameNode. Set to "default" for fs.defaultFS from core-site.xml. - port : NameNode's port. Set to 0 for default or logical (HA) nodes. - user : Username when connecting to HDFS; None implies login user. - kerb_ticket : Path to Kerberos ticket cache. - driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs' - Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++ - library from Pivotal Labs) - - Notes - ----- - The first time you call this method, it will take longer than usual due - to JNI spin-up time. - - Returns - ------- - client : HDFSClient - """ + def _connect(self, host, port, user, kerb_ticket, driver): cdef HdfsConnectionConfig conf if host is not None: @@ -556,20 +531,25 @@ cdef class HdfsClient: result = self.client.get().Exists(c_path) return result - def ls(self, path, bint full_info=True): - """ - Retrieve directory contents and metadata, if requested. 
+ def isdir(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_DIRECTORY - Parameters - ---------- - path : HDFS path - full_info : boolean, default True - If False, only return list of paths + def isfile(self, path): + cdef HdfsPathInfo info + self._path_info(path, &info) + return info.kind == ObjectType_FILE - Returns - ------- - result : list of dicts (full_info=True) or strings (full_info=False) - """ + cdef _path_info(self, path, HdfsPathInfo* info): + cdef c_string c_path = tobytes(path) + + with nogil: + check_status(self.client.get() + .GetPathInfo(c_path, info)) + + + def ls(self, path, bint full_info): cdef: c_string c_path = tobytes(path) vector[HdfsPathInfo] listing http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 2a1ac9d..cbe1c6e 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -15,8 +15,10 @@ # specific language governing permissions and limitations # under the License. -import pyarrow._parquet as _parquet -from pyarrow.table import Table +from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa + RowGroupMetaData, Schema, ParquetWriter) +import pyarrow._parquet as _parquet # noqa +from pyarrow.table import Table, concat_tables class ParquetFile(object): @@ -32,7 +34,7 @@ class ParquetFile(object): Use existing metadata object, rather than reading from file. """ def __init__(self, source, metadata=None): - self.reader = _parquet.ParquetReader() + self.reader = ParquetReader() self.reader.open(source, metadata=metadata) @property @@ -67,10 +69,10 @@ class ParquetFile(object): for column in columns] arrays = [self.reader.read_column(column_idx) for column_idx in column_idxs] - return Table.from_arrays(columns, arrays) + return Table.from_arrays(arrays, names=columns) -def read_table(source, columns=None): +def read_table(source, columns=None, metadata=None): """ Read a Table from Parquet format @@ -81,17 +83,79 @@ def read_table(source, columns=None): pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. columns: list If not None, only these columns will be read from the file. + metadata : FileMetaData + If separately computed Returns ------- - pyarrow.table.Table + pyarrow.Table Content of the file as a table (of columns) """ - return ParquetFile(source).read(columns=columns) + return ParquetFile(source, metadata=metadata).read(columns=columns) -def write_table(table, sink, chunk_size=None, version=None, - use_dictionary=True, compression=None): +def read_multiple_files(paths, columns=None, filesystem=None, metadata=None, + schema=None): + """ + Read multiple Parquet files as a single pyarrow.Table + + Parameters + ---------- + paths : List[str] + List of file paths + columns : List[str] + Names of columns to read from the file + filesystem : Filesystem, default None + If nothing passed, paths assumed to be found in the local on-disk + filesystem + metadata : pyarrow.parquet.FileMetaData + Use metadata obtained elsewhere to validate file schemas + schema : pyarrow.parquet.Schema + Use schema obtained elsewhere to validate file schemas. 
Alternative to + metadata parameter + + Returns + ------- + pyarrow.Table + Content of the file as a table (of columns) + """ + if filesystem is None: + def open_file(path, meta=None): + return ParquetFile(path, metadata=meta) + else: + def open_file(path, meta=None): + return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta) + + if len(paths) == 0: + raise ValueError('Must pass at least one file path') + + if metadata is None and schema is None: + schema = open_file(paths[0]).schema + elif schema is None: + schema = metadata.schema + + # Verify schemas are all equal + all_file_metadata = [] + for path in paths: + file_metadata = open_file(path).metadata + if not schema.equals(file_metadata.schema): + raise ValueError('Schema in {0} was different. {1!s} vs {2!s}' + .format(path, file_metadata.schema, schema)) + all_file_metadata.append(file_metadata) + + # Read the tables + tables = [] + for path, path_metadata in zip(paths, all_file_metadata): + reader = open_file(path, meta=path_metadata) + table = reader.read(columns=columns) + tables.append(table) + + all_data = concat_tables(tables) + return all_data + + +def write_table(table, sink, chunk_size=None, version='1.0', + use_dictionary=True, compression='snappy'): """ Write a Table to Parquet format @@ -110,7 +174,7 @@ def write_table(table, sink, chunk_size=None, version=None, compression : str or dict Specify the compression codec, either on a general basis or per-column. """ - writer = _parquet.ParquetWriter(sink, use_dictionary=use_dictionary, - compression=compression, - version=version) + writer = ParquetWriter(sink, use_dictionary=use_dictionary, + compression=compression, + version=version) writer.write_table(table, row_group_size=chunk_size) http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 0e3b2bd..9242330 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -265,16 +265,35 @@ cdef class Column: cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema): cdef: Array arr + Column col c_string c_name vector[shared_ptr[CField]] fields + cdef shared_ptr[CDataType] type_ cdef int K = len(arrays) fields.resize(K) - for i in range(K): - arr = arrays[i] - c_name = tobytes(names[i]) - fields[i].reset(new CField(c_name, arr.type.sp_type, True)) + + if len(arrays) == 0: + raise ValueError('Must pass at least one array') + + if isinstance(arrays[0], Array): + if names is None: + raise ValueError('Must pass names when constructing ' + 'from Array objects') + for i in range(K): + arr = arrays[i] + type_ = arr.type.sp_type + c_name = tobytes(names[i]) + fields[i].reset(new CField(c_name, type_, True)) + elif isinstance(arrays[0], Column): + for i in range(K): + col = arrays[i] + type_ = col.sp_column.get().type() + c_name = tobytes(col.name) + fields[i].reset(new CField(c_name, type_, True)) + else: + raise TypeError(type(arrays[0])) schema.reset(new CSchema(fields)) @@ -429,19 +448,19 @@ cdef class RecordBatch: pyarrow.table.RecordBatch """ names, arrays = _dataframe_to_arrays(df, None, False, schema) - return cls.from_arrays(names, arrays) + return cls.from_arrays(arrays, names) @staticmethod - def from_arrays(names, arrays): + def from_arrays(arrays, names): """ Construct a RecordBatch from multiple pyarrow.Arrays Parameters ---------- - names: list of str - Labels for the columns arrays: list of pyarrow.Array column-wise data 
vectors + names: list of str + Labels for the columns Returns ------- @@ -594,20 +613,20 @@ cdef class Table: names, arrays = _dataframe_to_arrays(df, name=name, timestamps_to_ms=timestamps_to_ms, schema=schema) - return cls.from_arrays(names, arrays, name=name) + return cls.from_arrays(arrays, names=names, name=name) @staticmethod - def from_arrays(names, arrays, name=None): + def from_arrays(arrays, names=None, name=None): """ - Construct a Table from Arrow Arrays + Construct a Table from Arrow arrays or columns Parameters ---------- - - names: list of str - Names for the table columns - arrays: list of pyarrow.array.Array + arrays: list of pyarrow.Array or pyarrow.Column Equal-length arrays that should form the table. + names: list of str, optional + Names for the table columns. If Columns passed, will be + inferred. If Arrays passed, this argument is required name: str, optional name for the Table @@ -617,7 +636,6 @@ cdef class Table: """ cdef: - Array arr c_string c_name vector[shared_ptr[CField]] fields vector[shared_ptr[CColumn]] columns @@ -628,9 +646,15 @@ cdef class Table: cdef int K = len(arrays) columns.resize(K) + for i in range(K): - arr = arrays[i] - columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array)) + if isinstance(arrays[i], Array): + columns[i].reset(new CColumn(schema.get().field(i), + (<Array> arrays[i]).sp_array)) + elif isinstance(arrays[i], Column): + columns[i] = (<Column> arrays[i]).sp_column + else: + raise ValueError(type(arrays[i])) if name is None: c_name = '' http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_column.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_column.py b/python/pyarrow/tests/test_column.py deleted file mode 100644 index 1a507c8..0000000 --- a/python/pyarrow/tests/test_column.py +++ /dev/null @@ -1,49 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from pyarrow.compat import unittest -import pyarrow as arrow - -A = arrow - -import pandas as pd - - -class TestColumn(unittest.TestCase): - - def test_basics(self): - data = [ - A.from_pylist([-10, -5, 0, 5, 10]) - ] - table = A.Table.from_arrays(('a'), data, 'table_name') - column = table.column(0) - assert column.name == 'a' - assert column.length() == 5 - assert len(column) == 5 - assert column.shape == (5,) - assert column.to_pylist() == [-10, -5, 0, 5, 10] - - def test_pandas(self): - data = [ - A.from_pylist([-10, -5, 0, 5, 10]) - ] - table = A.Table.from_arrays(('a'), data, 'table_name') - column = table.column(0) - series = column.to_pandas() - assert series.name == 'a' - assert series.shape == (5,) - assert series.iloc[0] == -10 http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_builtin.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index 72e4389..c06d18d 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -16,11 +16,12 @@ # specific language governing permissions and limitations # under the License. -from pyarrow.compat import unittest, u +from pyarrow.compat import unittest, u # noqa import pyarrow import datetime + class TestConvertList(unittest.TestCase): def test_boolean(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_pandas.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index a2f5062..30705c4 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -74,7 +74,7 @@ class TestPandasConversion(unittest.TestCase): tm.assert_frame_equal(result, expected) def _check_array_roundtrip(self, values, expected=None, - timestamps_to_ms=False, field=None): + timestamps_to_ms=False, field=None): arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms, field=field) result = arr.to_pandas() @@ -118,7 +118,7 @@ class TestPandasConversion(unittest.TestCase): ex_frame = pd.DataFrame(dict(zip(names, expected_cols)), columns=names) - table = A.Table.from_arrays(names, arrays) + table = A.Table.from_arrays(arrays, names) assert table.schema.equals(A.Schema.from_fields(fields)) result = table.to_pandas() tm.assert_frame_equal(result, ex_frame) @@ -169,7 +169,7 @@ class TestPandasConversion(unittest.TestCase): ex_frame = pd.DataFrame(dict(zip(int_dtypes, expected_cols)), columns=int_dtypes) - table = A.Table.from_arrays(int_dtypes, arrays) + table = A.Table.from_arrays(arrays, int_dtypes) result = table.to_pandas() tm.assert_frame_equal(result, ex_frame) @@ -201,7 +201,7 @@ class TestPandasConversion(unittest.TestCase): schema = A.Schema.from_fields([field]) ex_frame = pd.DataFrame({'bools': expected}) - table = A.Table.from_arrays(['bools'], [arr]) + table = A.Table.from_arrays([arr], ['bools']) assert table.schema.equals(schema) result = table.to_pandas() http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_hdfs.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py index 2056f7a..cb24adb 100644 --- a/python/pyarrow/tests/test_hdfs.py +++ b/python/pyarrow/tests/test_hdfs.py @@ -21,9 +21,16 
@@ import os import random import unittest +import numpy as np +import pandas.util.testing as pdt import pytest +from pyarrow.compat import guid +from pyarrow.filesystem import HdfsClient import pyarrow.io as io +import pyarrow as pa + +import pyarrow.tests.test_parquet as test_parquet # ---------------------------------------------------------------------- # HDFS tests @@ -38,7 +45,7 @@ def hdfs_test_client(driver='libhdfs'): raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' 'an integer') - return io.HdfsClient(host, port, user, driver=driver) + return HdfsClient(host, port, user, driver=driver) class HdfsTestCases(object): @@ -138,6 +145,43 @@ class HdfsTestCases(object): assert result == data + @test_parquet.parquet + def test_hdfs_read_multiple_parquet_files(self): + import pyarrow.parquet as pq + + nfiles = 10 + size = 5 + + tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid()) + + self.hdfs.mkdir(tmpdir) + + test_data = [] + paths = [] + for i in range(nfiles): + df = test_parquet._test_dataframe(size, seed=i) + + df['index'] = np.arange(i * size, (i + 1) * size) + + # Hack so that we don't have a dtype cast in v1 files + df['uint32'] = df['uint32'].astype(np.int64) + + path = pjoin(tmpdir, '{0}.parquet'.format(i)) + + table = pa.Table.from_pandas(df) + with self.hdfs.open(path, 'wb') as f: + pq.write_table(table, f) + + test_data.append(table) + paths.append(path) + + result = self.hdfs.read_parquet(tmpdir) + expected = pa.concat_tables(test_data) + + pdt.assert_frame_equal(result.to_pandas() + .sort_values(by='index').reset_index(drop=True), + expected.to_pandas()) + class TestLibHdfs(HdfsTestCases, unittest.TestCase): http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 9cf860a..a94fe45 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -15,10 +15,13 @@ # specific language governing permissions and limitations # under the License. 
+from os.path import join as pjoin import io +import os import pytest -import pyarrow as A +from pyarrow.compat import guid +import pyarrow as pa import pyarrow.io as paio import numpy as np @@ -42,9 +45,9 @@ def test_single_pylist_column_roundtrip(tmpdir): for dtype in [int, float]: filename = tmpdir.join('single_{}_column.parquet' .format(dtype.__name__)) - data = [A.from_pylist(list(map(dtype, range(5))))] - table = A.Table.from_arrays(('a', 'b'), data, 'table_name') - A.parquet.write_table(table, filename.strpath) + data = [pa.from_pylist(list(map(dtype, range(5))))] + table = pa.Table.from_arrays(data, names=('a', 'b'), name='table_name') + pq.write_table(table, filename.strpath) table_read = pq.read_table(filename.strpath) for col_written, col_read in zip(table.itercolumns(), table_read.itercolumns()): @@ -85,8 +88,8 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): df = alltypes_sample(size=10000) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True) - A.parquet.write_table(arrow_table, filename.strpath, version="2.0") + arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True) + pq.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) @@ -113,8 +116,8 @@ def test_pandas_parquet_1_0_rountrip(tmpdir): 'empty_str': [''] * size }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.Table.from_pandas(df) - A.parquet.write_table(arrow_table, filename.strpath, version="1.0") + arrow_table = pa.Table.from_pandas(df) + pq.write_table(arrow_table, filename.strpath, version="1.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() @@ -133,28 +136,39 @@ def test_pandas_column_selection(tmpdir): 'uint16': np.arange(size, dtype=np.uint16) }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.Table.from_pandas(df) - A.parquet.write_table(arrow_table, filename.strpath) + arrow_table = pa.Table.from_pandas(df) + pq.write_table(arrow_table, filename.strpath) table_read = pq.read_table(filename.strpath, columns=['uint8']) df_read = table_read.to_pandas() pdt.assert_frame_equal(df[['uint8']], df_read) -def _test_dataframe(size=10000): - np.random.seed(0) +def _random_integers(size, dtype): + # We do not generate integers outside the int64 range + i64_info = np.iinfo('int64') + iinfo = np.iinfo(dtype) + return np.random.randint(max(iinfo.min, i64_info.min), + min(iinfo.max, i64_info.max), + size=size).astype(dtype) + + +def _test_dataframe(size=10000, seed=0): + np.random.seed(seed) df = pd.DataFrame({ - 'uint8': np.arange(size, dtype=np.uint8), - 'uint16': np.arange(size, dtype=np.uint16), - 'uint32': np.arange(size, dtype=np.uint32), - 'uint64': np.arange(size, dtype=np.uint64), - 'int8': np.arange(size, dtype=np.int16), - 'int16': np.arange(size, dtype=np.int16), - 'int32': np.arange(size, dtype=np.int32), - 'int64': np.arange(size, dtype=np.int64), - 'float32': np.arange(size, dtype=np.float32), + 'uint8': _random_integers(size, np.uint8), + 'uint16': _random_integers(size, np.uint16), + 'uint32': _random_integers(size, np.uint32), + 'uint64': _random_integers(size, np.uint64), + 'int8': _random_integers(size, np.int8), + 'int16': _random_integers(size, np.int16), + 'int32': _random_integers(size, np.int32), + 'int64': _random_integers(size, np.int64), + 'float32': np.random.randn(size).astype(np.float32), + 'float64': np.random.randn(size), 'float64': 
np.arange(size, dtype=np.float64), - 'bool': np.random.randn(size) > 0 + 'bool': np.random.randn(size) > 0, + 'strings': [pdt.rands(10) for i in range(size)] }) return df @@ -162,7 +176,7 @@ def _test_dataframe(size=10000): @parquet def test_pandas_parquet_native_file_roundtrip(tmpdir): df = _test_dataframe(10000) - arrow_table = A.Table.from_pandas(df) + arrow_table = pa.Table.from_pandas(df) imos = paio.InMemoryOutputStream() pq.write_table(arrow_table, imos, version="2.0") buf = imos.get_result() @@ -183,10 +197,10 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir): 'strings': ['foo', 'bar', None, 'baz', 'qux'] }) - arrow_table = A.Table.from_pandas(df) + arrow_table = pa.Table.from_pandas(df) with open(filename, 'wb') as f: - A.parquet.write_table(arrow_table, f, version="1.0") + pq.write_table(arrow_table, f, version="1.0") data = io.BytesIO(open(filename, 'rb').read()) @@ -213,31 +227,27 @@ def test_pandas_parquet_configuration_options(tmpdir): 'bool': np.random.randn(size) > 0 }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.Table.from_pandas(df) + arrow_table = pa.Table.from_pandas(df) for use_dictionary in [True, False]: - A.parquet.write_table( - arrow_table, - filename.strpath, - version="2.0", - use_dictionary=use_dictionary) + pq.write_table(arrow_table, filename.strpath, + version="2.0", + use_dictionary=use_dictionary) table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) for compression in ['NONE', 'SNAPPY', 'GZIP']: - A.parquet.write_table( - arrow_table, - filename.strpath, - version="2.0", - compression=compression) + pq.write_table(arrow_table, filename.strpath, + version="2.0", + compression=compression) table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) def make_sample_file(df): - a_table = A.Table.from_pandas(df, timestamps_to_ms=True) + a_table = pa.Table.from_pandas(df, timestamps_to_ms=True) buf = io.BytesIO() pq.write_table(a_table, buf, compression='SNAPPY', version='2.0') @@ -315,7 +325,7 @@ def test_pass_separate_metadata(): # ARROW-471 df = alltypes_sample(size=10000) - a_table = A.Table.from_pandas(df, timestamps_to_ms=True) + a_table = pa.Table.from_pandas(df, timestamps_to_ms=True) buf = io.BytesIO() pq.write_table(a_table, buf, compression='snappy', version='2.0') @@ -328,3 +338,72 @@ def test_pass_separate_metadata(): fileh = pq.ParquetFile(buf, metadata=metadata) pdt.assert_frame_equal(df, fileh.read().to_pandas()) + + +@parquet +def test_read_multiple_files(tmpdir): + nfiles = 10 + size = 5 + + dirpath = tmpdir.join(guid()).strpath + os.mkdir(dirpath) + + test_data = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(size, seed=i) + + # Hack so that we don't have a dtype cast in v1 files + df['uint32'] = df['uint32'].astype(np.int64) + + path = pjoin(dirpath, '{0}.parquet'.format(i)) + + table = pa.Table.from_pandas(df) + pq.write_table(table, path) + + test_data.append(table) + paths.append(path) + + result = pq.read_multiple_files(paths) + expected = pa.concat_tables(test_data) + + assert result.equals(expected) + + # Read with provided metadata + metadata = pq.ParquetFile(paths[0]).metadata + + result2 = pq.read_multiple_files(paths, metadata=metadata) + assert result2.equals(expected) + + result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema) + assert result3.equals(expected) + + # Read column subset + to_read = [result[0], result[3], result[6]] + result = 
pa.localfs.read_parquet( + dirpath, columns=[c.name for c in to_read]) + expected = pa.Table.from_arrays(to_read) + assert result.equals(expected) + + # Test failure modes with non-uniform metadata + bad_apple = _test_dataframe(size, seed=i).iloc[:, :4] + bad_apple_path = tmpdir.join('{0}.parquet'.format(guid())).strpath + + t = pa.Table.from_pandas(bad_apple) + pq.write_table(t, bad_apple_path) + + bad_meta = pq.ParquetFile(bad_apple_path).metadata + + with pytest.raises(ValueError): + pq.read_multiple_files(paths + [bad_apple_path]) + + with pytest.raises(ValueError): + pq.read_multiple_files(paths, metadata=bad_meta) + + mixed_paths = [bad_apple_path, paths[0]] + + with pytest.raises(ValueError): + pq.read_multiple_files(mixed_paths, schema=bad_meta.schema) + + with pytest.raises(ValueError): + pq.read_multiple_files(mixed_paths) http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_scalars.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_scalars.py b/python/pyarrow/tests/test_scalars.py index 62e51f8..ef600a0 100644 --- a/python/pyarrow/tests/test_scalars.py +++ b/python/pyarrow/tests/test_scalars.py @@ -32,7 +32,7 @@ class TestScalars(unittest.TestCase): v = arr[0] assert isinstance(v, A.BooleanValue) assert repr(v) == "True" - assert v.as_py() == True + assert v.as_py() is True assert arr[1] is A.NA http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_schema.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index 4aa8112..507ebb8 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -85,4 +85,3 @@ baz: list<item: int8>""" del fields[-1] sch3 = A.schema(fields) assert not sch1.equals(sch3) - http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_table.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 6f00c73..d49b33c 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -21,16 +21,43 @@ from pandas.util.testing import assert_frame_equal import pandas as pd import pytest +from pyarrow.compat import unittest import pyarrow as pa +class TestColumn(unittest.TestCase): + + def test_basics(self): + data = [ + pa.from_pylist([-10, -5, 0, 5, 10]) + ] + table = pa.Table.from_arrays(data, names=['a'], name='table_name') + column = table.column(0) + assert column.name == 'a' + assert column.length() == 5 + assert len(column) == 5 + assert column.shape == (5,) + assert column.to_pylist() == [-10, -5, 0, 5, 10] + + def test_pandas(self): + data = [ + pa.from_pylist([-10, -5, 0, 5, 10]) + ] + table = pa.Table.from_arrays(data, names=['a'], name='table_name') + column = table.column(0) + series = column.to_pandas() + assert series.name == 'a' + assert series.shape == (5,) + assert series.iloc[0] == -10 + + def test_recordbatch_basics(): data = [ pa.from_pylist(range(5)), pa.from_pylist([-10, -5, 0, 5, 10]) ] - batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data) + batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1']) assert len(batch) == 5 assert batch.num_rows == 5 @@ -95,7 +122,7 @@ def test_table_basics(): pa.from_pylist(range(5)), pa.from_pylist([-10, -5, 0, 5, 10]) ] - table = pa.Table.from_arrays(('a', 'b'), data, 'table_name') + table = 
pa.Table.from_arrays(data, names=('a', 'b'), name='table_name') assert table.name == 'table_name' assert len(table) == 5 assert table.num_rows == 5 @@ -121,19 +148,19 @@ def test_concat_tables(): [1., 2., 3., 4., 5.] ] - t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x) - for x in data], 'table_name') - t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x) - for x in data2], 'table_name') + t1 = pa.Table.from_arrays([pa.from_pylist(x) for x in data], + names=('a', 'b'), name='table_name') + t2 = pa.Table.from_arrays([pa.from_pylist(x) for x in data2], + names=('a', 'b'), name='table_name') result = pa.concat_tables([t1, t2], output_name='foo') assert result.name == 'foo' assert len(result) == 10 - expected = pa.Table.from_arrays( - ('a', 'b'), [pa.from_pylist(x + y) - for x, y in zip(data, data2)], - 'foo') + expected = pa.Table.from_arrays([pa.from_pylist(x + y) + for x, y in zip(data, data2)], + names=('a', 'b'), + name='foo') assert result.equals(expected) @@ -143,7 +170,8 @@ def test_table_pandas(): pa.from_pylist(range(5)), pa.from_pylist([-10, -5, 0, 5, 10]) ] - table = pa.Table.from_arrays(('a', 'b'), data, 'table_name') + table = pa.Table.from_arrays(data, names=('a', 'b'), + name='table_name') # TODO: Use this part once from_pandas is implemented # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]} http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/util.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py new file mode 100644 index 0000000..4b6a835 --- /dev/null +++ b/python/pyarrow/util.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Miscellaneous utility code + + +def implements(f): + def decorator(g): + g.__doc__ = f.__doc__ + return g + return decorator
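
For reference, a minimal usage sketch of the multi-file reading APIs added in this patch (read_multiple_files, Filesystem.read_parquet, and the pyarrow.localfs shim). The paths and data below are placeholders, assuming a pyarrow build with Parquet support at this commit:

    import os
    import tempfile

    import numpy as np
    import pandas as pd

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Write a few Parquet files with a common schema into one directory
    tmpdir = tempfile.mkdtemp()
    paths = []
    for i in range(3):
        df = pd.DataFrame({'index': np.arange(i * 5, (i + 1) * 5),
                           'value': np.random.randn(5)})
        path = os.path.join(tmpdir, '{0}.parquet'.format(i))
        pq.write_table(pa.Table.from_pandas(df), path)
        paths.append(path)

    # Read an explicit list of files back as a single pyarrow.Table
    combined = pq.read_multiple_files(paths)

    # Or point the new local filesystem shim at the whole directory,
    # optionally selecting a subset of columns
    combined = pa.localfs.read_parquet(tmpdir, columns=['value'])

The from_arrays constructors now take the arrays first and the names second, and Table.from_arrays also accepts Column objects directly, inferring their names; a short sketch under the same assumptions:

    import pyarrow as pa

    arrays = [pa.from_pylist([1, 2, 3, 4, 5]),
              pa.from_pylist([-10, -5, 0, 5, 10])]

    # Arrays come first; names are required when passing Array objects
    batch = pa.RecordBatch.from_arrays(arrays, ['c0', 'c1'])
    table = pa.Table.from_arrays(arrays, names=['c0', 'c1'], name='example')

    # Columns carry their own names, so no names are needed here
    subset = pa.Table.from_arrays([table[0]])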
