Repository: arrow Updated Branches: refs/heads/master 5e279f0a7 -> d28f1c1e0
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc index 72eb134..7f5c993 100644 --- a/cpp/src/arrow/ipc/stream.cc +++ b/cpp/src/arrow/ipc/stream.cc @@ -20,17 +20,20 @@ #include <cstdint> #include <cstring> #include <sstream> +#include <string> #include <vector> #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" +#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" #include "arrow/memory_pool.h" #include "arrow/schema.h" #include "arrow/status.h" +#include "arrow/table.h" #include "arrow/util/logging.h" namespace arrow { @@ -39,11 +42,10 @@ namespace ipc { // ---------------------------------------------------------------------- // Stream writer implementation -StreamWriter::~StreamWriter() {} - StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) : sink_(sink), schema_(schema), + dictionary_memo_(std::make_shared<DictionaryMemo>()), pool_(default_memory_pool()), position_(-1), started_(false) {} @@ -107,7 +109,7 @@ Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& Status StreamWriter::Start() { std::shared_ptr<Buffer> schema_fb; - RETURN_NOT_OK(WriteSchema(*schema_, &schema_fb)); + RETURN_NOT_OK(WriteSchemaMessage(*schema_, dictionary_memo_.get(), &schema_fb)); int32_t flatbuffer_size = schema_fb->size(); RETURN_NOT_OK( @@ -115,14 +117,41 @@ Status StreamWriter::Start() { // Write the flatbuffer RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size)); + + // If there are any dictionaries, write them as the next messages + RETURN_NOT_OK(WriteDictionaries()); + started_ = true; return Status::OK(); } Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) { - // Pass FileBlock, but results not used - 
FileBlock dummy_block; - return WriteRecordBatch(batch, &dummy_block); + // Push an empty FileBlock. Can be written in the footer later + record_batches_.emplace_back(0, 0, 0); + return WriteRecordBatch(batch, &record_batches_[record_batches_.size() - 1]); +} + +Status StreamWriter::WriteDictionaries() { + const DictionaryMap& id_to_dictionary = dictionary_memo_->id_to_dictionary(); + + dictionaries_.resize(id_to_dictionary.size()); + + // TODO(wesm): does sorting by id yield any benefit? + int dict_index = 0; + for (const auto& entry : id_to_dictionary) { + FileBlock* block = &dictionaries_[dict_index++]; + + block->offset = position_; + + // Frame of reference in file format is 0, see ARROW-384 + const int64_t buffer_start_offset = 0; + RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_, + &block->metadata_length, &block->body_length, pool_)); + RETURN_NOT_OK(UpdatePosition()); + DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes"; + } + + return Status::OK(); } Status StreamWriter::Close() { @@ -134,81 +163,147 @@ Status StreamWriter::Close() { // ---------------------------------------------------------------------- // StreamReader implementation -StreamReader::StreamReader(const std::shared_ptr<io::InputStream>& stream) - : stream_(stream), schema_(nullptr) {} - -StreamReader::~StreamReader() {} - -Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<StreamReader>* reader) { - // Private ctor - *reader = std::shared_ptr<StreamReader>(new StreamReader(stream)); - return (*reader)->ReadSchema(); +static inline std::string message_type_name(Message::Type type) { + switch (type) { + case Message::SCHEMA: + return "schema"; + case Message::RECORD_BATCH: + return "record batch"; + case Message::DICTIONARY_BATCH: + return "dictionary"; + default: + break; + } + return "unknown"; } -Status StreamReader::ReadSchema() { - std::shared_ptr<Message> message; - 
RETURN_NOT_OK(ReadNextMessage(&message)); +class StreamReader::StreamReaderImpl { + public: + StreamReaderImpl() {} + ~StreamReaderImpl() {} - if (message->type() != Message::SCHEMA) { - return Status::IOError("First message was not schema type"); + Status Open(const std::shared_ptr<io::InputStream>& stream) { + stream_ = stream; + return ReadSchema(); } - SchemaMetadata schema_meta(message); + Status ReadNextMessage(Message::Type expected_type, std::shared_ptr<Message>* message) { + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); - // TODO(wesm): If the schema contains dictionaries, we must read all the - // dictionaries from the stream before constructing the final Schema - return schema_meta.GetSchema(&schema_); -} + if (buffer->size() != sizeof(int32_t)) { + *message = nullptr; + return Status::OK(); + } + + int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); + + RETURN_NOT_OK(stream_->Read(message_length, &buffer)); + if (buffer->size() != message_length) { + return Status::IOError("Unexpected end of stream trying to read message"); + } -Status StreamReader::ReadNextMessage(std::shared_ptr<Message>* message) { - std::shared_ptr<Buffer> buffer; - RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer)); + RETURN_NOT_OK(Message::Open(buffer, 0, message)); - if (buffer->size() != sizeof(int32_t)) { - *message = nullptr; + if ((*message)->type() != expected_type) { + std::stringstream ss; + ss << "Message not expected type: " << message_type_name(expected_type) + << ", was: " << message_type_name((*message)->type()); + return Status::IOError(ss.str()); + } return Status::OK(); } - int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data()); + Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) { + RETURN_NOT_OK(stream_->Read(size, buffer)); - RETURN_NOT_OK(stream_->Read(message_length, &buffer)); - if (buffer->size() != message_length) { - return Status::IOError("Unexpected end of stream
trying to read message"); + if ((*buffer)->size() < size) { + return Status::IOError("Unexpected EOS when reading buffer"); + } + return Status::OK(); } - return Message::Open(buffer, 0, message); -} -std::shared_ptr<Schema> StreamReader::schema() const { - return schema_; -} + Status ReadNextDictionary() { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message)); + if (message == nullptr) { + // ReadNextMessage reports end of stream as a null message; the schema + // declared more dictionaries, so the stream is truncated + return Status::IOError("Unexpected end of stream trying to read dictionary"); + } -Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(&message)); + DictionaryBatchMetadata metadata(message); - if (message == nullptr) { - // End of stream - *batch = nullptr; - return Status::OK(); + std::shared_ptr<Buffer> batch_body; + RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); + io::BufferReader reader(batch_body); + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary)); + return dictionary_memo_.AddDictionary(metadata.id(), dictionary); } - if (message->type() != Message::RECORD_BATCH) { - return Status::IOError("Metadata not record batch"); + Status ReadSchema() { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, &message)); + if (message == nullptr) { + // An empty stream yields a null message; fail instead of dereferencing it + return Status::IOError("Unexpected end of stream trying to read schema"); + } + + SchemaMetadata schema_meta(message); + RETURN_NOT_OK(schema_meta.GetDictionaryTypes(&dictionary_types_)); + + // TODO(wesm): In future, we may want to reconcile the ids in the stream with + // those found in the schema + int num_dictionaries = static_cast<int>(dictionary_types_.size()); + for (int i = 0; i < num_dictionaries; ++i) { + RETURN_NOT_OK(ReadNextDictionary()); + } + + return schema_meta.GetSchema(dictionary_memo_, &schema_); } - auto batch_metadata = std::make_shared<RecordBatchMetadata>(message); + Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + std::shared_ptr<Message> message; + RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, &message)); + + if (message == nullptr) { + // End
of stream + *batch = nullptr; + return Status::OK(); + } - std::shared_ptr<Buffer> batch_body; - RETURN_NOT_OK(stream_->Read(message->body_length(), &batch_body)); + RecordBatchMetadata batch_metadata(message); - if (batch_body->size() < message->body_length()) { - return Status::IOError("Unexpected EOS when reading message body"); + std::shared_ptr<Buffer> batch_body; + RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); + io::BufferReader reader(batch_body); + return ReadRecordBatch(batch_metadata, schema_, &reader, batch); } - io::BufferReader reader(batch_body); + std::shared_ptr<Schema> schema() const { return schema_; } + + private: + // dictionary_id -> type + DictionaryTypeMap dictionary_types_; + + DictionaryMemo dictionary_memo_; + + std::shared_ptr<io::InputStream> stream_; + std::shared_ptr<Schema> schema_; +}; + +StreamReader::StreamReader() { + impl_.reset(new StreamReaderImpl()); +} + +StreamReader::~StreamReader() {} + +Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<StreamReader>* reader) { + // Private ctor + *reader = std::shared_ptr<StreamReader>(new StreamReader()); + return (*reader)->impl_->Open(stream); +} + +std::shared_ptr<Schema> StreamReader::schema() const { + return impl_->schema(); +} - return ReadRecordBatch(batch_metadata, schema_, &reader, batch); +Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + return impl_->GetNextRecordBatch(batch); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/stream.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h index 12414fa..1c3f65e 100644 --- a/cpp/src/arrow/ipc/stream.h +++ b/cpp/src/arrow/ipc/stream.h @@ -22,7 +22,9 @@ #include <cstdint> #include <memory> +#include <vector> +#include "arrow/ipc/metadata.h" #include "arrow/util/visibility.h" namespace arrow { @@ -44,12 +46,19 
@@ class OutputStream; namespace ipc { -struct FileBlock; -class Message; +struct ARROW_EXPORT FileBlock { + FileBlock() {} + FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) + : offset(offset), metadata_length(metadata_length), body_length(body_length) {} + + int64_t offset; + int32_t metadata_length; + int64_t body_length; +}; class ARROW_EXPORT StreamWriter { public: - virtual ~StreamWriter(); + virtual ~StreamWriter() = default; static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, std::shared_ptr<StreamWriter>* out); @@ -72,6 +81,8 @@ class ARROW_EXPORT StreamWriter { Status CheckStarted(); Status UpdatePosition(); + Status WriteDictionaries(); + Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block); // Adds padding bytes if necessary to ensure all memory blocks are written on @@ -87,10 +98,17 @@ class ARROW_EXPORT StreamWriter { io::OutputStream* sink_; std::shared_ptr<Schema> schema_; + // When writing out the schema, we keep track of all the dictionaries we + // encounter, as they must be written out first in the stream + std::shared_ptr<DictionaryMemo> dictionary_memo_; + MemoryPool* pool_; int64_t position_; bool started_; + + std::vector<FileBlock> dictionaries_; + std::vector<FileBlock> record_batches_; }; class ARROW_EXPORT StreamReader { @@ -107,14 +125,10 @@ class ARROW_EXPORT StreamReader { Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch); private: - explicit StreamReader(const std::shared_ptr<io::InputStream>& stream); - - Status ReadSchema(); + StreamReader(); - Status ReadNextMessage(std::shared_ptr<Message>* message); - - std::shared_ptr<io::InputStream> stream_; - std::shared_ptr<Schema> schema_; + class ARROW_NO_EXPORT StreamReaderImpl; + std::unique_ptr<StreamReaderImpl> impl_; }; } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff 
--git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index b4930c4..07f786c 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -345,6 +345,86 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { return Status::OK(); } +Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { + const int32_t length = 6; + + std::vector<bool> is_valid = {true, true, false, true, true, true}; + std::shared_ptr<Array> dict1, dict2; + + std::vector<std::string> dict1_values = {"foo", "bar", "baz"}; + std::vector<std::string> dict2_values = {"foo", "bar", "baz", "qux"}; + + ArrayFromVector<StringType, std::string>(dict1_values, &dict1); + ArrayFromVector<StringType, std::string>(dict2_values, &dict2); + + auto f0_type = arrow::dictionary(arrow::int32(), dict1); + auto f1_type = arrow::dictionary(arrow::int8(), dict1); + auto f2_type = arrow::dictionary(arrow::int32(), dict2); + + std::shared_ptr<Array> indices0, indices1, indices2; + std::vector<int32_t> indices0_values = {1, 2, -1, 0, 2, 0}; + std::vector<int8_t> indices1_values = {0, 0, 2, 2, 1, 1}; + std::vector<int32_t> indices2_values = {3, 0, 2, 1, 0, 2}; + + ArrayFromVector<Int32Type, int32_t>(is_valid, indices0_values, &indices0); + ArrayFromVector<Int8Type, int8_t>(is_valid, indices1_values, &indices1); + ArrayFromVector<Int32Type, int32_t>(is_valid, indices2_values, &indices2); + + auto a0 = std::make_shared<DictionaryArray>(f0_type, indices0); + auto a1 = std::make_shared<DictionaryArray>(f1_type, indices1); + auto a2 = std::make_shared<DictionaryArray>(f2_type, indices2); + + // List of dictionary-encoded string + auto f3_type = list(f1_type); + + std::vector<int32_t> list_offsets = {0, 0, 2, 2, 5, 6, 9}; + std::shared_ptr<Array> offsets, indices3; + ArrayFromVector<Int32Type, int32_t>( + std::vector<bool>(list_offsets.size(), true), list_offsets, &offsets); + + std::vector<int8_t> indices3_values = {0, 1, 2, 0, 1, 2, 0, 1, 2}; + std::vector<bool> is_valid3(9, 
true); + ArrayFromVector<Int8Type, int8_t>(is_valid3, indices3_values, &indices3); + + std::shared_ptr<Buffer> null_bitmap; + RETURN_NOT_OK(test::GetBitmapFromBoolVector(is_valid, &null_bitmap)); + + std::shared_ptr<Array> a3 = std::make_shared<ListArray>(f3_type, length, + std::static_pointer_cast<PrimitiveArray>(offsets)->data(), + std::make_shared<DictionaryArray>(f1_type, indices3), null_bitmap, 1); + + // Dictionary-encoded list of integer + auto f4_value_type = list(int8()); + + std::shared_ptr<Array> offsets4, values4, indices4; + + std::vector<int32_t> list_offsets4 = {0, 2, 2, 3}; + ArrayFromVector<Int32Type, int32_t>( + std::vector<bool>(4, true), list_offsets4, &offsets4); + + std::vector<int8_t> list_values4 = {0, 1, 2}; + ArrayFromVector<Int8Type, int8_t>(std::vector<bool>(3, true), list_values4, &values4); + + auto dict3 = std::make_shared<ListArray>(f4_value_type, 3, + std::static_pointer_cast<PrimitiveArray>(offsets4)->data(), values4); + + std::vector<int8_t> indices4_values = {0, 1, 2, 0, 1, 2}; + ArrayFromVector<Int8Type, int8_t>(is_valid, indices4_values, &indices4); + + auto f4_type = dictionary(int8(), dict3); + auto a4 = std::make_shared<DictionaryArray>(f4_type, indices4); + + // construct batch + std::shared_ptr<Schema> schema(new Schema({field("dict1", f0_type), + field("sparse", f1_type), field("dense", f2_type), + field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)})); + + std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4}; + + out->reset(new RecordBatch(schema, length, arrays)); + return Status::OK(); +} + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/type.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index a1c2b79..b97b465 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -29,7 +29,7 @@ namespace arrow { bool 
Field::Equals(const Field& other) const { return (this == &other) || (this->name == other.name && this->nullable == other.nullable && - this->dictionary == dictionary && this->type->Equals(*other.type.get())); + this->type->Equals(*other.type.get())); } bool Field::Equals(const std::shared_ptr<Field>& other) const { @@ -234,8 +234,8 @@ std::shared_ptr<DataType> dictionary(const std::shared_ptr<DataType>& index_type } std::shared_ptr<Field> field( - const std::string& name, const TypePtr& type, bool nullable, int64_t dictionary) { - return std::make_shared<Field>(name, type, nullable, dictionary); + const std::string& name, const TypePtr& type, bool nullable) { + return std::make_shared<Field>(name, type, nullable); } static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1); http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/type.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 927b8a4..b15aa27 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -114,6 +114,8 @@ class BufferDescr { class TypeVisitor { public: + virtual ~TypeVisitor() = default; + virtual Status Visit(const NullType& type) = 0; virtual Status Visit(const BooleanType& type) = 0; virtual Status Visit(const Int8Type& type) = 0; @@ -205,13 +207,9 @@ struct ARROW_EXPORT Field { // Fields can be nullable bool nullable; - // optional dictionary id if the field is dictionary encoded - // 0 means it's not dictionary encoded - int64_t dictionary; - Field(const std::string& name, const std::shared_ptr<DataType>& type, - bool nullable = true, int64_t dictionary = 0) - : name(name), type(type), nullable(nullable), dictionary(dictionary) {} + bool nullable = true) + : name(name), type(type), nullable(nullable) {} bool operator==(const Field& other) const { return this->Equals(other); } bool operator!=(const Field& other) const { return !this->Equals(other); } @@ -556,8 +554,8 @@ 
std::shared_ptr<DataType> ARROW_EXPORT union_( std::shared_ptr<DataType> ARROW_EXPORT dictionary( const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& values); -std::shared_ptr<Field> ARROW_EXPORT field(const std::string& name, - const std::shared_ptr<DataType>& type, bool nullable = true, int64_t dictionary = 0); +std::shared_ptr<Field> ARROW_EXPORT field( + const std::string& name, const std::shared_ptr<DataType>& type, bool nullable = true); // ---------------------------------------------------------------------- // http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/python/pyarrow/includes/libarrow_ipc.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd index 5ab9815..afc7dbd 100644 --- a/python/pyarrow/includes/libarrow_ipc.pxd +++ b/python/pyarrow/includes/libarrow_ipc.pxd @@ -63,7 +63,6 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil: shared_ptr[CSchema] schema() - int num_dictionaries() int num_record_batches() CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch) http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/python/pyarrow/io.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 89ce6e7..4acef21 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -995,11 +995,6 @@ cdef class _FileReader: else: check_status(CFileReader.Open(reader, &self.reader)) - property num_dictionaries: - - def __get__(self): - return self.reader.get().num_dictionaries() - property num_record_batches: def __get__(self):
