Repository: arrow Updated Branches: refs/heads/master b109a246f -> 6b3ae2aec
ARROW-605: [C++] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type These are various changes introduced to support the Feather merge in ARROW-452 #361 Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #365 from wesm/array-loader and squashes the following commits: bc22872 [Wes McKinney] Revert Array::type_id to type_enum since Parquet uses this API 344e6b1 [Wes McKinney] fix compiler warning 997b7a2 [Wes McKinney] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6b3ae2ae Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6b3ae2ae Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6b3ae2ae Branch: refs/heads/master Commit: 6b3ae2aecc8cd31425035a021fa04b9ed3385a8d Parents: b109a24 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Thu Mar 9 14:00:48 2017 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Thu Mar 9 14:00:48 2017 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 1 + cpp/src/arrow/CMakeLists.txt | 5 +- cpp/src/arrow/array.cc | 52 +---- cpp/src/arrow/array.h | 17 +- cpp/src/arrow/builder.cc | 1 + cpp/src/arrow/builder.h | 1 + cpp/src/arrow/column.cc | 3 + cpp/src/arrow/column.h | 3 + cpp/src/arrow/compare.cc | 8 +- cpp/src/arrow/io/memory.cc | 19 +- cpp/src/arrow/io/memory.h | 6 + cpp/src/arrow/ipc/adapter.cc | 252 ++------------------ cpp/src/arrow/ipc/adapter.h | 8 +- cpp/src/arrow/ipc/metadata.cc | 1 + cpp/src/arrow/ipc/metadata.h | 7 +- cpp/src/arrow/loader.cc | 285 +++++++++++++++++++++++ cpp/src/arrow/loader.h | 89 +++++++ cpp/src/arrow/pretty_print.cc | 6 +- cpp/src/arrow/type.cc | 22 +- cpp/src/arrow/type.h | 38 ++- cpp/src/arrow/type_fwd.h | 5 + cpp/src/arrow/type_traits.h | 12 + python/pyarrow/array.pyx | 31 ++- python/pyarrow/table.pyx | 3 +- python/pyarrow/tests/test_convert_pandas.py | 6 +- python/src/pyarrow/adapters/pandas.cc | 17 +- 26 files changed, 558 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 22c6e9a..294c439 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -799,6 +799,7 @@ set(ARROW_SRCS src/arrow/builder.cc src/arrow/column.cc src/arrow/compare.cc + src/arrow/loader.cc src/arrow/memory_pool.cc src/arrow/pretty_print.cc src/arrow/schema.cc http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d1efa02..ddeb81c 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -19,10 +19,11 @@ install(FILES api.h array.h - column.h - compare.h buffer.h builder.h + column.h + compare.h + loader.h memory_pool.h pretty_print.h schema.h http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index 284bb57..49da6bb 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -165,6 +165,7 @@ template class NumericArray<Int32Type>; template class NumericArray<Int64Type>; template class NumericArray<TimestampType>; template class NumericArray<DateType>; +template class NumericArray<Date32Type>; template class NumericArray<TimeType>; template class NumericArray<HalfFloatType>; template class NumericArray<FloatType>; @@ -193,7 +194,7 @@ std::shared_ptr<Array> BooleanArray::Slice(int64_t offset, int64_t length) const Status ListArray::Validate() const { if (length_ < 0) { return Status::Invalid("Length was negative"); } - if (!value_offsets_) { return Status::Invalid("value_offsets_ was null"); } + if (length_ && !value_offsets_) { return Status::Invalid("value_offsets_ was null"); } if (value_offsets_->size() / static_cast<int>(sizeof(int32_t)) < length_) { std::stringstream ss; ss << "offset buffer size (bytes): " << value_offsets_->size() @@ -425,20 +426,6 @@ std::shared_ptr<Array> UnionArray::Slice(int64_t offset, int64_t length) const { // ---------------------------------------------------------------------- // DictionaryArray -Status DictionaryArray::FromBuffer(const std::shared_ptr<DataType>& type, int64_t length, - const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap, - int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out) { - DCHECK_EQ(type->type, Type::DICTIONARY); - const auto& dict_type = static_cast<const DictionaryType*>(type.get()); - - std::shared_ptr<Array> boxed_indices; - RETURN_NOT_OK(MakePrimitiveArray(dict_type->index_type(), length, indices, null_bitmap, - null_count, offset, &boxed_indices)); - - *out = std::make_shared<DictionaryArray>(type, boxed_indices); - return Status::OK(); -} - DictionaryArray::DictionaryArray( const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices) : Array(type, indices->length(), indices->null_bitmap(), indices->null_count(), @@ -470,40 +457,6 @@ std::shared_ptr<Array> DictionaryArray::Slice(int64_t offset, int64_t length) co } // ---------------------------------------------------------------------- - -#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType) \ - case Type::ENUM: \ - out->reset(new ArrayType(type, length, data, null_bitmap, null_count, offset)); \ - break; - -Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length, - const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap, - int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) { - switch (type->type) { - MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray); - MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array); - MAKE_PRIMITIVE_ARRAY_CASE(INT8, Int8Array); - MAKE_PRIMITIVE_ARRAY_CASE(UINT16, UInt16Array); - MAKE_PRIMITIVE_ARRAY_CASE(INT16, Int16Array); - MAKE_PRIMITIVE_ARRAY_CASE(UINT32, UInt32Array); - MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array); - MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array); - MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array); - MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray); - MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray); - MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array); - MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, TimestampArray); - default: - return Status::NotImplemented(type->ToString()); - } -#ifdef NDEBUG - return Status::OK(); -#else - return (*out)->Validate(); -#endif -} - -// ---------------------------------------------------------------------- // Default implementations of ArrayVisitor methods #define ARRAY_VISITOR_DEFAULT(ARRAY_CLASS) \ @@ -527,6 +480,7 @@ ARRAY_VISITOR_DEFAULT(DoubleArray); ARRAY_VISITOR_DEFAULT(StringArray); ARRAY_VISITOR_DEFAULT(BinaryArray); ARRAY_VISITOR_DEFAULT(DateArray); +ARRAY_VISITOR_DEFAULT(Date32Array); ARRAY_VISITOR_DEFAULT(TimeArray); ARRAY_VISITOR_DEFAULT(TimestampArray); ARRAY_VISITOR_DEFAULT(IntervalArray); http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index f20f212..f111609 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -58,6 +58,7 @@ class ARROW_EXPORT ArrayVisitor { virtual Status Visit(const StringArray& array); virtual Status Visit(const BinaryArray& array); virtual Status Visit(const DateArray& array); + virtual Status Visit(const Date32Array& array); virtual Status Visit(const TimeArray& array); virtual Status Visit(const TimestampArray& array); virtual Status Visit(const IntervalArray& array); @@ -485,12 +486,6 @@ class ARROW_EXPORT DictionaryArray : public Array { DictionaryArray( const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices); - // Alternate ctor; other attributes (like null count) are inherited from the - // passed indices array - static Status FromBuffer(const std::shared_ptr<DataType>& type, int64_t length, - const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap, - int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out); - Status Validate() const override; std::shared_ptr<Array> indices() const { return indices_; } @@ -531,21 +526,13 @@ extern template class ARROW_EXPORT NumericArray<FloatType>; extern template class ARROW_EXPORT NumericArray<DoubleType>; extern template class ARROW_EXPORT NumericArray<TimestampType>; extern template class ARROW_EXPORT NumericArray<DateType>; +extern template class ARROW_EXPORT NumericArray<Date32Type>; extern template class ARROW_EXPORT NumericArray<TimeType>; #if defined(__GNUC__) && !defined(__clang__) #pragma GCC diagnostic pop #endif -// ---------------------------------------------------------------------- -// Helper functions - -// Create new arrays for logical types that are backed by primitive arrays. -Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type, - int64_t length, const std::shared_ptr<Buffer>& data, - const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset, - std::shared_ptr<Array>* out); - } // namespace arrow #endif http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 9086598..4372925 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -238,6 +238,7 @@ template class PrimitiveBuilder<Int16Type>; template class PrimitiveBuilder<Int32Type>; template class PrimitiveBuilder<Int64Type>; template class PrimitiveBuilder<DateType>; +template class PrimitiveBuilder<Date32Type>; template class PrimitiveBuilder<TimestampType>; template class PrimitiveBuilder<TimeType>; template class PrimitiveBuilder<HalfFloatType>; http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index e642d3c..ebc683a 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -233,6 +233,7 @@ using Int64Builder = NumericBuilder<Int64Type>; using TimestampBuilder = NumericBuilder<TimestampType>; using TimeBuilder = NumericBuilder<TimeType>; using DateBuilder = NumericBuilder<DateType>; +using Date32Builder = NumericBuilder<Date32Type>; using HalfFloatBuilder = NumericBuilder<HalfFloatType>; using FloatBuilder = NumericBuilder<FloatType>; http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc index 1822870..78501f9 100644 --- a/cpp/src/arrow/column.cc +++ b/cpp/src/arrow/column.cc @@ -97,6 +97,9 @@ Column::Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array> } } +Column::Column(const std::string& name, const std::shared_ptr<Array>& data) + : Column(::arrow::field(name, data->type()), data) {} + Column::Column( const std::shared_ptr<Field>& field, const std::shared_ptr<ChunkedArray>& data) : field_(field), data_(data) {} http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h index 93a34c7..bfcfd8e 100644 --- a/cpp/src/arrow/column.h +++ b/cpp/src/arrow/column.h @@ -69,6 +69,9 @@ class ARROW_EXPORT Column { Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data); + /// Construct from name and array + Column(const std::string& name, const std::shared_ptr<Array>& data); + int64_t length() const { return data_->length(); } int64_t null_count() const { return data_->null_count(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/compare.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index f38f8d6..17b8833 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -145,6 +145,10 @@ class RangeEqualsVisitor : public ArrayVisitor { Status Visit(const DateArray& left) override { return CompareValues<DateArray>(left); } + Status Visit(const Date32Array& left) override { + return CompareValues<Date32Array>(left); + } + Status Visit(const TimeArray& left) override { return CompareValues<TimeArray>(left); } Status Visit(const TimestampArray& left) override { @@ -381,6 +385,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { Status Visit(const DateArray& left) override { return ComparePrimitive(left); } + Status Visit(const Date32Array& left) override { return ComparePrimitive(left); } + Status Visit(const TimeArray& left) override { return ComparePrimitive(left); } Status Visit(const TimestampArray& left) override { return ComparePrimitive(left); } @@ -622,7 +628,7 @@ class TypeEqualsVisitor : public TypeVisitor { Status Visit(const TimestampType& left) override { const auto& right = static_cast<const TimestampType&>(right_); - result_ = left.unit == right.unit; + result_ = left.unit == right.unit && left.timezone == right.timezone; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 1339a99..5b5c864 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -28,6 +28,7 @@ #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/status.h" +#include "arrow/util/logging.h" namespace arrow { namespace io { @@ -43,9 +44,17 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b position_(0), mutable_data_(buffer->mutable_data()) {} +Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool, + std::shared_ptr<BufferOutputStream>* out) { + std::shared_ptr<ResizableBuffer> buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer)); + *out = std::make_shared<BufferOutputStream>(buffer); + return Status::OK(); +} + BufferOutputStream::~BufferOutputStream() { // This can fail, better to explicitly call close - Close(); + if (buffer_) { Close(); } } Status BufferOutputStream::Close() { @@ -56,12 +65,20 @@ Status BufferOutputStream::Close() { } } +Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result) { + RETURN_NOT_OK(Close()); + *result = buffer_; + buffer_ = nullptr; + return Status::OK(); +} + Status BufferOutputStream::Tell(int64_t* position) { *position = position_; return Status::OK(); } Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) { + DCHECK(buffer_); RETURN_NOT_OK(Reserve(nbytes)); std::memcpy(mutable_data_ + position_, data, nbytes); position_ += nbytes; http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 2d3df42..8280750 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -43,6 +43,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream { public: explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer); + static Status Create(int64_t initial_capacity, MemoryPool* pool, + std::shared_ptr<BufferOutputStream>* out); + ~BufferOutputStream(); // Implement the OutputStream interface @@ -50,6 +53,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream { Status Tell(int64_t* position) override; Status Write(const uint8_t* data, int64_t nbytes) override; + /// Close the stream and return the buffer + Status Finish(std::shared_ptr<Buffer>* result); + private: // Ensures there is sufficient space available to write nbytes Status Reserve(int64_t nbytes); http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index f11c88a..78d5810 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -32,6 +32,7 @@ #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" +#include "arrow/loader.h" #include "arrow/memory_pool.h" #include "arrow/schema.h" #include "arrow/status.h" @@ -531,12 +532,12 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { - DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth); + DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth); return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { - RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth); + RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth); RETURN_NOT_OK(writer.GetTotalSize(batch, size)); return Status::OK(); } @@ -544,235 +545,33 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { // ---------------------------------------------------------------------- // Record batch read path -struct RecordBatchContext { - const RecordBatchMetadata* metadata; - int buffer_index; - int field_index; - int max_recursion_depth; -}; - -// Traverse the flattened record batch metadata and reassemble the -// corresponding array containers -class ArrayLoader : public TypeVisitor { +class IpcComponentSource : public ArrayComponentSource { public: - ArrayLoader( - const Field& field, RecordBatchContext* context, io::ReadableFileInterface* file) - : field_(field), context_(context), file_(file) {} - - Status Load(std::shared_ptr<Array>* out) { - if (context_->max_recursion_depth <= 0) { - return Status::Invalid("Max recursion depth reached"); - } - - // Load the array - RETURN_NOT_OK(field_.type->Accept(this)); + IpcComponentSource(const RecordBatchMetadata& metadata, io::ReadableFileInterface* file) + : metadata_(metadata), file_(file) {} - *out = std::move(result_); - return Status::OK(); - } - - private: - const Field& field_; - RecordBatchContext* context_; - io::ReadableFileInterface* file_; - - // Used in visitor pattern - std::shared_ptr<Array> result_; - - Status LoadChild(const Field& field, std::shared_ptr<Array>* out) { - ArrayLoader loader(field, context_, file_); - --context_->max_recursion_depth; - RETURN_NOT_OK(loader.Load(out)); - ++context_->max_recursion_depth; - return Status::OK(); - } - - Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) { - BufferMetadata metadata = context_->metadata->buffer(buffer_index); - - if (metadata.length == 0) { + Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override { + BufferMetadata buffer_meta = metadata_.buffer(buffer_index); + if (buffer_meta.length == 0) { *out = nullptr; return Status::OK(); } else { - return file_->ReadAt(metadata.offset, metadata.length, out); + return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out); } } - Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) { + Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override { // pop off a field - if (context_->field_index >= context_->metadata->num_fields()) { + if (field_index >= metadata_.num_fields()) { return Status::Invalid("Ran out of field metadata, likely malformed"); } - - // This only contains the length and null count, which we need to figure - // out what to do with the buffers. For example, if null_count == 0, then - // we can skip that buffer without reading from shared memory - *field_meta = context_->metadata->field(context_->field_index++); - - // extract null_bitmap which is common to all arrays - if (field_meta->null_count == 0) { - *null_bitmap = nullptr; - } else { - RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap)); - } - context_->buffer_index++; - return Status::OK(); - } - - Status LoadPrimitive(const DataType& type) { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap, data; - - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - if (field_meta.length > 0) { - RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data)); - } else { - context_->buffer_index++; - data.reset(new Buffer(nullptr, 0)); - } - return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap, - field_meta.null_count, 0, &result_); - } - - template <typename CONTAINER> - Status LoadBinary() { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap, offsets, values; - - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - if (field_meta.length > 0) { - RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets)); - RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values)); - } else { - context_->buffer_index += 2; - offsets = values = nullptr; - } - - result_ = std::make_shared<CONTAINER>( - field_meta.length, offsets, values, null_bitmap, field_meta.null_count); - return Status::OK(); - } - - Status Visit(const BooleanType& type) override { return LoadPrimitive(type); } - - Status Visit(const Int8Type& type) override { return LoadPrimitive(type); } - - Status Visit(const Int16Type& type) override { return LoadPrimitive(type); } - - Status Visit(const Int32Type& type) override { return LoadPrimitive(type); } - - Status Visit(const Int64Type& type) override { return LoadPrimitive(type); } - - Status Visit(const UInt8Type& type) override { return LoadPrimitive(type); } - - Status Visit(const UInt16Type& type) override { return LoadPrimitive(type); } - - Status Visit(const UInt32Type& type) override { return LoadPrimitive(type); } - - Status Visit(const UInt64Type& type) override { return LoadPrimitive(type); } - - Status Visit(const HalfFloatType& type) override { return LoadPrimitive(type); } - - Status Visit(const FloatType& type) override { return LoadPrimitive(type); } - - Status Visit(const DoubleType& type) override { return LoadPrimitive(type); } - - Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); } - - Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); } - - Status Visit(const DateType& type) override { return LoadPrimitive(type); } - - Status Visit(const TimeType& type) override { return LoadPrimitive(type); } - - Status Visit(const TimestampType& type) override { return LoadPrimitive(type); } - - Status Visit(const ListType& type) override { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap, offsets; - - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - if (field_meta.length > 0) { - RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets)); - } else { - offsets = nullptr; - } - ++context_->buffer_index; - - const int num_children = type.num_children(); - if (num_children != 1) { - std::stringstream ss; - ss << "Wrong number of children: " << num_children; - return Status::Invalid(ss.str()); - } - std::shared_ptr<Array> values_array; - - RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array)); - - result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets, - values_array, null_bitmap, field_meta.null_count); - return Status::OK(); - } - - Status LoadChildren(std::vector<std::shared_ptr<Field>> child_fields, - std::vector<std::shared_ptr<Array>>* arrays) { - arrays->reserve(static_cast<int>(child_fields.size())); - - for (const auto& child_field : child_fields) { - std::shared_ptr<Array> field_array; - RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array)); - arrays->emplace_back(field_array); - } + *metadata = metadata_.field(field_index); return Status::OK(); } - Status Visit(const StructType& type) override { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap; - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - - std::vector<std::shared_ptr<Array>> fields; - RETURN_NOT_OK(LoadChildren(type.children(), &fields)); - - result_ = std::make_shared<StructArray>( - field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count); - return Status::OK(); - } - - Status Visit(const UnionType& type) override { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap, type_ids, offsets; - - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - if (field_meta.length > 0) { - RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids)); - if (type.mode == UnionMode::DENSE) { - RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets)); - } - } - context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1; - - std::vector<std::shared_ptr<Array>> fields; - RETURN_NOT_OK(LoadChildren(type.children(), &fields)); - - result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields, - type_ids, offsets, null_bitmap, field_meta.null_count); - return Status::OK(); - } - - Status Visit(const DictionaryType& type) override { - FieldMetadata field_meta; - std::shared_ptr<Buffer> null_bitmap, indices_data; - RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data)); - - std::shared_ptr<Array> indices; - RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data, - null_bitmap, field_meta.null_count, 0, &indices)); - - result_ = std::make_shared<DictionaryArray>(field_.type, indices); - return Status::OK(); - }; + private: + const RecordBatchMetadata& metadata_; + io::ReadableFileInterface* file_; }; class RecordBatchReader { @@ -788,17 +587,15 @@ class RecordBatchReader { Status Read(std::shared_ptr<RecordBatch>* out) { std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields()); - // The field_index and buffer_index are incremented in the ArrayLoader - // based on how much of the batch is "consumed" (through nested data - // reconstruction, for example) - context_.metadata = &metadata_; - context_.field_index = 0; - context_.buffer_index = 0; - context_.max_recursion_depth = max_recursion_depth_; + IpcComponentSource source(metadata_, file_); + ArrayLoaderContext context; + context.source = &source; + context.field_index = 0; + context.buffer_index = 0; + context.max_recursion_depth = max_recursion_depth_; for (int i = 0; i < schema_->num_fields(); ++i) { - ArrayLoader loader(*schema_->field(i).get(), &context_, file_); - RETURN_NOT_OK(loader.Load(&arrays[i])); + RETURN_NOT_OK(LoadArray(schema_->field(i)->type, &context, &arrays[i])); } *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays); @@ -806,7 +603,6 @@ class RecordBatchReader { } private: - RecordBatchContext context_; const RecordBatchMetadata& metadata_; std::shared_ptr<Schema> schema_; int max_recursion_depth_; @@ -816,7 +612,7 @@ class RecordBatchReader { Status ReadRecordBatch(const RecordBatchMetadata& metadata, const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) { - return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out); + return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out); } Status ReadRecordBatch(const RecordBatchMetadata& metadata, http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index 933d3a4..21d814d 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -26,6 +26,7 @@ #include <vector> #include "arrow/ipc/metadata.h" +#include "arrow/loader.h" #include "arrow/util/visibility.h" namespace arrow { @@ -47,11 +48,6 @@ namespace ipc { // ---------------------------------------------------------------------- // Write path -// -// ARROW-109: We set this number arbitrarily to help catch user mistakes. For -// deeply nested schemas, it is expected the user will indicate explicitly the -// maximum allowed recursion depth -constexpr int kMaxIpcRecursionDepth = 64; // Write the RecordBatch (collection of equal-length Arrow arrays) to the // output stream in a contiguous block. The record batch metadata is written as @@ -75,7 +71,7 @@ constexpr int kMaxIpcRecursionDepth = 64; // padding bytes Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth); + MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth); // Write Array as a DictionaryBatch message Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 2ba44ac..695e788 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -289,6 +289,7 @@ FieldMetadata RecordBatchMetadata::field(int i) const { FieldMetadata result; result.length = node->length(); result.null_count = node->null_count(); + result.offset = 0; return result; } http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index f12529b..f6a0a3a 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -25,6 +25,7 @@ #include <unordered_map> #include <vector> +#include "arrow/loader.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -135,12 +136,6 @@ class ARROW_EXPORT SchemaMetadata { DISALLOW_COPY_AND_ASSIGN(SchemaMetadata); }; -// Field metadata -struct ARROW_EXPORT FieldMetadata { - int32_t length; - int32_t null_count; -}; - struct ARROW_EXPORT BufferMetadata { int32_t page; int64_t offset; http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/loader.cc b/cpp/src/arrow/loader.cc new file mode 100644 index 0000000..3cb51ae --- /dev/null +++ b/cpp/src/arrow/loader.cc @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/loader.h" + +#include <cstdint> +#include <memory> +#include <sstream> +#include <vector> + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/logging.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +struct DataType; +class Status; + +class ArrayLoader : public TypeVisitor { + public: + ArrayLoader(const std::shared_ptr<DataType>& type, ArrayLoaderContext* context) + : type_(type), context_(context) {} + + Status Load(std::shared_ptr<Array>* out) { + if (context_->max_recursion_depth <= 0) { + return Status::Invalid("Max recursion depth reached"); + } + + // Load the array + RETURN_NOT_OK(type_->Accept(this)); + + *out = std::move(result_); + return Status::OK(); + } + + Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) { + return context_->source->GetBuffer(buffer_index, out); + } + + Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) { + // This only contains the length and null count, which we need to figure + // out what to do with the buffers. For example, if null_count == 0, then + // we can skip that buffer without reading from shared memory + RETURN_NOT_OK( + context_->source->GetFieldMetadata(context_->field_index++, field_meta)); + + // extract null_bitmap which is common to all arrays + if (field_meta->null_count == 0) { + *null_bitmap = nullptr; + } else { + RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap)); + } + context_->buffer_index++; + return Status::OK(); + } + + template <typename TYPE> + Status LoadPrimitive() { + using ArrayType = typename TypeTraits<TYPE>::ArrayType; + + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap, data; + + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + if (field_meta.length > 0) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data)); + } else { + context_->buffer_index++; + data.reset(new Buffer(nullptr, 0)); + } + result_ = std::make_shared<ArrayType>(type_, field_meta.length, data, null_bitmap, + field_meta.null_count, field_meta.offset); + return Status::OK(); + } + + template <typename CONTAINER> + Status LoadBinary() { + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap, offsets, values; + + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + if (field_meta.length > 0) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets)); + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values)); + } else { + context_->buffer_index += 2; + offsets = values = nullptr; + } + + result_ = std::make_shared<CONTAINER>( + field_meta.length, offsets, values, null_bitmap, field_meta.null_count); + return Status::OK(); + } + + Status LoadChild(const Field& field, std::shared_ptr<Array>* out) { + ArrayLoader loader(field.type, context_); + --context_->max_recursion_depth; + RETURN_NOT_OK(loader.Load(out)); + ++context_->max_recursion_depth; + return Status::OK(); + } + + Status LoadChildren(std::vector<std::shared_ptr<Field>> child_fields, + std::vector<std::shared_ptr<Array>>* arrays) { + arrays->reserve(static_cast<int>(child_fields.size())); + + for (const auto& child_field : child_fields) { + std::shared_ptr<Array> field_array; + RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array)); + arrays->emplace_back(field_array); + } + return Status::OK(); + } + +#define VISIT_PRIMITIVE(TYPE) \ + Status Visit(const TYPE& type) override { return LoadPrimitive<TYPE>(); } + + VISIT_PRIMITIVE(BooleanType); + VISIT_PRIMITIVE(Int8Type); + VISIT_PRIMITIVE(Int16Type); + VISIT_PRIMITIVE(Int32Type); + VISIT_PRIMITIVE(Int64Type); + VISIT_PRIMITIVE(UInt8Type); + VISIT_PRIMITIVE(UInt16Type); + VISIT_PRIMITIVE(UInt32Type); + VISIT_PRIMITIVE(UInt64Type); + VISIT_PRIMITIVE(HalfFloatType); + VISIT_PRIMITIVE(FloatType); + VISIT_PRIMITIVE(DoubleType); + VISIT_PRIMITIVE(DateType); + VISIT_PRIMITIVE(Date32Type); + VISIT_PRIMITIVE(TimeType); + VISIT_PRIMITIVE(TimestampType); + +#undef VISIT_PRIMITIVE + + Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); } + + Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); } + + Status Visit(const ListType& type) override { + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap, offsets; + + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + if (field_meta.length > 0) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets)); + } else { + offsets = nullptr; + } + ++context_->buffer_index; + + const int num_children = type.num_children(); + if (num_children != 1) { + std::stringstream ss; + ss << "Wrong number of children: " << num_children; + return Status::Invalid(ss.str()); + } + std::shared_ptr<Array> values_array; + + RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array)); + + result_ = std::make_shared<ListArray>(type_, field_meta.length, offsets, values_array, + null_bitmap, field_meta.null_count); + return Status::OK(); + } + + Status Visit(const StructType& type) override { + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap; + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + + std::vector<std::shared_ptr<Array>> fields; + RETURN_NOT_OK(LoadChildren(type.children(), &fields)); + + result_ = std::make_shared<StructArray>( + type_, field_meta.length, fields, null_bitmap, field_meta.null_count); + return Status::OK(); + } + + Status Visit(const UnionType& type) override { + FieldMetadata field_meta; + std::shared_ptr<Buffer> null_bitmap, type_ids, offsets; + + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + if (field_meta.length > 0) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids)); + if (type.mode == UnionMode::DENSE) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets)); + } + } + context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1; + + std::vector<std::shared_ptr<Array>> fields; + RETURN_NOT_OK(LoadChildren(type.children(), &fields)); + + result_ = std::make_shared<UnionArray>(type_, field_meta.length, fields, type_ids, + offsets, null_bitmap, field_meta.null_count); + return Status::OK(); + } + + Status Visit(const DictionaryType& type) override { + std::shared_ptr<Array> indices; + RETURN_NOT_OK(LoadArray(type.index_type(), context_, &indices)); + result_ = std::make_shared<DictionaryArray>(type_, indices); + return Status::OK(); + }; + + std::shared_ptr<Array> result() const { return result_; } + + private: + const std::shared_ptr<DataType> type_; + ArrayLoaderContext* context_; + + // Used in visitor pattern + std::shared_ptr<Array> result_; +}; + +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type, + ArrayComponentSource* source, std::shared_ptr<Array>* out) { + ArrayLoaderContext context; + context.source = source; + context.field_index = context.buffer_index = 0; + context.max_recursion_depth = kMaxNestingDepth; + return LoadArray(type, &context, out); +} + +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type, + ArrayLoaderContext* context, std::shared_ptr<Array>* out) { + ArrayLoader loader(type, context); + RETURN_NOT_OK(loader.Load(out)); + + return Status::OK(); +} + +class InMemorySource : public ArrayComponentSource { + public: + InMemorySource(const std::vector<FieldMetadata>& fields, + const std::vector<std::shared_ptr<Buffer>>& buffers) + : fields_(fields), buffers_(buffers) {} + + Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) { + DCHECK(buffer_index < static_cast<int>(buffers_.size())); + *out = buffers_[buffer_index]; + return Status::OK(); + } + + Status GetFieldMetadata(int field_index, FieldMetadata* metadata) { + DCHECK(field_index < static_cast<int>(fields_.size())); + *metadata = fields_[field_index]; + return Status::OK(); + } + + private: + const std::vector<FieldMetadata>& fields_; + const std::vector<std::shared_ptr<Buffer>>& buffers_; +}; + +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type, + const std::vector<FieldMetadata>& fields, + const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out) { + InMemorySource source(fields, buffers); + return LoadArray(type, &source, out); +} + +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/loader.h b/cpp/src/arrow/loader.h new file mode 100644 index 0000000..b4949f2 --- /dev/null +++ b/cpp/src/arrow/loader.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Function for constructing Array array objects from metadata and raw memory +// buffers + +#ifndef ARROW_LOADER_H +#define ARROW_LOADER_H + +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; +class Buffer; +struct DataType; + +// ARROW-109: We set this number arbitrarily to help catch user mistakes. For +// deeply nested schemas, it is expected the user will indicate explicitly the +// maximum allowed recursion depth +constexpr int kMaxNestingDepth = 64; + +struct ARROW_EXPORT FieldMetadata { + int64_t length; + int64_t null_count; + int64_t offset; +}; + +/// Implement this to create new types of Arrow data loaders +class ARROW_EXPORT ArrayComponentSource { + public: + virtual ~ArrayComponentSource() = default; + + virtual Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) = 0; + virtual Status GetFieldMetadata(int field_index, FieldMetadata* metadata) = 0; +}; + +/// Bookkeeping struct for loading array objects from their constituent pieces of raw data +/// +/// The field_index and buffer_index are incremented in the ArrayLoader +/// based on how much of the batch is "consumed" (through nested data +/// reconstruction, for example) +struct ArrayLoaderContext { + ArrayComponentSource* source; + int buffer_index; + int field_index; + int max_recursion_depth; +}; + +/// Construct an Array container from type metadata and a collection of memory +/// buffers +/// +/// \param[in] field the data type of the array being loaded +/// \param[in] source an implementation of ArrayComponentSource +/// \param[out] out the constructed array +/// \return Status indicating success or failure +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type, + ArrayComponentSource* source, std::shared_ptr<Array>* out); + +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& field, + ArrayLoaderContext* context, std::shared_ptr<Array>* out); + +Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type, + const std::vector<FieldMetadata>& fields, + const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out); + +} // namespace arrow + +#endif // ARROW_LOADER_H http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/pretty_print.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index 7e69e42..2508fa5 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -145,9 +145,11 @@ class ArrayPrinter : public ArrayVisitor { Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); } - Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); } + Status Visit(const DateArray& array) override { return WritePrimitive(array); } - Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); } + Status Visit(const Date32Array& array) override { return WritePrimitive(array); } + + Status Visit(const TimeArray& array) override { return WritePrimitive(array); } Status Visit(const TimestampArray& array) override { return Status::NotImplemented("timestamp"); http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 7e5f13a..4679a2f 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -54,9 +54,7 @@ bool DataType::Equals(const DataType& other) const { } bool DataType::Equals(const std::shared_ptr<DataType>& other) const { - if (!other) { - return false; - } + if (!other) { return false; } return Equals(*other.get()); } @@ -106,6 +104,10 @@ std::string DateType::ToString() const { return std::string("date"); } +std::string Date32Type::ToString() const { + return std::string("date32"); +} + // ---------------------------------------------------------------------- // Union type @@ -135,11 +137,12 @@ std::string UnionType::ToString() const { // ---------------------------------------------------------------------- // DictionaryType -DictionaryType::DictionaryType( - const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& dictionary) +DictionaryType::DictionaryType(const std::shared_ptr<DataType>& index_type, + const std::shared_ptr<Array>& dictionary, bool ordered) : FixedWidthType(Type::DICTIONARY), index_type_(index_type), - dictionary_(dictionary) {} + dictionary_(dictionary), + ordered_(ordered) {} int DictionaryType::bit_width() const { return static_cast<const FixedWidthType*>(index_type_.get())->bit_width(); @@ -178,6 +181,7 @@ ACCEPT_VISITOR(StructType); ACCEPT_VISITOR(DecimalType); ACCEPT_VISITOR(UnionType); ACCEPT_VISITOR(DateType); +ACCEPT_VISITOR(Date32Type); ACCEPT_VISITOR(TimeType); ACCEPT_VISITOR(TimestampType); ACCEPT_VISITOR(IntervalType); @@ -205,11 +209,16 @@ TYPE_FACTORY(float64, DoubleType); TYPE_FACTORY(utf8, StringType); TYPE_FACTORY(binary, BinaryType); TYPE_FACTORY(date, DateType); +TYPE_FACTORY(date32, Date32Type); std::shared_ptr<DataType> timestamp(TimeUnit unit) { return std::make_shared<TimestampType>(unit); } +std::shared_ptr<DataType> timestamp(const std::string& timezone, TimeUnit unit) { + return std::make_shared<TimestampType>(timezone, unit); +} + std::shared_ptr<DataType> time(TimeUnit unit) { return std::make_shared<TimeType>(unit); } @@ -313,6 +322,7 @@ TYPE_VISITOR_DEFAULT(DoubleType); TYPE_VISITOR_DEFAULT(StringType); TYPE_VISITOR_DEFAULT(BinaryType); TYPE_VISITOR_DEFAULT(DateType); +TYPE_VISITOR_DEFAULT(Date32Type); TYPE_VISITOR_DEFAULT(TimeType); TYPE_VISITOR_DEFAULT(TimestampType); TYPE_VISITOR_DEFAULT(IntervalType); http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 9b1ab32..aa0d70e 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -67,9 +67,12 @@ struct Type { // Variable-length bytes (no guarantee of UTF8-ness) BINARY, - // By default, int32 days since the UNIX epoch + // int64_t milliseconds since the UNIX epoch DATE, + // int32_t days since the UNIX epoch + DATE32, + // Exact timestamp encoded with int64 since UNIX epoch // Default unit millisecond TIMESTAMP, @@ -132,6 +135,7 @@ class ARROW_EXPORT TypeVisitor { virtual Status Visit(const StringType& type); virtual Status Visit(const BinaryType& type); virtual Status Visit(const DateType& type); + virtual Status Visit(const Date32Type& type); virtual Status Visit(const TimeType& type); virtual Status Visit(const TimestampType& type); virtual Status Visit(const IntervalType& type); @@ -425,6 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType { // ---------------------------------------------------------------------- // Date and time types +/// Date as int64_t milliseconds since UNIX epoch struct ARROW_EXPORT DateType : public FixedWidthType { static constexpr Type::type type_id = Type::DATE; @@ -439,6 +444,20 @@ struct ARROW_EXPORT DateType : public FixedWidthType { static std::string name() { return "date"; } }; +/// Date as int32_t days since UNIX epoch +struct ARROW_EXPORT Date32Type : public FixedWidthType { + static constexpr Type::type type_id = Type::DATE32; + + using c_type = int32_t; + + Date32Type() : FixedWidthType(Type::DATE32) {} + + int bit_width() const override { return static_cast<int>(sizeof(c_type) * 8); } + + Status Accept(TypeVisitor* visitor) const override; + std::string ToString() const override; +}; + enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 }; struct ARROW_EXPORT TimeType : public FixedWidthType { @@ -467,16 +486,20 @@ struct ARROW_EXPORT TimestampType : public FixedWidthType { int bit_width() const override { return static_cast<int>(sizeof(int64_t) * 8); } - TimeUnit unit; - explicit TimestampType(TimeUnit unit = TimeUnit::MILLI) : FixedWidthType(Type::TIMESTAMP), unit(unit) {} + explicit TimestampType(const std::string& timezone, TimeUnit unit = TimeUnit::MILLI) + : FixedWidthType(Type::TIMESTAMP), unit(unit), timezone(timezone) {} + TimestampType(const TimestampType& other) : TimestampType(other.unit) {} Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override { return name(); } static std::string name() { return "timestamp"; } + + TimeUnit unit; + std::string timezone; }; struct ARROW_EXPORT IntervalType : public FixedWidthType { @@ -507,7 +530,7 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType { static constexpr Type::type type_id = Type::DICTIONARY; DictionaryType(const std::shared_ptr<DataType>& index_type, - const std::shared_ptr<Array>& dictionary); + const std::shared_ptr<Array>& dictionary, bool ordered = false); int bit_width() const override; @@ -518,11 +541,13 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType { Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; + bool ordered() const { return ordered_; } + private: // Must be an integer type (not currently checked) std::shared_ptr<DataType> index_type_; - std::shared_ptr<Array> dictionary_; + bool ordered_; }; // ---------------------------------------------------------------------- @@ -532,6 +557,8 @@ std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<Field>& value_ std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<DataType>& value_type); std::shared_ptr<DataType> ARROW_EXPORT timestamp(TimeUnit unit); +std::shared_ptr<DataType> ARROW_EXPORT timestamp( + const std::string& timezone, TimeUnit unit); std::shared_ptr<DataType> ARROW_EXPORT time(TimeUnit unit); std::shared_ptr<DataType> ARROW_EXPORT struct_( @@ -595,6 +622,7 @@ static inline bool is_primitive(Type::type type_id) { case Type::FLOAT: case Type::DOUBLE: case Type::DATE: + case Type::DATE32: case Type::TIMESTAMP: case Type::TIME: case Type::INTERVAL: http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_fwd.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h index fc4ad3d..e53afe1 100644 --- a/cpp/src/arrow/type_fwd.h +++ b/cpp/src/arrow/type_fwd.h @@ -95,6 +95,10 @@ struct DateType; using DateArray = NumericArray<DateType>; using DateBuilder = NumericBuilder<DateType>; +struct Date32Type; +using Date32Array = NumericArray<Date32Type>; +using Date32Builder = NumericBuilder<Date32Type>; + struct TimeType; using TimeArray = NumericArray<TimeType>; using TimeBuilder = NumericBuilder<TimeType>; @@ -125,6 +129,7 @@ std::shared_ptr<DataType> ARROW_EXPORT float64(); std::shared_ptr<DataType> ARROW_EXPORT utf8(); std::shared_ptr<DataType> ARROW_EXPORT binary(); std::shared_ptr<DataType> ARROW_EXPORT date(); +std::shared_ptr<DataType> ARROW_EXPORT date32(); } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_traits.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index d6687c1..2cd1420 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -131,6 +131,18 @@ struct TypeTraits<DateType> { }; template <> +struct TypeTraits<Date32Type> { + using ArrayType = Date32Array; + using BuilderType = Date32Builder; + + static inline int64_t bytes_required(int64_t elements) { + return elements * sizeof(int32_t); + } + constexpr static bool is_parameter_free = true; + static inline std::shared_ptr<DataType> type_singleton() { return date32(); } +}; + +template <> struct TypeTraits<TimestampType> { using ArrayType = TimestampArray; // using BuilderType = TimestampBuilder; http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index 7787e95..6a6b4ba 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -54,7 +54,8 @@ cdef class Array: self.type.init(self.sp_array.get().type()) @staticmethod - def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None, MemoryPool memory_pool=None): + def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None, + MemoryPool memory_pool=None): """ Convert pandas.Series to an Arrow Array. @@ -75,8 +76,9 @@ cdef class Array: Notes ----- - Localized timestamps will currently be returned as UTC (pandas's native representation). - Timezone-naive data will be implicitly interpreted as UTC. + Localized timestamps will currently be returned as UTC (pandas's native + representation). Timezone-naive data will be implicitly interpreted as + UTC. Examples -------- @@ -119,9 +121,9 @@ cdef class Array: series_values = get_series_values(obj) if isinstance(series_values, pd.Categorical): - return DictionaryArray.from_arrays(series_values.codes, - series_values.categories.values, - mask=mask, memory_pool=memory_pool) + return DictionaryArray.from_arrays( + series_values.codes, series_values.categories.values, + mask=mask, memory_pool=memory_pool) else: if series_values.dtype.type == np.datetime64 and timestamps_to_ms: series_values = series_values.astype('datetime64[ms]') @@ -134,7 +136,8 @@ cdef class Array: return box_array(out) @staticmethod - def from_list(object list_obj, DataType type=None, MemoryPool memory_pool=None): + def from_list(object list_obj, DataType type=None, + MemoryPool memory_pool=None): """ Convert Python list to Arrow array @@ -358,7 +361,8 @@ cdef class BinaryArray(Array): cdef class DictionaryArray(Array): @staticmethod - def from_arrays(indices, dictionary, mask=None, MemoryPool memory_pool=None): + def from_arrays(indices, dictionary, mask=None, + MemoryPool memory_pool=None): """ Construct Arrow DictionaryArray from array of indices (must be non-negative integers) and corresponding array of dictionary values @@ -380,8 +384,15 @@ cdef class DictionaryArray(Array): shared_ptr[CDataType] c_type shared_ptr[CArray] c_result - arrow_indices = Array.from_pandas(indices, mask=mask, memory_pool=memory_pool) - arrow_dictionary = Array.from_pandas(dictionary, memory_pool=memory_pool) + if mask is None: + mask = indices == -1 + else: + mask = mask | (indices == -1) + + arrow_indices = Array.from_pandas(indices, mask=mask, + memory_pool=memory_pool) + arrow_dictionary = Array.from_pandas(dictionary, + memory_pool=memory_pool) if not isinstance(arrow_indices, IntegerArray): raise ValueError('Indices must be integer type') http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 93bc6dd..ad5af1b 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -359,7 +359,8 @@ cdef class RecordBatch: """ Number of rows - Due to the definition of a RecordBatch, all columns have the same number of rows. + Due to the definition of a RecordBatch, all columns have the same + number of rows. Returns ------- http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/tests/test_convert_pandas.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index 960653d..953fa2c 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -81,7 +81,11 @@ class TestPandasConversion(unittest.TestCase): arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms, field=field) result = arr.to_pandas() - tm.assert_series_equal(pd.Series(result), pd.Series(values), check_names=False) + + assert arr.null_count == pd.isnull(values).sum() + + tm.assert_series_equal(pd.Series(result), pd.Series(values), + check_names=False) def test_float_no_nulls(self): data = {} http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index cadb53e..c707ada 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -34,6 +34,7 @@ #include <unordered_map> #include "arrow/api.h" +#include "arrow/loader.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/type_traits.h" @@ -610,6 +611,7 @@ class PandasBlock { DOUBLE, BOOL, DATETIME, + DATETIME_WITH_TZ, CATEGORICAL }; @@ -1157,7 +1159,7 @@ class DataFrameBlockCreator { } int block_placement = 0; - if (column_type == Type::DICTIONARY) { + if (output_type == PandasBlock::CATEGORICAL) { std::shared_ptr<PandasBlock> block; RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); categorical_blocks_[i] = block; @@ -1518,15 +1520,16 @@ inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) { null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_); } - // For readability - constexpr int64_t kOffset = 0; - RETURN_NOT_OK(ConvertData()); std::shared_ptr<DataType> type; RETURN_NOT_OK(MakeDataType(&type)); - RETURN_NOT_OK( - MakePrimitiveArray(type, length_, data_, null_bitmap_, null_count, kOffset, out)); - return Status::OK(); + + std::vector<arrow::FieldMetadata> fields(1); + fields[0].length = length_; + fields[0].null_count = null_count; + fields[0].offset = 0; + + return arrow::LoadArray(type, fields, {null_bitmap_, data_}, out); } template <>