Repository: arrow Updated Branches: refs/heads/master a73252d0e -> 362e754b3
ARROW-1103: [Python] Support read_pandas (with index metadata) on directory of Parquet files Also fixes ARROW-1041, a case where the `_metadata` file contains the pandas schema metadata but the individual dataset fragments do not. Author: Wes McKinney <[email protected]> Closes #862 from wesm/ARROW-1103 and squashes the following commits: 3f309166 [Wes McKinney] Add test for esoteric case where _metadata has pandas metadata but the individual Parquet dataset pieces don't 5985fc13 [Wes McKinney] Add experimental replace_schema_metadata functions, get basic read_pandas with metadata working on a multi-file dataset b362d60c [Wes McKinney] Initial refactor to support common metadata, read_pandas on a dataset Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/362e754b Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/362e754b Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/362e754b Branch: refs/heads/master Commit: 362e754b395787fd935d0b66a0cdcbec8aa13f85 Parents: a73252d Author: Wes McKinney <[email protected]> Authored: Tue Jul 18 13:52:17 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Tue Jul 18 13:52:17 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/builder.cc | 6 +- cpp/src/arrow/ipc/metadata.h | 7 ++ cpp/src/arrow/python/builtin_convert.cc | 9 +- cpp/src/arrow/python/pandas_convert.cc | 20 ++-- cpp/src/arrow/table.cc | 16 +++ cpp/src/arrow/table.h | 68 +++++++---- cpp/src/arrow/type.cc | 14 ++- cpp/src/arrow/type.h | 30 ++++- cpp/src/arrow/util/logging.h | 3 +- cpp/src/plasma/protocol.cc | 4 +- python/pyarrow/_parquet.pyx | 10 +- python/pyarrow/filesystem.py | 8 +- python/pyarrow/includes/libarrow.pxd | 14 ++- python/pyarrow/parquet.py | 161 ++++++++++++++++++--------- python/pyarrow/table.pxi | 48 ++++++++ python/pyarrow/tests/test_parquet.py | 85 +++++++++++++- python/pyarrow/types.pxi | 4 +- 17 files changed, 390 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/builder.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index a2f24a7..ee363b9 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -983,9 +983,9 @@ Status DecimalBuilder::Finish(std::shared_ptr<Array>* out) { ListBuilder::ListBuilder(MemoryPool* pool, std::unique_ptr<ArrayBuilder> value_builder, const std::shared_ptr<DataType>& type) - : ArrayBuilder(pool, - type ? type : std::static_pointer_cast<DataType>( - std::make_shared<ListType>(value_builder->type()))), + : ArrayBuilder( + pool, type ? type : std::static_pointer_cast<DataType>( + std::make_shared<ListType>(value_builder->type()))), offsets_builder_(pool), value_builder_(std::move(value_builder)) {} http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 64b2571..614f7a6 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -200,13 +200,20 @@ class ARROW_EXPORT Message { ARROW_EXPORT std::string FormatMessageType(Message::Type type); /// \brief Abstract interface for a sequence of messages +/// \since 0.5.0 class ARROW_EXPORT MessageReader { public: virtual ~MessageReader() = default; + /// \brief Read next Message from the interface + /// + /// \param[out] message an arrow::ipc::Message instance + /// \return Status virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0; }; +/// \brief Implementation of MessageReader that reads from InputStream +/// \since 0.5.0 class ARROW_EXPORT InputStreamMessageReader : public MessageReader { public: explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& stream) http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/python/builtin_convert.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc index fa1c734..a76b6ba 100644 --- a/cpp/src/arrow/python/builtin_convert.cc +++ b/cpp/src/arrow/python/builtin_convert.cc @@ -44,8 +44,8 @@ static inline bool IsPyInteger(PyObject* obj) { #endif } -Status InvalidConversion(PyObject* obj, const std::string& expected_types, - std::ostream* out) { +Status InvalidConversion( + PyObject* obj, const std::string& expected_types, std::ostream* out) { OwnedRef type(PyObject_Type(obj)); RETURN_IF_PYERROR(); DCHECK_NE(type.obj(), nullptr); @@ -65,8 +65,7 @@ Status InvalidConversion(PyObject* obj, const std::string& expected_types, std::string cpp_type_name(bytes, size); (*out) << "Got Python object of type " << cpp_type_name - << " but can only handle these types: " - << expected_types; + << " but can only handle these types: " << expected_types; return Status::OK(); } @@ -104,7 +103,7 @@ class ScalarVisitor { } else { // TODO(wesm): accumulate error information somewhere static std::string supported_types = - "bool, float, integer, date, datetime, bytes, unicode"; + "bool, float, integer, date, datetime, bytes, unicode"; std::stringstream ss; ss << "Error inferring Arrow data type for collection of Python objects. "; RETURN_NOT_OK(InvalidConversion(obj, supported_types, &ss)); http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/python/pandas_convert.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/pandas_convert.cc b/cpp/src/arrow/python/pandas_convert.cc index c520d8d..282b3a9 100644 --- a/cpp/src/arrow/python/pandas_convert.cc +++ b/cpp/src/arrow/python/pandas_convert.cc @@ -552,8 +552,7 @@ Status PandasConverter::ConvertDates() { RETURN_NOT_OK(date_builder.AppendNull()); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(obj, "datetime.date", &ss)); return Status::Invalid(ss.str()); } @@ -608,8 +607,7 @@ Status PandasConverter::ConvertDecimals() { RETURN_NOT_OK(decimal_builder.AppendNull()); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(object, "decimal.Decimal", &ss)); return Status::Invalid(ss.str()); } @@ -637,8 +635,7 @@ Status PandasConverter::ConvertTimes() { RETURN_NOT_OK(builder.AppendNull()); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(obj, "datetime.time", &ss)); return Status::Invalid(ss.str()); } @@ -696,8 +693,7 @@ Status PandasConverter::ConvertObjectFloats() { RETURN_NOT_OK(builder.Append(val)); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(obj, "float", &ss)); return Status::Invalid(ss.str()); } @@ -732,8 +728,7 @@ Status PandasConverter::ConvertObjectIntegers() { RETURN_NOT_OK(builder.Append(val)); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(obj, "integer", &ss)); return Status::Invalid(ss.str()); } @@ -902,8 +897,7 @@ Status PandasConverter::ConvertBooleans() { BitUtil::SetBit(null_bitmap_data_, i); } else { std::stringstream ss; - ss << "Error converting from Python objects to " - << type_->ToString() << ": "; + ss << "Error converting from Python objects to " << type_->ToString() << ": "; RETURN_NOT_OK(InvalidConversion(obj, "bool", &ss)); return Status::Invalid(ss.str()); } @@ -990,7 +984,7 @@ Status PandasConverter::ConvertObjects() { return ConvertLists(inferred_type); } else { const std::string supported_types = - "string, bool, float, int, date, time, decimal, list, array"; + "string, bool, float, int, date, time, decimal, list, array"; std::stringstream ss; ss << "Error inferring Arrow type for Python object array. "; RETURN_NOT_OK(InvalidConversion(obj, supported_types, &ss)); http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index aa04243..c09628e 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -164,6 +164,10 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows std::vector<std::shared_ptr<internal::ArrayData>>&& columns) : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {} +RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<internal::ArrayData>>& columns) + : schema_(schema), num_rows_(num_rows), columns_(columns) {} + std::shared_ptr<Array> RecordBatch::column(int i) const { std::shared_ptr<Array> result; DCHECK(MakeArray(columns_[i], &result).ok()); @@ -198,6 +202,12 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other) const { return true; } +std::shared_ptr<RecordBatch> RecordBatch::ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const { + auto new_schema = schema_->AddMetadata(metadata); + return std::make_shared<RecordBatch>(new_schema, num_rows_, columns_); +} + std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const { return Slice(offset, this->num_rows() - offset); } @@ -256,6 +266,12 @@ Table::Table(const std::shared_ptr<Schema>& schema, const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows) : schema_(schema), columns_(columns), num_rows_(num_rows) {} +std::shared_ptr<Table> Table::ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const { + auto new_schema = schema_->AddMetadata(metadata); + return std::make_shared<Table>(new_schema, columns_); +} + Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches, std::shared_ptr<Table>* table) { if (batches.size() == 0) { http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/table.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 18315f3..3ec1df9 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -41,13 +41,13 @@ class Status; using ArrayVector = std::vector<std::shared_ptr<Array>>; -// A data structure managing a list of primitive Arrow arrays logically as one -// large array +/// \brief A data structure managing a list of primitive Arrow arrays logically +/// as one large array class ARROW_EXPORT ChunkedArray { public: explicit ChunkedArray(const ArrayVector& chunks); - // @returns: the total length of the chunked array; computed on construction + // \return the total length of the chunked array; computed on construction int64_t length() const { return length_; } int64_t null_count() const { return null_count_; } @@ -67,9 +67,8 @@ class ARROW_EXPORT ChunkedArray { int64_t null_count_; }; -// An immutable column data structure consisting of a field (type metadata) and -// a logical chunked data array (which can be validated as all being the same -// type). +/// \brief An immutable column data structure consisting of a field (type +/// metadata) and a logical chunked data array class ARROW_EXPORT Column { public: Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks); @@ -86,13 +85,13 @@ class ARROW_EXPORT Column { std::shared_ptr<Field> field() const { return field_; } - // @returns: the column's name in the passed metadata + // \return the column's name in the passed metadata const std::string& name() const { return field_->name(); } - // @returns: the column's type according to the metadata + // \return the column's type according to the metadata std::shared_ptr<DataType> type() const { return field_->type(); } - // @returns: the column's data as a chunked logical array + // \return the column's data as a chunked logical array std::shared_ptr<ChunkedArray> data() const { return data_; } bool Equals(const Column& other) const; @@ -107,9 +106,11 @@ class ARROW_EXPORT Column { std::shared_ptr<ChunkedArray> data_; }; -// A record batch is a simpler and more rigid table data structure intended for -// use primarily in shared memory IPC. It contains a schema (metadata) and a -// corresponding sequence of equal-length Arrow arrays +/// \class RecordBatch +/// \brief Collection of equal-length arrays matching a particular Schema +/// +/// A record batch is table-like data structure consisting of an internal +/// sequence of fields, each a contiguous Arrow array class ARROW_EXPORT RecordBatch { public: /// num_rows is a parameter to allow for record batches of a particular size not @@ -124,6 +125,7 @@ class ARROW_EXPORT RecordBatch { std::vector<std::shared_ptr<Array>>&& columns); /// \brief Construct record batch from vector of internal data structures + /// \since 0.5.0 /// /// This class is only provided with an rvalue-reference for the input data, /// and is intended for internal use, or advanced users. @@ -135,14 +137,19 @@ class ARROW_EXPORT RecordBatch { RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, std::vector<std::shared_ptr<internal::ArrayData>>&& columns); + /// \brief Construct record batch by copying vector of array data + /// \since 0.5.0 + RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, + const std::vector<std::shared_ptr<internal::ArrayData>>& columns); + bool Equals(const RecordBatch& other) const; bool ApproxEquals(const RecordBatch& other) const; - // @returns: the table's schema + // \return the table's schema std::shared_ptr<Schema> schema() const { return schema_; } - // @returns: the i-th column + // \return the i-th column // Note: Does not boundscheck std::shared_ptr<Array> column(int i) const; @@ -150,18 +157,27 @@ class ARROW_EXPORT RecordBatch { const std::string& column_name(int i) const; - // @returns: the number of columns in the table + // \return the number of columns in the table int num_columns() const { return static_cast<int>(columns_.size()); } - // @returns: the number of rows (the corresponding length of each column) + // \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } + /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) + /// \since 0.5.0 + /// + /// \param[in] metadata new KeyValueMetadata + /// \return new RecordBatch + std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const; + /// Slice each of the arrays in the record batch and construct a new RecordBatch object std::shared_ptr<RecordBatch> Slice(int64_t offset) const; std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const; - /// Returns error status is there is something wrong with the record batch - /// contents, like a schema/array mismatch or inconsistent lengths + /// \brief Check for schema or length inconsistencies + /// + /// \return Status Status Validate() const; private: @@ -190,11 +206,11 @@ class ARROW_EXPORT Table { const std::vector<std::shared_ptr<RecordBatch>>& batches, std::shared_ptr<Table>* table); - // @returns: the table's schema + // \return the table's schema std::shared_ptr<Schema> schema() const { return schema_; } // Note: Does not boundscheck - // @returns: the i-th column + // \return the i-th column std::shared_ptr<Column> column(int i) const { return columns_[i]; } /// Remove column from the table, producing a new Table (because tables and @@ -205,10 +221,18 @@ class ARROW_EXPORT Table { Status AddColumn( int i, const std::shared_ptr<Column>& column, std::shared_ptr<Table>* out) const; - // @returns: the number of columns in the table + /// \brief Replace schema key-value metadata with new metadata (EXPERIMENTAL) + /// \since 0.5.0 + /// + /// \param[in] metadata new KeyValueMetadata + /// \return new Table + std::shared_ptr<Table> ReplaceSchemaMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const; + + // \return the number of columns in the table int num_columns() const { return static_cast<int>(columns_.size()); } - // @returns: the number of rows (the corresponding length of each column) + // \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } bool Equals(const Table& other) const; http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/type.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 891045e..623c193 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -31,9 +31,14 @@ namespace arrow { +std::shared_ptr<Field> Field::AddMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const { + return std::make_shared<Field>(name_, type_, nullable_, metadata); +} + Status Field::AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata, std::shared_ptr<Field>* out) const { - *out = std::make_shared<Field>(name_, type_, nullable_, metadata); + *out = AddMetadata(metadata); return Status::OK(); } @@ -294,9 +299,14 @@ Status Schema::AddField( return Status::OK(); } +std::shared_ptr<Schema> Schema::AddMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const { + return std::make_shared<Schema>(fields_, metadata); +} + Status Schema::AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata, std::shared_ptr<Schema>* out) const { - *out = std::make_shared<Schema>(fields_, metadata); + *out = AddMetadata(metadata); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/type.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 70e8644..fffb840 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -210,9 +210,12 @@ class ARROW_EXPORT Field { std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; } + /// \deprecated Status AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata, std::shared_ptr<Field>* out) const; + std::shared_ptr<Field> AddMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const; std::shared_ptr<Field> RemoveMetadata() const; bool Equals(const Field& other) const; @@ -690,39 +693,56 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType { // ---------------------------------------------------------------------- // Schema +/// \class Schema +/// \brief Sequence of arrow::Field objects describing the columns of a record +/// batch or table data structure class ARROW_EXPORT Schema { public: explicit Schema(const std::vector<std::shared_ptr<Field>>& fields, const std::shared_ptr<const KeyValueMetadata>& metadata = nullptr); virtual ~Schema() = default; - // Returns true if all of the schema fields are equal + /// Returns true if all of the schema fields are equal bool Equals(const Schema& other) const; - // Return the ith schema element. Does not boundscheck + /// Return the ith schema element. Does not boundscheck std::shared_ptr<Field> field(int i) const { return fields_[i]; } - // Returns nullptr if name not found + /// Returns nullptr if name not found std::shared_ptr<Field> GetFieldByName(const std::string& name) const; - // Returns -1 if name not found + /// Returns -1 if name not found int64_t GetFieldIndex(const std::string& name) const; const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; } + + /// \brief The custom key-value metadata, if any + /// + /// \return metadata may be nullptr std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; } - // Render a string representation of the schema suitable for debugging + /// \brief Render a string representation of the schema suitable for debugging std::string ToString() const; Status AddField( int i, const std::shared_ptr<Field>& field, std::shared_ptr<Schema>* out) const; Status RemoveField(int i, std::shared_ptr<Schema>* out) const; + /// \deprecated Status AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata, std::shared_ptr<Schema>* out) const; + /// \brief Replace key-value metadata with new metadata + /// + /// \param[in] metadata new KeyValueMetadata + /// \return new Schema + std::shared_ptr<Schema> AddMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) const; + + /// \brief Return copy of Schema without the KeyValueMetadata std::shared_ptr<Schema> RemoveMetadata() const; + /// \brief Return the number of fields (columns) in the schema int num_fields() const { return static_cast<int>(fields_.size()); } private: http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/arrow/util/logging.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 2fec4fa..b618121 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -103,7 +103,8 @@ class NullLog { class CerrLog { public: CerrLog(int severity) // NOLINT(runtime/explicit) - : severity_(severity), has_logged_(false) {} + : severity_(severity), + has_logged_(false) {} virtual ~CerrLog() { if (has_logged_) { std::cerr << std::endl; } http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/cpp/src/plasma/protocol.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 9739d77..246aa29 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -38,8 +38,8 @@ to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids, Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer) { int64_t type; RETURN_NOT_OK(ReadMessage(sock, &type, buffer)); - ARROW_CHECK(type == message_type) - << "type = " << type << ", message_type = " << message_type; + ARROW_CHECK(type == message_type) << "type = " << type + << ", message_type = " << message_type; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/_parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 5d446a8..0e0d58e 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -423,11 +423,14 @@ cdef class ParquetReader: def set_num_threads(self, int nthreads): self.reader.get().set_num_threads(nthreads) - def read_row_group(self, int i, column_indices=None): + def read_row_group(self, int i, column_indices=None, nthreads=None): cdef: shared_ptr[CTable] ctable vector[int] c_column_indices + if nthreads: + self.set_num_threads(nthreads) + if column_indices is not None: for index in column_indices: c_column_indices.push_back(index) @@ -442,11 +445,14 @@ cdef class ParquetReader: .ReadRowGroup(i, &ctable)) return pyarrow_wrap_table(ctable) - def read_all(self, column_indices=None): + def read_all(self, column_indices=None, nthreads=None): cdef: shared_ptr[CTable] ctable vector[int] c_column_indices + if nthreads: + self.set_num_threads(nthreads) + if column_indices is not None: for index in column_indices: c_column_indices.push_back(index) http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/filesystem.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py index 798d96b..9fa4f76 100644 --- a/python/pyarrow/filesystem.py +++ b/python/pyarrow/filesystem.py @@ -63,7 +63,7 @@ class Filesystem(object): raise NotImplementedError def read_parquet(self, path, columns=None, metadata=None, schema=None, - nthreads=1): + nthreads=1, use_pandas_metadata=False): """ Read Parquet data from path in file system. Can read from a single file or a directory of files @@ -82,6 +82,9 @@ class Filesystem(object): nthreads : int, default 1 Number of columns to read in parallel. If > 1, requires that the underlying file source is threadsafe + use_pandas_metadata : boolean, default False + If True and file has custom pandas schema metadata, ensure that + index columns are also loaded Returns ------- @@ -90,7 +93,8 @@ class Filesystem(object): from pyarrow.parquet import ParquetDataset dataset = ParquetDataset(path, schema=schema, metadata=metadata, filesystem=self) - return dataset.read(columns=columns, nthreads=nthreads) + return dataset.read(columns=columns, nthreads=nthreads, + use_pandas_metadata=use_pandas_metadata) @property def pathsep(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 44d83da..e1fe0c0 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -197,8 +197,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_bool nullable, const shared_ptr[CKeyValueMetadata]& metadata) # Removed const in Cython so don't have to cast to get code to generate - CStatus AddMetadata(const shared_ptr[CKeyValueMetadata]& metadata, - shared_ptr[CField]* out) + shared_ptr[CField] AddMetadata( + const shared_ptr[CKeyValueMetadata]& metadata) shared_ptr[CField] RemoveMetadata() @@ -224,8 +224,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_string ToString() # Removed const in Cython so don't have to cast to get code to generate - CStatus AddMetadata(const shared_ptr[CKeyValueMetadata]& metadata, - shared_ptr[CSchema]* out) + shared_ptr[CSchema] AddMetadata( + const shared_ptr[CKeyValueMetadata]& metadata) shared_ptr[CSchema] RemoveMetadata() cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray): @@ -346,6 +346,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: int num_columns() int64_t num_rows() + shared_ptr[CRecordBatch] ReplaceSchemaMetadata( + const shared_ptr[CKeyValueMetadata]& metadata) + shared_ptr[CRecordBatch] Slice(int64_t offset) shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length) @@ -370,6 +373,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CTable]* out) CStatus RemoveColumn(int i, shared_ptr[CTable]* out) + shared_ptr[CTable] ReplaceSchemaMetadata( + const shared_ptr[CKeyValueMetadata]& metadata) + cdef cppclass CTensor" arrow::Tensor": shared_ptr[CDataType] type() shared_ptr[CBuffer] data() http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index aa2352c..06b3a3d 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -45,10 +45,14 @@ class ParquetFile(object): see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader. metadata : ParquetFileMetadata, default None Use existing metadata object, rather than reading from file. + common_metadata : ParquetFileMetadata, default None + Will be used in reads for pandas schema metadata if not found in the + main file's metadata, no other uses at the moment """ - def __init__(self, source, metadata=None): + def __init__(self, source, metadata=None, common_metadata=None): self.reader = ParquetReader() self.reader.open(source, metadata=metadata) + self.common_metadata = common_metadata @property def metadata(self): @@ -62,7 +66,8 @@ class ParquetFile(object): def num_row_groups(self): return self.reader.num_row_groups - def read_row_group(self, i, columns=None, nthreads=1): + def read_row_group(self, i, columns=None, nthreads=1, + use_pandas_metadata=False): """ Read a single row group from a Parquet file @@ -73,18 +78,21 @@ class ParquetFile(object): nthreads : int, default 1 Number of columns to read in parallel. If > 1, requires that the underlying file source is threadsafe + use_pandas_metadata : boolean, default False + If True and file has custom pandas schema metadata, ensure that + index columns are also loaded Returns ------- pyarrow.table.Table Content of the row group as a table (of columns) """ - column_indices = self._get_column_indices(columns) - if nthreads is not None: - self.reader.set_num_threads(nthreads) - return self.reader.read_row_group(i, column_indices=column_indices) + column_indices = self._get_column_indices( + columns, use_pandas_metadata=use_pandas_metadata) + return self.reader.read_row_group(i, column_indices=column_indices, + nthreads=nthreads) - def read(self, columns=None, nthreads=1): + def read(self, columns=None, nthreads=1, use_pandas_metadata=False): """ Read a Table from Parquet format @@ -95,40 +103,48 @@ class ParquetFile(object): nthreads : int, default 1 Number of columns to read in parallel. If > 1, requires that the underlying file source is threadsafe + use_pandas_metadata : boolean, default False + If True and file has custom pandas schema metadata, ensure that + index columns are also loaded Returns ------- pyarrow.table.Table Content of the file as a table (of columns) """ - column_indices = self._get_column_indices(columns) - if nthreads is not None: - self.reader.set_num_threads(nthreads) + column_indices = self._get_column_indices( + columns, use_pandas_metadata=use_pandas_metadata) + return self.reader.read_all(column_indices=column_indices, + nthreads=nthreads) - return self.reader.read_all(column_indices=column_indices) + def _get_column_indices(self, column_names, use_pandas_metadata=False): + if column_names is None: + return None - def read_pandas(self, columns=None, nthreads=1): - column_indices = self._get_column_indices(columns) - custom_metadata = self.metadata.metadata + indices = list(map(self.reader.column_name_idx, column_names)) - if custom_metadata and b'pandas' in custom_metadata: - index_columns = json.loads( - custom_metadata[b'pandas'].decode('utf8') - )['index_columns'] - else: - index_columns = [] + if use_pandas_metadata: + file_keyvalues = self.metadata.metadata + common_keyvalues = (self.common_metadata.metadata + if self.common_metadata is not None + else None) - if column_indices is not None and index_columns: - column_indices += map(self.reader.column_name_idx, index_columns) + if file_keyvalues and b'pandas' in file_keyvalues: + index_columns = _get_pandas_index_columns(file_keyvalues) + elif common_keyvalues and b'pandas' in common_keyvalues: + index_columns = _get_pandas_index_columns(common_keyvalues) + else: + index_columns = [] - if nthreads is not None: - self.reader.set_num_threads(nthreads) - return self.reader.read_all(column_indices=column_indices) + if indices is not None and index_columns: + indices += map(self.reader.column_name_idx, index_columns) + + return indices - def _get_column_indices(self, column_names): - if column_names is None: - return None - return list(map(self.reader.column_name_idx, column_names)) + +def _get_pandas_index_columns(keyvalues): + return (json.loads(keyvalues[b'pandas'].decode('utf8')) + ['index_columns']) # ---------------------------------------------------------------------- @@ -205,7 +221,7 @@ class ParquetDatasetPiece(object): return reader def read(self, columns=None, nthreads=1, partitions=None, - open_file_func=None, file=None): + open_file_func=None, file=None, use_pandas_metadata=False): """ Read this piece as a pyarrow.Table @@ -218,6 +234,8 @@ class ParquetDatasetPiece(object): open_file_func : function, default None A function that knows how to construct a ParquetFile object given the file path in this piece + file : file-like object + passed to ParquetFile Returns ------- @@ -231,11 +249,14 @@ class ParquetDatasetPiece(object): # try to read the local path reader = ParquetFile(self.path) + options = dict(columns=columns, + nthreads=nthreads, + use_pandas_metadata=use_pandas_metadata) + if self.row_group is not None: - table = reader.read_row_group(self.row_group, columns=columns, - nthreads=nthreads) + table = reader.read_row_group(self.row_group, **options) else: - table = reader.read(columns=columns, nthreads=nthreads) + table = reader.read(**options) if len(self.partition_keys) > 0: if partitions is None: @@ -509,6 +530,11 @@ class ParquetDataset(object): (self.pieces, self.partitions, self.metadata_path) = _make_manifest(path_or_paths, self.fs) + if self.metadata_path is not None: + self.common_metadata = ParquetFile(self.metadata_path).metadata + else: + self.common_metadata = None + self.metadata = metadata self.schema = schema @@ -540,7 +566,7 @@ class ParquetDataset(object): .format(piece, file_metadata.schema, self.schema)) - def read(self, columns=None, nthreads=1): + def read(self, columns=None, nthreads=1, use_pandas_metadata=False): """ Read multiple Parquet files as a single pyarrow.Table @@ -551,6 +577,8 @@ class ParquetDataset(object): nthreads : int, default 1 Number of columns to read in parallel. Requires that the underlying file source is threadsafe + use_pandas_metadata : bool, default False + Passed through to each dataset piece Returns ------- @@ -563,20 +591,54 @@ class ParquetDataset(object): for piece in self.pieces: table = piece.read(columns=columns, nthreads=nthreads, partitions=self.partitions, - open_file_func=open_file) + open_file_func=open_file, + use_pandas_metadata=use_pandas_metadata) tables.append(table) all_data = lib.concat_tables(tables) + + if use_pandas_metadata: + # We need to ensure that this metadata is set in the Table's schema + # so that Table.to_pandas will construct pandas.DataFrame with the + # right index + common_metadata = self._get_common_pandas_metadata() + current_metadata = all_data.schema.metadata or {} + + if common_metadata and b'pandas' not in current_metadata: + all_data = all_data.replace_schema_metadata({ + b'pandas': common_metadata}) + return all_data + def read_pandas(self, **kwargs): + """ + Read dataset including pandas metadata, if any. Other arguments passed + through to ParquetDataset.read, see docstring for further details + + Returns + ------- + pyarrow.Table + Content of the file as a table (of columns) + """ + return self.read(use_pandas_metadata=True, **kwargs) + + def _get_common_pandas_metadata(self): + if self.common_metadata is None: + return None + + keyvalues = self.common_metadata.metadata + return keyvalues.get(b'pandas', None) + def _get_open_file_func(self): if self.fs is None or isinstance(self.fs, LocalFilesystem): def open_file(path, meta=None): - return ParquetFile(path, metadata=meta) + return ParquetFile(path, metadata=meta, + common_metadata=self.common_metadata) else: def open_file(path, meta=None): return ParquetFile(self.fs.open(path, mode='rb'), - metadata=meta) + metadata=meta, + common_metadata=self.common_metadata) return open_file @@ -613,7 +675,8 @@ def _make_manifest(path_or_paths, fs, pathsep='/'): return pieces, partitions, metadata_path -def read_table(source, columns=None, nthreads=1, metadata=None): +def read_table(source, columns=None, nthreads=1, metadata=None, + use_pandas_metadata=False): """ Read a Table from Parquet format @@ -630,6 +693,9 @@ def read_table(source, columns=None, nthreads=1, metadata=None): file source is threadsafe metadata : FileMetaData If separately computed + use_pandas_metadata : boolean, default False + If True and file has custom pandas schema metadata, ensure that + index columns are also loaded Returns ------- @@ -643,13 +709,14 @@ def read_table(source, columns=None, nthreads=1, metadata=None): metadata=metadata) pf = ParquetFile(source, metadata=metadata) - return pf.read(columns=columns, nthreads=nthreads) + return pf.read(columns=columns, nthreads=nthreads, + use_pandas_metadata=use_pandas_metadata) def read_pandas(source, columns=None, nthreads=1, metadata=None): """ - Read a Table from Parquet format, reconstructing the index values if - available. + Read a Table from Parquet format, also reading DataFrame index values if + known in the file metadata Parameters ---------- @@ -671,16 +738,8 @@ def read_pandas(source, columns=None, nthreads=1, metadata=None): Content of the file as a Table of Columns, including DataFrame indexes as Columns. """ - if is_string(source): - fs = LocalFilesystem.get_instance() - if fs.isdir(source): - raise NotImplementedError( - 'Reading a directory of Parquet files with DataFrame index ' - 'metadata is not yet supported' - ) - - pf = ParquetFile(source, metadata=metadata) - return pf.read_pandas(columns=columns, nthreads=nthreads) + return read_table(source, columns=columns, nthreads=nthreads, + metadata=metadata, use_pandas_metadata=True) def write_table(table, where, row_group_size=None, version='1.0', http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/table.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 575755d..d7a6060 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -388,6 +388,30 @@ cdef class RecordBatch: self._check_nullptr() return self.batch.num_rows() + def replace_schema_metadata(self, dict metadata=None): + """ + EXPERIMENTAL: Create shallow copy of record batch by replacing schema + key-value metadata with the indicated new metadata (which may be None, + which deletes any existing metadata + + Parameters + ---------- + metadata : dict, default None + + Returns + ------- + shallow_copy : RecordBatch + """ + cdef shared_ptr[CKeyValueMetadata] c_meta + if metadata is not None: + convert_metadata(metadata, &c_meta) + + cdef shared_ptr[CRecordBatch] new_batch + with nogil: + new_batch = self.batch.ReplaceSchemaMetadata(c_meta) + + return pyarrow_wrap_batch(new_batch) + @property def num_columns(self): """ @@ -624,6 +648,30 @@ cdef class Table: ) return 0 + def replace_schema_metadata(self, dict metadata=None): + """ + EXPERIMENTAL: Create shallow copy of table by replacing schema + key-value metadata with the indicated new metadata (which may be None, + which deletes any existing metadata + + Parameters + ---------- + metadata : dict, default None + + Returns + ------- + shallow_copy : Table + """ + cdef shared_ptr[CKeyValueMetadata] c_meta + if metadata is not None: + convert_metadata(metadata, &c_meta) + + cdef shared_ptr[CTable] new_table + with nogil: + new_table = self.table.ReplaceSchemaMetadata(c_meta) + + return pyarrow_wrap_table(new_table) + def equals(self, Table other): """ Check if contents of two tables are equal http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 0f44d16..d17eb14 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -809,9 +809,6 @@ def test_read_multiple_files(tmpdir): assert result.equals(expected) - with pytest.raises(NotImplementedError): - pq.read_pandas(dirpath) - # Read with provided metadata metadata = pq.ParquetFile(paths[0]).metadata @@ -857,6 +854,88 @@ def test_read_multiple_files(tmpdir): @parquet +def test_dataset_read_pandas(tmpdir): + import pyarrow.parquet as pq + + nfiles = 5 + size = 5 + + dirpath = tmpdir.join(guid()).strpath + os.mkdir(dirpath) + + test_data = [] + frames = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(size, seed=i) + df.index = np.arange(i * size, (i + 1) * size) + df.index.name = 'index' + + path = pjoin(dirpath, '{0}.parquet'.format(i)) + + table = pa.Table.from_pandas(df) + _write_table(table, path) + test_data.append(table) + frames.append(df) + paths.append(path) + + dataset = pq.ParquetDataset(dirpath) + columns = ['uint8', 'strings'] + result = dataset.read_pandas(columns=columns).to_pandas() + expected = pd.concat([x[columns] for x in frames]) + + tm.assert_frame_equal(result, expected) + + +@parquet +def test_dataset_read_pandas_common_metadata(tmpdir): + # ARROW-1103 + import pyarrow.parquet as pq + + nfiles = 5 + size = 5 + + dirpath = tmpdir.join(guid()).strpath + os.mkdir(dirpath) + + test_data = [] + frames = [] + paths = [] + for i in range(nfiles): + df = _test_dataframe(size, seed=i) + df.index = pd.Index(np.arange(i * size, (i + 1) * size)) + df.index.name = 'index' + + path = pjoin(dirpath, '{0}.parquet'.format(i)) + + df_ex_index = df.reset_index(drop=True) + df_ex_index['index'] = df.index + table = pa.Table.from_pandas(df_ex_index, + preserve_index=False) + + # Obliterate metadata + table = table.replace_schema_metadata(None) + assert table.schema.metadata is None + + _write_table(table, path) + test_data.append(table) + frames.append(df) + paths.append(path) + + # Write _metadata common file + table_for_metadata = pa.Table.from_pandas(df) + pq.write_metadata(table_for_metadata.schema, + pjoin(dirpath, '_metadata')) + + dataset = pq.ParquetDataset(dirpath) + columns = ['uint8', 'strings'] + result = dataset.read_pandas(columns=columns).to_pandas() + expected = pd.concat([x[columns] for x in frames]) + + tm.assert_frame_equal(result, expected) + + +@parquet def test_ignore_private_directories(tmpdir): import pyarrow.parquet as pq http://git-wip-us.apache.org/repos/asf/arrow/blob/362e754b/python/pyarrow/types.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 95bfbfb..a8d7aa0 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -255,7 +255,7 @@ cdef class Field: cdef shared_ptr[CField] new_field with nogil: - check_status(self.field.AddMetadata(c_meta, &new_field)) + new_field = self.field.AddMetadata(c_meta) return pyarrow_wrap_field(new_field) @@ -368,7 +368,7 @@ cdef class Schema: cdef shared_ptr[CSchema] new_schema with nogil: - check_status(self.schema.AddMetadata(c_meta, &new_schema)) + new_schema = self.schema.AddMetadata(c_meta) return pyarrow_wrap_schema(new_schema)
