Repository: arrow Updated Branches: refs/heads/master fd000964d -> 31a1f53f4
ARROW-710: [Python] Read/write with file-like Python objects from read_feather/write_feather cc @jreback Author: Wes McKinney <[email protected]> Closes #474 from wesm/ARROW-710 and squashes the following commits: 61d7218 [Wes McKinney] Do not close OutputStream in Feather writer. Read and write to file-like Python objects Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/31a1f53f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/31a1f53f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/31a1f53f Branch: refs/heads/master Commit: 31a1f53f4990d07a337ea0b000e04df2917b6d73 Parents: fd00096 Author: Wes McKinney <[email protected]> Authored: Sat Apr 1 11:19:40 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Sat Apr 1 11:19:40 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/ipc/feather-test.cc | 3 +- cpp/src/arrow/ipc/feather.cc | 25 +--- cpp/src/arrow/ipc/feather.h | 7 +- python/CMakeLists.txt | 1 - python/pyarrow/_feather.pyx | 158 -------------------------- python/pyarrow/feather.py | 14 +-- python/pyarrow/includes/libarrow_ipc.pxd | 31 ++++- python/pyarrow/io.pyx | 101 +++++++++++++++- python/pyarrow/tests/test_feather.py | 17 ++- python/setup.py | 1 - 10 files changed, 160 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc index e181f69..077a44b 100644 --- a/cpp/src/arrow/ipc/feather-test.cc +++ b/cpp/src/arrow/ipc/feather-test.cc @@ -272,8 +272,7 @@ class TestTableWriter : public ::testing::Test { ASSERT_OK(stream_->Finish(&output_)); std::shared_ptr<io::BufferReader> buffer(new io::BufferReader(output_)); - reader_.reset(new TableReader()); - ASSERT_OK(reader_->Open(buffer)); + ASSERT_OK(TableReader::Open(buffer, &reader_)); } void CheckBatch(const RecordBatch& batch) { http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc index 5820563..e838e1f 100644 --- a/cpp/src/arrow/ipc/feather.cc +++ b/cpp/src/arrow/ipc/feather.cc @@ -401,16 +401,10 @@ TableReader::TableReader() { TableReader::~TableReader() {} -Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source) { - return impl_->Open(source); -} - -Status TableReader::OpenFile( - const std::string& abspath, std::unique_ptr<TableReader>* out) { - std::shared_ptr<io::MemoryMappedFile> file; - RETURN_NOT_OK(io::MemoryMappedFile::Open(abspath, io::FileMode::READ, &file)); +Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source, + std::unique_ptr<TableReader>* out) { out->reset(new TableReader()); - return (*out)->Open(file); + return (*out)->impl_->Open(source); } bool TableReader::HasDescription() const { @@ -517,9 +511,8 @@ class TableWriter::TableWriterImpl : public ArrayVisitor { // Footer: metadata length, magic bytes RETURN_NOT_OK( stream_->Write(reinterpret_cast<const uint8_t*>(&buffer_size), sizeof(uint32_t))); - RETURN_NOT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), - strlen(kFeatherMagicBytes))); - return stream_->Close(); + return stream_->Write( + reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), strlen(kFeatherMagicBytes)); } Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) { @@ -700,14 +693,6 @@ Status TableWriter::Open( return (*out)->impl_->Open(stream); } -Status TableWriter::OpenFile( - const std::string& abspath, std::unique_ptr<TableWriter>* out) { - std::shared_ptr<io::FileOutputStream> file; - RETURN_NOT_OK(io::FileOutputStream::Open(abspath, &file)); - out->reset(new TableWriter()); - return (*out)->impl_->Open(file); -} - void TableWriter::SetDescription(const std::string& desc) { impl_->SetDescription(desc); } http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/cpp/src/arrow/ipc/feather.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h index 1e4ba58..8cc8ca0 100644 --- a/cpp/src/arrow/ipc/feather.h +++ b/cpp/src/arrow/ipc/feather.h @@ -54,9 +54,8 @@ class ARROW_EXPORT TableReader { TableReader(); ~TableReader(); - Status Open(const std::shared_ptr<io::RandomAccessFile>& source); - - static Status OpenFile(const std::string& abspath, std::unique_ptr<TableReader>* out); + static Status Open(const std::shared_ptr<io::RandomAccessFile>& source, + std::unique_ptr<TableReader>* out); // Optional table description // @@ -86,8 +85,6 @@ class ARROW_EXPORT TableWriter { static Status Open( const std::shared_ptr<io::OutputStream>& stream, std::unique_ptr<TableWriter>* out); - static Status OpenFile(const std::string& abspath, std::unique_ptr<TableWriter>* out); - void SetDescription(const std::string& desc); void SetNumRows(int64_t num_rows); http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 35a1a89..f315d01 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -268,7 +268,6 @@ set(CYTHON_EXTENSIONS config error io - _feather memory scalar schema http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/_feather.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_feather.pyx b/python/pyarrow/_feather.pyx deleted file mode 100644 index beb4aaa..0000000 --- a/python/pyarrow/_feather.pyx +++ /dev/null @@ -1,158 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# cython: profile=False -# distutils: language = c++ -# cython: embedsignature = True - -from cython.operator cimport dereference as deref - -from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport CArray, CColumn, CSchema, CStatus -from pyarrow.includes.libarrow_io cimport RandomAccessFile, OutputStream - -from libcpp.string cimport string -from libcpp cimport bool as c_bool - -cimport cpython - -from pyarrow.compat import frombytes, tobytes, encode_file_path - -from pyarrow.array cimport Array -from pyarrow.error cimport check_status -from pyarrow.table cimport Column - -cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil: - - cdef cppclass TableWriter: - @staticmethod - CStatus Open(const shared_ptr[OutputStream]& stream, - unique_ptr[TableWriter]* out) - - @staticmethod - CStatus OpenFile(const string& abspath, unique_ptr[TableWriter]* out) - - void SetDescription(const string& desc) - void SetNumRows(int64_t num_rows) - - CStatus Append(const string& name, const CArray& values) - CStatus Finalize() - - cdef cppclass TableReader: - TableReader(const shared_ptr[RandomAccessFile]& source) - - @staticmethod - CStatus OpenFile(const string& abspath, unique_ptr[TableReader]* out) - - string GetDescription() - c_bool HasDescription() - - int64_t num_rows() - int64_t num_columns() - - shared_ptr[CSchema] schema() - - CStatus GetColumn(int i, shared_ptr[CColumn]* out) - c_string GetColumnName(int i) - - -class FeatherError(Exception): - pass - - -cdef class FeatherWriter: - cdef: - unique_ptr[TableWriter] writer - - cdef public: - int64_t num_rows - - def __cinit__(self): - self.num_rows = -1 - - def open(self, object dest): - cdef: - string c_name = encode_file_path(dest) - - check_status(TableWriter.OpenFile(c_name, &self.writer)) - - def close(self): - if self.num_rows < 0: - self.num_rows = 0 - self.writer.get().SetNumRows(self.num_rows) - check_status(self.writer.get().Finalize()) - - def write_array(self, object name, object col, object mask=None): - cdef Array arr - - if self.num_rows >= 0: - if len(col) != self.num_rows: - raise ValueError('prior column had a different number of rows') - else: - self.num_rows = len(col) - - if isinstance(col, Array): - arr = col - else: - arr = Array.from_pandas(col, mask=mask) - - cdef c_string c_name = tobytes(name) - - with nogil: - check_status( - self.writer.get().Append(c_name, deref(arr.sp_array))) - - -cdef class FeatherReader: - cdef: - unique_ptr[TableReader] reader - - def __cinit__(self): - pass - - def open(self, source): - cdef: - string c_name = encode_file_path(source) - - check_status(TableReader.OpenFile(c_name, &self.reader)) - - property num_rows: - - def __get__(self): - return self.reader.get().num_rows() - - property num_columns: - - def __get__(self): - return self.reader.get().num_columns() - - def get_column_name(self, int i): - cdef c_string name = self.reader.get().GetColumnName(i) - return frombytes(name) - - def get_column(self, int i): - if i < 0 or i >= self.num_columns: - raise IndexError(i) - - cdef shared_ptr[CColumn] sp_column - with nogil: - check_status(self.reader.get() - .GetColumn(i, &sp_column)) - - cdef Column col = Column() - col.init(sp_column) - return col http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py index 28424af..f87c7f3 100644 --- a/python/pyarrow/feather.py +++ b/python/pyarrow/feather.py @@ -20,9 +20,9 @@ from distutils.version import LooseVersion import pandas as pd from pyarrow.compat import pdapi -from pyarrow._feather import FeatherError # noqa +from pyarrow.io import FeatherError # noqa from pyarrow.table import Table -import pyarrow._feather as ext +import pyarrow.io as ext if LooseVersion(pd.__version__) < '0.17.0': @@ -54,12 +54,12 @@ class FeatherReader(ext.FeatherReader): return table.to_pandas() -def write_feather(df, path): +def write_feather(df, dest): ''' Write a pandas.DataFrame to Feather format ''' writer = ext.FeatherWriter() - writer.open(path) + writer.open(dest) if isinstance(df, pd.SparseDataFrame): df = df.to_dense() @@ -95,13 +95,13 @@ def write_feather(df, path): writer.close() -def read_feather(path, columns=None): +def read_feather(source, columns=None): """ Read a pandas.DataFrame from Feather format Parameters ---------- - path : string, path to read from + source : string file path, or file-like object columns : sequence, optional Only read a specific set of columns. If not provided, all columns are read @@ -110,5 +110,5 @@ def read_feather(path, columns=None): ------- df : pandas.DataFrame """ - reader = FeatherReader(path) + reader = FeatherReader(source) return reader.read(columns=columns) http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index 8b7d705..59fd90b 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -18,7 +18,7 @@ # distutils: language = c++ from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (CArray, CSchema, CRecordBatch) +from pyarrow.includes.libarrow cimport (CArray, CColumn, CSchema, CRecordBatch) from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream, RandomAccessFile) @@ -63,3 +63,32 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: int num_record_batches() CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch) + +cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil: + + cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter": + @staticmethod + CStatus Open(const shared_ptr[OutputStream]& stream, + unique_ptr[CFeatherWriter]* out) + + void SetDescription(const c_string& desc) + void SetNumRows(int64_t num_rows) + + CStatus Append(const c_string& name, const CArray& values) + CStatus Finalize() + + cdef cppclass CFeatherReader" arrow::ipc::feather::TableReader": + @staticmethod + CStatus Open(const shared_ptr[RandomAccessFile]& file, + unique_ptr[CFeatherReader]* out) + + c_string GetDescription() + c_bool HasDescription() + + int64_t num_rows() + int64_t num_columns() + + shared_ptr[CSchema] schema() + + CStatus GetColumn(int i, shared_ptr[CColumn]* out) + c_string GetColumnName(int i) http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index d64427a..0b27379 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -32,10 +32,11 @@ from pyarrow.includes.libarrow_ipc cimport * cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import frombytes, tobytes, encode_file_path +from pyarrow.array cimport Array from pyarrow.error cimport check_status from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool from pyarrow.schema cimport Schema -from pyarrow.table cimport (RecordBatch, batch_from_cbatch, +from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch, table_from_ctable) cimport cpython as cp @@ -564,7 +565,9 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader): cdef get_writer(object source, shared_ptr[OutputStream]* writer): cdef NativeFile nf - if not isinstance(source, NativeFile) and hasattr(source, 'write'): + if isinstance(source, six.string_types): + source = OSFile(source, mode='w') + elif not isinstance(source, NativeFile) and hasattr(source, 'write'): # Optimistically hope this is file-like source = PythonFileInterface(source, mode='w') @@ -1047,3 +1050,97 @@ cdef class _FileReader: check_status(CTable.FromRecordBatches(batches, &table)) return table_from_ctable(table) + + +#---------------------------------------------------------------------- +# Implement legacy Feather file format + + +class FeatherError(Exception): + pass + + +cdef class FeatherWriter: + cdef: + unique_ptr[CFeatherWriter] writer + + cdef public: + int64_t num_rows + + def __cinit__(self): + self.num_rows = -1 + + def open(self, object dest): + cdef shared_ptr[OutputStream] sink + get_writer(dest, &sink) + + with nogil: + check_status(CFeatherWriter.Open(sink, &self.writer)) + + def close(self): + if self.num_rows < 0: + self.num_rows = 0 + self.writer.get().SetNumRows(self.num_rows) + check_status(self.writer.get().Finalize()) + + def write_array(self, object name, object col, object mask=None): + cdef Array arr + + if self.num_rows >= 0: + if len(col) != self.num_rows: + raise ValueError('prior column had a different number of rows') + else: + self.num_rows = len(col) + + if isinstance(col, Array): + arr = col + else: + arr = Array.from_pandas(col, mask=mask) + + cdef c_string c_name = tobytes(name) + + with nogil: + check_status( + self.writer.get().Append(c_name, deref(arr.sp_array))) + + +cdef class FeatherReader: + cdef: + unique_ptr[CFeatherReader] reader + + def __cinit__(self): + pass + + def open(self, source): + cdef shared_ptr[RandomAccessFile] reader + get_reader(source, &reader) + + with nogil: + check_status(CFeatherReader.Open(reader, &self.reader)) + + property num_rows: + + def __get__(self): + return self.reader.get().num_rows() + + property num_columns: + + def __get__(self): + return self.reader.get().num_columns() + + def get_column_name(self, int i): + cdef c_string name = self.reader.get().GetColumnName(i) + return frombytes(name) + + def get_column(self, int i): + if i < 0 or i >= self.num_columns: + raise IndexError(i) + + cdef shared_ptr[CColumn] sp_column + with nogil: + check_status(self.reader.get() + .GetColumn(i, &sp_column)) + + cdef Column col = Column() + col.init(sp_column) + return col http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/pyarrow/tests/test_feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py index e4b6273..dd6888f 100644 --- a/python/pyarrow/tests/test_feather.py +++ b/python/pyarrow/tests/test_feather.py @@ -27,7 +27,7 @@ import pyarrow as pa from pyarrow.compat import guid from pyarrow.feather import (read_feather, write_feather, FeatherReader) -from pyarrow._feather import FeatherWriter +from pyarrow.io import FeatherWriter def random_path(): @@ -347,6 +347,21 @@ class TestFeatherReader(unittest.TestCase): df = pd.DataFrame({'ints': values[0: num_values//2]}) self._check_pandas_roundtrip(df, path=path) + def test_filelike_objects(self): + from io import BytesIO + + buf = BytesIO() + + # the copy makes it non-strided + df = pd.DataFrame(np.arange(12).reshape(4, 3), + columns=['a', 'b', 'c']).copy() + write_feather(df, buf) + + buf.seek(0) + + result = read_feather(buf) + assert_frame_equal(result, df) + def test_sparse_dataframe(self): # GH #221 data = {'A': [0, 1, 2], http://git-wip-us.apache.org/repos/asf/arrow/blob/31a1f53f/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 9ff0918..12b44e1 100644 --- a/python/setup.py +++ b/python/setup.py @@ -104,7 +104,6 @@ class build_ext(_build_ext): 'io', 'jemalloc', 'memory', - '_feather', '_parquet', 'scalar', 'schema',
