http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 49c24c7..20fd280 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -58,8 +58,8 @@ static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = static constexpr flatbuf::MetadataVersion kMinMetadataVersion = flatbuf::MetadataVersion_V3; -static Status IntFromFlatbuffer( - const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) { +static Status IntFromFlatbuffer(const flatbuf::Int* int_data, + std::shared_ptr<DataType>* out) { if (int_data->bitWidth() > 64) { return Status::NotImplemented("Integers with more than 64 bits not implemented"); } @@ -86,8 +86,8 @@ static Status IntFromFlatbuffer( return Status::OK(); } -static Status FloatFromFlatuffer( - const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) { +static Status FloatFromFlatuffer(const flatbuf::FloatingPoint* float_data, + std::shared_ptr<DataType>* out) { if (float_data->precision() == flatbuf::Precision_HALF) { *out = float16(); } else if (float_data->precision() == flatbuf::Precision_SINGLE) { @@ -100,7 +100,7 @@ static Status FloatFromFlatuffer( // Forward declaration static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, - DictionaryMemo* dictionary_memo, FieldOffset* offset); + DictionaryMemo* dictionary_memo, FieldOffset* offset); static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); @@ -111,7 +111,8 @@ static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) { } static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) { + std::vector<FieldOffset>* out_children, + DictionaryMemo* dictionary_memo) { FieldOffset field; for (int i = 0; i < type->num_children(); ++i) { RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field)); @@ -121,16 +122,16 @@ static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type, } static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { + std::vector<FieldOffset>* out_children, + DictionaryMemo* dictionary_memo, Offset* offset) { RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); *offset = flatbuf::CreateList(fbb).Union(); return Status::OK(); } static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { + std::vector<FieldOffset>* out_children, + DictionaryMemo* dictionary_memo, Offset* offset) { RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); *offset = flatbuf::CreateStruct_(fbb).Union(); return Status::OK(); @@ -140,7 +141,8 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type // Union implementation static Status UnionFromFlatbuffer(const flatbuf::Union* union_data, - const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { + const std::vector<std::shared_ptr<Field>>& children, + std::shared_ptr<DataType>* out) { UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE : UnionMode::DENSE; @@ -163,8 +165,8 @@ static Status UnionFromFlatbuffer(const flatbuf::Union* union_data, } static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { + std::vector<FieldOffset>* out_children, + DictionaryMemo* dictionary_memo, Offset* offset) { RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); const auto& union_type = static_cast<const UnionType&>(*type); @@ -224,15 +226,16 @@ static inline TimeUnit::type FromFlatbufferUnit(flatbuf::TimeUnit unit) { } static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, - const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { + const std::vector<std::shared_ptr<Field>>& children, + std::shared_ptr<DataType>* out) { switch (type) { case flatbuf::Type_NONE: return Status::Invalid("Type metadata cannot be none"); case flatbuf::Type_Int: return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out); case flatbuf::Type_FloatingPoint: - return FloatFromFlatuffer( - static_cast<const flatbuf::FloatingPoint*>(type_data), out); + return FloatFromFlatuffer(static_cast<const flatbuf::FloatingPoint*>(type_data), + out); case flatbuf::Type_Binary: *out = binary(); return Status::OK(); @@ -301,8 +304,8 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, *out = std::make_shared<StructType>(children); return Status::OK(); case flatbuf::Type_Union: - return UnionFromFlatbuffer( - static_cast<const flatbuf::Union*>(type_data), children, out); + return UnionFromFlatbuffer(static_cast<const flatbuf::Union*>(type_data), children, + out); default: return Status::Invalid("Unrecognized type"); } @@ -310,15 +313,17 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, // TODO(wesm): Convert this to visitor pattern static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout, - flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) { + std::vector<FieldOffset>* children, + std::vector<VectorLayoutOffset>* layout, + flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, + Offset* offset) { if (type->id() == Type::DICTIONARY) { // In this library, the dictionary "type" is a logical construct. Here we // pass through to the value type, as we've already captured the index // type in the DictionaryEncoding metadata in the parent field const auto& dict_type = static_cast<const DictionaryType&>(*type); return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout, - out_type, dictionary_memo, offset); + out_type, dictionary_memo, offset); } std::vector<BufferDescr> buffer_layout = type->GetBufferLayout(); @@ -436,7 +441,7 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, } static Status TensorTypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - flatbuf::Type* out_type, Offset* offset) { + flatbuf::Type* out_type, Offset* offset) { switch (type->id()) { case Type::UINT8: INT_TO_FB_CASE(8, false); @@ -475,8 +480,8 @@ static Status TensorTypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& return Status::OK(); } -static DictionaryOffset GetDictionaryEncoding( - FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) { +static DictionaryOffset GetDictionaryEncoding(FBB& fbb, const DictionaryType& type, + DictionaryMemo* memo) { int64_t dictionary_id = memo->GetId(type.dictionary()); // We assume that the dictionary index type (as an integer) has already been @@ -491,7 +496,7 @@ static DictionaryOffset GetDictionaryEncoding( } static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, - DictionaryMemo* dictionary_memo, FieldOffset* offset) { + DictionaryMemo* dictionary_memo, FieldOffset* offset) { auto fb_name = fbb.CreateString(field->name()); flatbuf::Type type_enum; @@ -500,8 +505,8 @@ static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, std::vector<FieldOffset> children; std::vector<VectorLayoutOffset> layout; - RETURN_NOT_OK(TypeToFlatbuffer( - fbb, field->type(), &children, &layout, &type_enum, dictionary_memo, &type_offset)); + RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type(), &children, &layout, &type_enum, + dictionary_memo, &type_offset)); auto fb_children = fbb.CreateVector(children); auto fb_layout = fbb.CreateVector(layout); @@ -513,13 +518,14 @@ static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, // TODO: produce the list of VectorTypes *offset = flatbuf::CreateField(fbb, fb_name, field->nullable(), type_enum, type_offset, - dictionary, fb_children, fb_layout); + dictionary, fb_children, fb_layout); return Status::OK(); } static Status FieldFromFlatbuffer(const flatbuf::Field* field, - const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) { + const DictionaryMemo& dictionary_memo, + std::shared_ptr<Field>* out) { std::shared_ptr<DataType> type; const flatbuf::DictionaryEncoding* encoding = field->dictionary(); @@ -551,8 +557,8 @@ static Status FieldFromFlatbuffer(const flatbuf::Field* field, return Status::OK(); } -static Status FieldFromFlatbufferDictionary( - const flatbuf::Field* field, std::shared_ptr<Field>* out) { +static Status FieldFromFlatbufferDictionary(const flatbuf::Field* field, + std::shared_ptr<Field>* out) { // Need an empty memo to pass down for constructing children DictionaryMemo dummy_memo; @@ -584,7 +590,8 @@ flatbuf::Endianness endianness() { } static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, - DictionaryMemo* dictionary_memo, flatbuffers::Offset<flatbuf::Schema>* out) { + DictionaryMemo* dictionary_memo, + flatbuffers::Offset<flatbuf::Schema>* out) { /// Fields std::vector<FieldOffset> field_offsets; for (int i = 0; i < schema.num_fields(); ++i) { @@ -609,8 +616,8 @@ static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, key_value_offsets.push_back( flatbuf::CreateKeyValue(fbb, fbb.CreateString(key), fbb.CreateString(value))); } - *out = flatbuf::CreateSchema( - fbb, endianness(), fb_offsets, fbb.CreateVector(key_value_offsets)); + *out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets, + fbb.CreateVector(key_value_offsets)); } else { *out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets); } @@ -631,15 +638,16 @@ static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) { } static Status WriteFBMessage(FBB& fbb, flatbuf::MessageHeader header_type, - flatbuffers::Offset<void> header, int64_t body_length, std::shared_ptr<Buffer>* out) { - auto message = flatbuf::CreateMessage( - fbb, kCurrentMetadataVersion, header_type, header, body_length); + flatbuffers::Offset<void> header, int64_t body_length, + std::shared_ptr<Buffer>* out) { + auto message = flatbuf::CreateMessage(fbb, kCurrentMetadataVersion, header_type, header, + body_length); fbb.Finish(message); return WriteFlatbufferBuilder(fbb, out); } -Status WriteSchemaMessage( - const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) { +Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, + std::shared_ptr<Buffer>* out) { FBB fbb; flatbuffers::Offset<flatbuf::Schema> fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); @@ -650,8 +658,8 @@ using FieldNodeVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>; using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>; -static Status WriteFieldNodes( - FBB& fbb, const std::vector<FieldMetadata>& nodes, FieldNodeVector* out) { +static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes, + FieldNodeVector* out) { std::vector<flatbuf::FieldNode> fb_nodes; fb_nodes.reserve(nodes.size()); @@ -666,8 +674,8 @@ static Status WriteFieldNodes( return Status::OK(); } -static Status WriteBuffers( - FBB& fbb, const std::vector<BufferMetadata>& buffers, BufferVector* out) { +static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers, + BufferVector* out) { std::vector<flatbuf::Buffer> fb_buffers; fb_buffers.reserve(buffers.size()); @@ -680,8 +688,9 @@ static Status WriteBuffers( } static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - RecordBatchOffset* offset) { + const std::vector<FieldMetadata>& nodes, + const std::vector<BufferMetadata>& buffers, + RecordBatchOffset* offset) { FieldNodeVector fb_nodes; BufferVector fb_buffers; @@ -693,17 +702,18 @@ static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, } Status WriteRecordBatchMessage(int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out) { + const std::vector<FieldMetadata>& nodes, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); - return WriteFBMessage( - fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out); + return WriteFBMessage(fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), + body_length, out); } -Status WriteTensorMessage( - const Tensor& tensor, int64_t buffer_start_offset, std::shared_ptr<Buffer>* out) { +Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, + std::shared_ptr<Buffer>* out) { using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>; using TensorOffset = flatbuffers::Offset<flatbuf::Tensor>; @@ -727,19 +737,20 @@ Status WriteTensorMessage( TensorOffset fb_tensor = flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); - return WriteFBMessage( - fbb, flatbuf::MessageHeader_Tensor, fb_tensor.Union(), body_length, out); + return WriteFBMessage(fbb, flatbuf::MessageHeader_Tensor, fb_tensor.Union(), + body_length, out); } Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out) { + const std::vector<FieldMetadata>& nodes, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union(); - return WriteFBMessage( - fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, body_length, out); + return WriteFBMessage(fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, + body_length, out); } static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> @@ -754,8 +765,8 @@ FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { } Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, - io::OutputStream* out) { + const std::vector<FileBlock>& record_batches, + DictionaryMemo* dictionary_memo, io::OutputStream* out) { FBB fbb; flatbuffers::Offset<flatbuf::Schema> fb_schema; @@ -764,8 +775,8 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - auto footer = flatbuf::CreateFooter( - fbb, kCurrentMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); + auto footer = flatbuf::CreateFooter(fbb, kCurrentMetadataVersion, fb_schema, + fb_dictionaries, fb_record_batches); fbb.Finish(footer); @@ -780,8 +791,8 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti DictionaryMemo::DictionaryMemo() {} // Returns KeyError if dictionary not found -Status DictionaryMemo::GetDictionary( - int64_t id, std::shared_ptr<Array>* dictionary) const { +Status DictionaryMemo::GetDictionary(int64_t id, + std::shared_ptr<Array>* dictionary) const { auto it = id_to_dictionary_.find(id); if (it == id_to_dictionary_.end()) { std::stringstream ss; @@ -817,8 +828,8 @@ bool DictionaryMemo::HasDictionaryId(int64_t id) const { return it != id_to_dictionary_.end(); } -Status DictionaryMemo::AddDictionary( - int64_t id, const std::shared_ptr<Array>& dictionary) { +Status DictionaryMemo::AddDictionary(int64_t id, + const std::shared_ptr<Array>& dictionary) { if (HasDictionaryId(id)) { std::stringstream ss; ss << "Dictionary with id " << id << " already exists"; @@ -835,8 +846,8 @@ Status DictionaryMemo::AddDictionary( class Message::MessageImpl { public: - explicit MessageImpl( - const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) + explicit MessageImpl(const std::shared_ptr<Buffer>& metadata, + const std::shared_ptr<Buffer>& body) : metadata_(metadata), message_(nullptr), body_(body) {} Status Open() { @@ -897,43 +908,35 @@ class Message::MessageImpl { std::shared_ptr<Buffer> body_; }; -Message::Message( - const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) { +Message::Message(const std::shared_ptr<Buffer>& metadata, + const std::shared_ptr<Buffer>& body) { impl_.reset(new MessageImpl(metadata, body)); } Status Message::Open(const std::shared_ptr<Buffer>& metadata, - const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) { + const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) { out->reset(new Message(metadata, body)); return (*out)->impl_->Open(); } Message::~Message() {} -std::shared_ptr<Buffer> Message::body() const { - return impl_->body(); -} +std::shared_ptr<Buffer> Message::body() const { return impl_->body(); } -std::shared_ptr<Buffer> Message::metadata() const { - return impl_->metadata(); -} +std::shared_ptr<Buffer> Message::metadata() const { return impl_->metadata(); } -Message::Type Message::type() const { - return impl_->type(); -} +Message::Type Message::type() const { return impl_->type(); } -MetadataVersion Message::metadata_version() const { - return impl_->version(); -} +MetadataVersion Message::metadata_version() const { return impl_->version(); } -const void* Message::header() const { - return impl_->header(); -} +const void* Message::header() const { return impl_->header(); } bool Message::Equals(const Message& other) const { int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size()); - if (!metadata()->Equals(*other.metadata(), metadata_bytes)) { return false; } + if (!metadata()->Equals(*other.metadata(), metadata_bytes)) { + return false; + } // Compare bodies, if they have them auto this_body = body(); @@ -1012,7 +1015,7 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi } Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo, - std::shared_ptr<Schema>* out) { + std::shared_ptr<Schema>* out) { auto schema = static_cast<const flatbuf::Schema*>(opaque_schema); int num_fields = static_cast<int>(schema->fields()->size()); @@ -1036,8 +1039,8 @@ Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_mem } Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, - std::vector<int64_t>* shape, std::vector<int64_t>* strides, - std::vector<std::string>* dim_names) { + std::vector<int64_t>* shape, std::vector<int64_t>* strides, + std::vector<std::string>* dim_names) { auto message = flatbuf::GetMessage(metadata.data()); auto tensor = reinterpret_cast<const flatbuf::Tensor*>(message->header()); @@ -1068,7 +1071,8 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type // Read and write messages static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata, - io::InputStream* stream, std::unique_ptr<Message>* message) { + io::InputStream* stream, + std::unique_ptr<Message>* message) { auto fb_message = flatbuf::GetMessage(metadata->data()); int64_t body_length = fb_message->bodyLength(); @@ -1087,7 +1091,7 @@ static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata, } Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, - std::unique_ptr<Message>* message) { + std::unique_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); @@ -1141,8 +1145,8 @@ InputStreamMessageReader::~InputStreamMessageReader() {} // ---------------------------------------------------------------------- // Implement message writing -Status WriteMessage( - const Buffer& message, io::OutputStream* file, int32_t* message_length) { +Status WriteMessage(const Buffer& message, io::OutputStream* file, + int32_t* message_length) { // Need to write 4 bytes (message size), the message, plus padding to // end on an 8-byte offset int64_t start_offset; @@ -1151,7 +1155,9 @@ Status WriteMessage( int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4; const int32_t remainder = (padded_message_length + static_cast<int32_t>(start_offset)) % 8; - if (remainder != 0) { padded_message_length += 8 - remainder; } + if (remainder != 0) { + padded_message_length += 8 - remainder; + } // The returned message size includes the length prefix, the flatbuffer, // plus padding @@ -1167,7 +1173,9 @@ Status WriteMessage( // Write any padding int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4; - if (padding > 0) { RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); } + if (padding > 0) { + RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); + } return Status::OK(); }
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 614f7a6..90e4def 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -133,11 +133,14 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi // Construct a complete Schema from the message. May be expensive for very // large schemas if you are only interested in a few fields Status ARROW_EXPORT GetSchema(const void* opaque_schema, - const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out); + const DictionaryMemo& dictionary_memo, + std::shared_ptr<Schema>* out); Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata, - std::shared_ptr<DataType>* type, std::vector<int64_t>* shape, - std::vector<int64_t>* strides, std::vector<std::string>* dim_names); + std::shared_ptr<DataType>* type, + std::vector<int64_t>* shape, + std::vector<int64_t>* strides, + std::vector<std::string>* dim_names); /// \brief An IPC message including metadata and body class ARROW_EXPORT Message { @@ -157,7 +160,7 @@ class ARROW_EXPORT Message { /// \param[in] body a buffer containing the message body, which may be nullptr /// \param[out] out the created message static Status Open(const std::shared_ptr<Buffer>& metadata, - const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out); + const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out); /// \brief Write length-prefixed metadata and body to output stream /// @@ -242,22 +245,23 @@ class ARROW_EXPORT InputStreamMessageReader : public MessageReader { /// \param[out] message the message read /// \return Status success or failure Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, std::unique_ptr<Message>* message); + io::RandomAccessFile* file, + std::unique_ptr<Message>* message); /// \brief Read encapulated RPC message (metadata and body) from InputStream /// /// Read length-prefixed message with as-yet unknown length. Returns nullptr if /// there are not enough bytes available or the message length is 0 (e.g. EOS /// in a stream) -Status ARROW_EXPORT ReadMessage( - io::InputStream* stream, std::unique_ptr<Message>* message); +Status ARROW_EXPORT ReadMessage(io::InputStream* stream, + std::unique_ptr<Message>* message); /// Write a serialized message metadata with a length-prefix and padding to an /// 8-byte offset /// /// <message_size: int32><message: const void*><padding> -Status ARROW_EXPORT WriteMessage( - const Buffer& message, io::OutputStream* file, int32_t* message_length); +Status ARROW_EXPORT WriteMessage(const Buffer& message, io::OutputStream* file, + int32_t* message_length); // Serialize arrow::Schema as a Flatbuffer // @@ -266,23 +270,26 @@ Status ARROW_EXPORT WriteMessage( // dictionary ids // \param[out] out the serialized arrow::Buffer // \return Status outcome -Status ARROW_EXPORT WriteSchemaMessage( - const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out); +Status ARROW_EXPORT WriteSchemaMessage(const Schema& schema, + DictionaryMemo* dictionary_memo, + std::shared_ptr<Buffer>* out); Status ARROW_EXPORT WriteRecordBatchMessage(int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out); + const std::vector<FieldMetadata>& nodes, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); -Status ARROW_EXPORT WriteTensorMessage( - const Tensor& tensor, int64_t buffer_start_offset, std::shared_ptr<Buffer>* out); +Status ARROW_EXPORT WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, + std::shared_ptr<Buffer>* out); Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length, - const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, - std::shared_ptr<Buffer>* out); + const std::vector<FieldMetadata>& nodes, + const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, - io::OutputStream* out); + const std::vector<FileBlock>& record_batches, + DictionaryMemo* dictionary_memo, io::OutputStream* out); } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 88ab330..8ae8280 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -95,12 +95,12 @@ struct ArrayLoaderContext { }; static Status LoadArray(const std::shared_ptr<DataType>& type, - ArrayLoaderContext* context, internal::ArrayData* out); + ArrayLoaderContext* context, internal::ArrayData* out); class ArrayLoader { public: ArrayLoader(const std::shared_ptr<DataType>& type, internal::ArrayData* out, - ArrayLoaderContext* context) + ArrayLoaderContext* context) : type_(type), context_(context), out_(out) {} Status Load() { @@ -184,7 +184,7 @@ class ArrayLoader { typename std::enable_if<std::is_base_of<FixedWidthType, T>::value && !std::is_base_of<FixedSizeBinaryType, T>::value && !std::is_base_of<DictionaryType, T>::value, - Status>::type + Status>::type Visit(const T& type) { return LoadPrimitive<T>(); } @@ -252,18 +252,18 @@ class ArrayLoader { }; static Status LoadArray(const std::shared_ptr<DataType>& type, - ArrayLoaderContext* context, internal::ArrayData* out) { + ArrayLoaderContext* context, internal::ArrayData* out) { ArrayLoader loader(type, out, context); return loader.Load(); } Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, - io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { + io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out); } Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema, - std::shared_ptr<RecordBatch>* out) { + std::shared_ptr<RecordBatch>* out) { io::BufferReader reader(message.body()); DCHECK_EQ(message.type(), Message::RECORD_BATCH); return ReadRecordBatch(*message.metadata(), schema, kMaxNestingDepth, &reader, out); @@ -273,8 +273,9 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc // Array loading static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema, - int64_t num_rows, int max_recursion_depth, IpcComponentSource* source, - std::shared_ptr<RecordBatch>* out) { + int64_t num_rows, int max_recursion_depth, + IpcComponentSource* source, + std::shared_ptr<RecordBatch>* out) { ArrayLoaderContext context; context.source = source; context.field_index = 0; @@ -294,16 +295,17 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema, } static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata, - const std::shared_ptr<Schema>& schema, int max_recursion_depth, - io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { + const std::shared_ptr<Schema>& schema, + int max_recursion_depth, io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out) { IpcComponentSource source(metadata, file); - return LoadRecordBatchFromSource( - schema, metadata->length(), max_recursion_depth, &source, out); + return LoadRecordBatchFromSource(schema, metadata->length(), max_recursion_depth, + &source, out); } Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, - int max_recursion_depth, io::RandomAccessFile* file, - std::shared_ptr<RecordBatch>* out) { + int max_recursion_depth, io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out) { auto message = flatbuf::GetMessage(metadata.data()); if (message->header_type() != flatbuf::MessageHeader_RecordBatch) { DCHECK_EQ(message->header_type(), flatbuf::MessageHeader_RecordBatch); @@ -313,7 +315,8 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc } Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types, - io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) { + io::RandomAccessFile* file, int64_t* dictionary_id, + std::shared_ptr<Array>* out) { auto message = flatbuf::GetMessage(metadata.data()); auto dictionary_batch = reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header()); @@ -347,7 +350,7 @@ Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionar } static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expected_type, - bool allow_null, std::unique_ptr<Message>* message) { + bool allow_null, std::unique_ptr<Message>* message) { RETURN_NOT_OK(reader->ReadNextMessage(message)); if (!(*message) && !allow_null) { @@ -357,7 +360,9 @@ static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expect return Status::Invalid(ss.str()); } - if ((*message) == nullptr) { return Status::OK(); } + if ((*message) == nullptr) { + return Status::OK(); + } if ((*message)->type() != expected_type) { std::stringstream ss; @@ -389,15 +394,15 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl { Status ReadNextDictionary() { std::unique_ptr<Message> message; - RETURN_NOT_OK(ReadMessageAndValidate( - message_reader_.get(), Message::DICTIONARY_BATCH, false, &message)); + RETURN_NOT_OK(ReadMessageAndValidate(message_reader_.get(), Message::DICTIONARY_BATCH, + false, &message)); io::BufferReader reader(message->body()); std::shared_ptr<Array> dictionary; int64_t id; - RETURN_NOT_OK(ReadDictionary( - *message->metadata(), dictionary_types_, &reader, &id, &dictionary)); + RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_types_, &reader, &id, + &dictionary)); return dictionary_memo_.AddDictionary(id, dictionary); } @@ -420,8 +425,8 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl { Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { std::unique_ptr<Message> message; - RETURN_NOT_OK(ReadMessageAndValidate( - message_reader_.get(), Message::RECORD_BATCH, true, &message)); + RETURN_NOT_OK(ReadMessageAndValidate(message_reader_.get(), Message::RECORD_BATCH, + true, &message)); if (message == nullptr) { // End of stream @@ -451,14 +456,14 @@ RecordBatchStreamReader::RecordBatchStreamReader() { RecordBatchStreamReader::~RecordBatchStreamReader() {} Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader, - std::shared_ptr<RecordBatchStreamReader>* reader) { + std::shared_ptr<RecordBatchStreamReader>* reader) { // Private ctor *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader()); return (*reader)->impl_->Open(std::move(message_reader)); } Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<RecordBatchStreamReader>* out) { + std::shared_ptr<RecordBatchStreamReader>* out) { std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream)); return Open(std::move(message_reader), out); } @@ -502,8 +507,8 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { } // Now read the footer - RETURN_NOT_OK(file_->ReadAt( - footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_)); + RETURN_NOT_OK(file_->ReadAt(footer_offset_ - footer_length - file_end_size, + footer_length, &footer_buffer_)); // TODO(wesm): Verify the footer footer_ = flatbuf::GetFooter(footer_buffer_->data()); @@ -568,7 +573,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { std::shared_ptr<Array> dictionary; int64_t dictionary_id; RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_fields_, &reader, - &dictionary_id, &dictionary)); + &dictionary_id, &dictionary)); RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary)); } @@ -610,37 +615,34 @@ RecordBatchFileReader::RecordBatchFileReader() { RecordBatchFileReader::~RecordBatchFileReader() {} Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, - std::shared_ptr<RecordBatchFileReader>* reader) { + std::shared_ptr<RecordBatchFileReader>* reader) { int64_t footer_offset; RETURN_NOT_OK(file->GetSize(&footer_offset)); return Open(file, footer_offset, reader); } Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file, - int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) { + int64_t footer_offset, + std::shared_ptr<RecordBatchFileReader>* reader) { *reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader()); return (*reader)->impl_->Open(file, footer_offset); } -std::shared_ptr<Schema> RecordBatchFileReader::schema() const { - return impl_->schema(); -} +std::shared_ptr<Schema> RecordBatchFileReader::schema() const { return impl_->schema(); } int RecordBatchFileReader::num_record_batches() const { return impl_->num_record_batches(); } -MetadataVersion RecordBatchFileReader::version() const { - return impl_->version(); -} +MetadataVersion RecordBatchFileReader::version() const { return impl_->version(); } -Status RecordBatchFileReader::ReadRecordBatch( - int i, std::shared_ptr<RecordBatch>* batch) { +Status RecordBatchFileReader::ReadRecordBatch(int i, + std::shared_ptr<RecordBatch>* batch) { return impl_->ReadRecordBatch(i, batch); } -static Status ReadContiguousPayload( - int64_t offset, io::RandomAccessFile* file, std::unique_ptr<Message>* message) { +static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file, + std::unique_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->Seek(offset)); RETURN_NOT_OK(ReadMessage(file, message)); @@ -652,16 +654,16 @@ static Status ReadContiguousPayload( } Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, - io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { + io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { std::unique_ptr<Message> message; RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message)); io::BufferReader buffer_reader(message->body()); - return ReadRecordBatch( - *message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out); + return ReadRecordBatch(*message->metadata(), schema, kMaxNestingDepth, &buffer_reader, + out); } -Status ReadTensor( - int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out) { +Status ReadTensor(int64_t offset, io::RandomAccessFile* file, + std::shared_ptr<Tensor>* out) { // Respect alignment of Tensor messages (see WriteTensor) offset = PaddedLength(offset); std::unique_ptr<Message> message; http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index d6c2614..c0d3fb1 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -72,7 +72,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// \param(out) out the created RecordBatchStreamReader object /// \return Status static Status Open(std::unique_ptr<MessageReader> message_reader, - std::shared_ptr<RecordBatchStreamReader>* out); + std::shared_ptr<RecordBatchStreamReader>* out); /// \Create Record batch stream reader from InputStream /// @@ -80,7 +80,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { /// \param(out) out the created RecordBatchStreamReader object /// \return Status static Status Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<RecordBatchStreamReader>* out); + std::shared_ptr<RecordBatchStreamReader>* out); std::shared_ptr<Schema> schema() const override; Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override; @@ -103,7 +103,7 @@ class ARROW_EXPORT RecordBatchFileReader { // need only locate the end of the Arrow file stream to discover the metadata // and then proceed to read the data into memory. static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, - std::shared_ptr<RecordBatchFileReader>* reader); + std::shared_ptr<RecordBatchFileReader>* reader); // If the file is embedded within some larger file or memory region, you can // pass the absolute memory offset to the end of the file (which contains the @@ -113,7 +113,8 @@ class ARROW_EXPORT RecordBatchFileReader { // @param file: the data source // @param footer_offset: the position of the end of the Arrow "file" static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, - int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader); + int64_t footer_offset, + std::shared_ptr<RecordBatchFileReader>* reader); /// The schema includes any dictionaries std::shared_ptr<Schema> schema() const; @@ -148,8 +149,9 @@ class ARROW_EXPORT RecordBatchFileReader { /// \param(in) file a random access file /// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, - const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, - std::shared_ptr<RecordBatch>* out); + const std::shared_ptr<Schema>& schema, + io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out); /// \brief Read record batch from fully encapulated Message /// @@ -158,7 +160,8 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, /// \param[out] out the resulting RecordBatch /// \return Status Status ARROW_EXPORT ReadRecordBatch(const Message& message, - const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out); + const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatch>* out); /// Read record batch from file given metadata and schema /// @@ -168,8 +171,9 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& message, /// \param(in) max_recursion_depth the maximum permitted nesting depth /// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, - const std::shared_ptr<Schema>& schema, int max_recursion_depth, - io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); + const std::shared_ptr<Schema>& schema, + int max_recursion_depth, io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out); /// Read record batch as encapsulated IPC message with metadata size prefix and /// header @@ -179,15 +183,16 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, /// \param(in) file the file where the batch is located /// \param(out) out the read record batch Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, - io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); + io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out); /// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file /// /// \param(in) offset the file location of the start of the message /// \param(in) file the file where the batch is located /// \param(out) out the read tensor -Status ARROW_EXPORT ReadTensor( - int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out); +Status ARROW_EXPORT ReadTensor(int64_t offset, io::RandomAccessFile* file, + std::shared_ptr<Tensor>* out); /// Backwards-compatibility for Arrow < 0.4.0 /// http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index de65883..33719b3 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +#include <iostream> #include "arrow/io/file.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/status.h" -#include <iostream> #include "arrow/util/io-util.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 67a41ba..a876792 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -69,8 +69,8 @@ static inline void CompareBatch(const RecordBatch& left, const RecordBatch& righ } } -static inline void CompareArraysDetailed( - int index, const Array& result, const Array& expected) { +static inline void CompareArraysDetailed(int index, const Array& result, + const Array& expected) { if (!expected.Equals(result)) { std::stringstream pp_result; std::stringstream pp_expected; @@ -83,8 +83,8 @@ static inline void CompareArraysDetailed( } } -static inline void CompareBatchColumnsDetailed( - const RecordBatch& result, const RecordBatch& expected) { +static inline void CompareBatchColumnsDetailed(const RecordBatch& result, + const RecordBatch& expected) { for (int i = 0; i < expected.num_columns(); ++i) { auto left = result.column(i); auto right = expected.column(i); @@ -95,16 +95,16 @@ static inline void CompareBatchColumnsDetailed( const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); -Status MakeRandomInt32Array( - int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) { +Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool, + std::shared_ptr<Array>* out) { std::shared_ptr<PoolBuffer> data; RETURN_NOT_OK(test::MakeRandomInt32PoolBuffer(length, pool, &data)); Int32Builder builder(pool, int32()); if (include_nulls) { std::shared_ptr<PoolBuffer> valid_bytes; RETURN_NOT_OK(test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes)); - RETURN_NOT_OK(builder.Append( - reinterpret_cast<const int32_t*>(data->data()), length, valid_bytes->data())); + RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length, + valid_bytes->data())); return builder.Finish(out); } RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length)); @@ -112,7 +112,8 @@ Status MakeRandomInt32Array( } Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_lists, - bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) { + bool include_nulls, MemoryPool* pool, + std::shared_ptr<Array>* out) { // Create the null list values std::vector<uint8_t> valid_lists(num_lists); const double null_percent = include_nulls ? 0.1 : 0; @@ -129,15 +130,16 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li test::rand_uniform_int(num_lists, seed, 0, max_list_size, list_sizes.data()); // make sure sizes are consistent with null std::transform(list_sizes.begin(), list_sizes.end(), valid_lists.begin(), - list_sizes.begin(), - [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; }); + list_sizes.begin(), + [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; }); std::partial_sum(list_sizes.begin(), list_sizes.end(), ++offsets.begin()); // Force invariants const int32_t child_length = static_cast<int32_t>(child_array->length()); offsets[0] = 0; std::replace_if(offsets.begin(), offsets.end(), - [child_length](int32_t offset) { return offset > child_length; }, child_length); + [child_length](int32_t offset) { return offset > child_length; }, + child_length); } offsets[num_lists] = static_cast<int32_t>(child_array->length()); @@ -148,14 +150,14 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li RETURN_NOT_OK(test::CopyBufferFromVector(offsets, pool, &offsets_buffer)); *out = std::make_shared<ListArray>(list(child_array->type()), num_lists, offsets_buffer, - child_array, null_bitmap, kUnknownNullCount); + child_array, null_bitmap, kUnknownNullCount); return ValidateArray(**out); } typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out); -Status MakeRandomBooleanArray( - const int length, bool include_nulls, std::shared_ptr<Array>* out) { +Status MakeRandomBooleanArray(const int length, bool include_nulls, + std::shared_ptr<Array>* out) { std::vector<uint8_t> values(length); test::random_null_bytes(length, 0.5, values.data()); std::shared_ptr<Buffer> data; @@ -210,10 +212,10 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) { } template <class Builder, class RawType> -Status MakeRandomBinaryArray( - int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) { - const std::vector<std::string> values = { - "", "", "abc", "123", "efg", "456!@#!@#", "12312"}; +Status MakeRandomBinaryArray(int64_t length, bool include_nulls, MemoryPool* pool, + std::shared_ptr<Array>* out) { + const std::vector<std::string> values = {"", "", "abc", "123", + "efg", "456!@#!@#", "12312"}; Builder builder(pool); const size_t values_len = values.size(); for (int64_t i = 0; i < length; ++i) { @@ -223,7 +225,7 @@ Status MakeRandomBinaryArray( } else { const std::string& value = values[values_index]; RETURN_NOT_OK(builder.Append(reinterpret_cast<const RawType*>(value.data()), - static_cast<int32_t>(value.size()))); + static_cast<int32_t>(value.size()))); } } return builder.Finish(out); @@ -434,11 +436,12 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { // construct individual nullable/non-nullable struct arrays auto sparse_no_nulls = std::make_shared<UnionArray>(sparse_type, length, sparse_children, type_ids_buffer); - auto sparse = std::make_shared<UnionArray>( - sparse_type, length, sparse_children, type_ids_buffer, nullptr, null_bitmask, 1); + auto sparse = std::make_shared<UnionArray>(sparse_type, length, sparse_children, + type_ids_buffer, nullptr, null_bitmask, 1); - auto dense = std::make_shared<UnionArray>(dense_type, length, dense_children, - type_ids_buffer, offsets_buffer, null_bitmask, 1); + auto dense = + std::make_shared<UnionArray>(dense_type, length, dense_children, type_ids_buffer, + offsets_buffer, null_bitmask, 1); // construct batch std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense}; @@ -480,8 +483,8 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { std::vector<int32_t> list_offsets = {0, 0, 2, 2, 5, 6, 9}; std::shared_ptr<Array> offsets, indices3; - ArrayFromVector<Int32Type, int32_t>( - std::vector<bool>(list_offsets.size(), true), list_offsets, &offsets); + ArrayFromVector<Int32Type, int32_t>(std::vector<bool>(list_offsets.size(), true), + list_offsets, &offsets); std::vector<int8_t> indices3_values = {0, 1, 2, 0, 1, 2, 0, 1, 2}; std::vector<bool> is_valid3(9, true); @@ -490,8 +493,8 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Buffer> null_bitmap; RETURN_NOT_OK(test::GetBitmapFromVector(is_valid, &null_bitmap)); - std::shared_ptr<Array> a3 = std::make_shared<ListArray>(f3_type, length, - std::static_pointer_cast<PrimitiveArray>(offsets)->values(), + std::shared_ptr<Array> a3 = std::make_shared<ListArray>( + f3_type, length, std::static_pointer_cast<PrimitiveArray>(offsets)->values(), std::make_shared<DictionaryArray>(f1_type, indices3), null_bitmap, 1); // Dictionary-encoded list of integer @@ -500,14 +503,15 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Array> offsets4, values4, indices4; std::vector<int32_t> list_offsets4 = {0, 2, 2, 3}; - ArrayFromVector<Int32Type, int32_t>( - std::vector<bool>(4, true), list_offsets4, &offsets4); + ArrayFromVector<Int32Type, int32_t>(std::vector<bool>(4, true), list_offsets4, + &offsets4); std::vector<int8_t> list_values4 = {0, 1, 2}; ArrayFromVector<Int8Type, int8_t>(std::vector<bool>(3, true), list_values4, &values4); - auto dict3 = std::make_shared<ListArray>(f4_value_type, 3, - std::static_pointer_cast<PrimitiveArray>(offsets4)->values(), values4); + auto dict3 = std::make_shared<ListArray>( + f4_value_type, 3, std::static_pointer_cast<PrimitiveArray>(offsets4)->values(), + values4); std::vector<int8_t> indices4_values = {0, 1, 2, 0, 1, 2}; ArrayFromVector<Int8Type, int8_t>(is_valid, indices4_values, &indices4); @@ -516,9 +520,9 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) { auto a4 = std::make_shared<DictionaryArray>(f4_type, indices4); // construct batch - std::shared_ptr<Schema> schema(new Schema({field("dict1", f0_type), - field("sparse", f1_type), field("dense", f2_type), - field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)})); + std::shared_ptr<Schema> schema(new Schema( + {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type), + field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)})); std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4}; @@ -575,7 +579,8 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) { ArrayFromVector<Date32Type, int32_t>(is_valid, date32_values, &date32_array); std::vector<int64_t> date64_values = {1489269000000, 1489270000000, 1489271000000, - 1489272000000, 1489272000000, 1489273000000, 1489274000000}; + 1489272000000, 1489272000000, 1489273000000, + 1489274000000}; std::shared_ptr<Array> date64_array; ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array); @@ -592,7 +597,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); std::vector<int64_t> ts_values = {1489269000000, 1489270000000, 1489271000000, - 1489272000000, 1489272000000, 1489273000000}; + 1489272000000, 1489272000000, 1489273000000}; std::shared_ptr<Array> a0, a1, a2; ArrayFromVector<TimestampType, int64_t>(f0->type(), is_valid, ts_values, &a0); @@ -612,10 +617,10 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) { auto f3 = field("f3", time64(TimeUnit::NANO)); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2, f3})); - std::vector<int32_t> t32_values = { - 1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}; + std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000, + 1489272000, 1489272000, 1489273000}; std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000, - 1489272000000, 1489272000000, 1489273000000}; + 1489272000000, 1489272000000, 1489273000000}; std::shared_ptr<Array> a0, a1, a2, a3; ArrayFromVector<Time32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); @@ -630,7 +635,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) { template <typename BuilderType, typename T> void AppendValues(const std::vector<bool>& is_valid, const std::vector<T>& values, - BuilderType* builder) { + BuilderType* builder) { for (size_t i = 0; i < values.size(); ++i) { if (is_valid[i]) { ASSERT_OK(builder->Append(values[i])); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 14708a1..163b27b 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -45,8 +45,9 @@ namespace ipc { // Record batch write path static inline Status GetTruncatedBitmap(int64_t offset, int64_t length, - const std::shared_ptr<Buffer> input, MemoryPool* pool, - std::shared_ptr<Buffer>* buffer) { + const std::shared_ptr<Buffer> input, + MemoryPool* pool, + std::shared_ptr<Buffer>* buffer) { if (!input) { *buffer = input; return Status::OK(); @@ -63,8 +64,8 @@ static inline Status GetTruncatedBitmap(int64_t offset, int64_t length, template <typename T> inline Status GetTruncatedBuffer(int64_t offset, int64_t length, - const std::shared_ptr<Buffer> input, MemoryPool* pool, - std::shared_ptr<Buffer>* buffer) { + const std::shared_ptr<Buffer> input, MemoryPool* pool, + std::shared_ptr<Buffer>* buffer) { if (!input) { *buffer = input; return Status::OK(); @@ -80,17 +81,19 @@ inline Status GetTruncatedBuffer(int64_t offset, int64_t length, return Status::OK(); } -static inline bool NeedTruncate( - int64_t offset, const Buffer* buffer, int64_t min_length) { +static inline bool NeedTruncate(int64_t offset, const Buffer* buffer, + int64_t min_length) { // buffer can be NULL - if (buffer == nullptr) { return false; } + if (buffer == nullptr) { + return false; + } return offset != 0 || min_length < buffer->size(); } class RecordBatchSerializer : public ArrayVisitor { public: RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset, - int max_recursion_depth, bool allow_64bit) + int max_recursion_depth, bool allow_64bit) : pool_(pool), max_recursion_depth_(max_recursion_depth), buffer_start_offset_(buffer_start_offset), @@ -114,8 +117,8 @@ class RecordBatchSerializer : public ArrayVisitor { if (arr.null_count() > 0) { std::shared_ptr<Buffer> bitmap; - RETURN_NOT_OK(GetTruncatedBitmap( - arr.offset(), arr.length(), arr.null_bitmap(), pool_, &bitmap)); + RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(), + pool_, &bitmap)); buffers_.push_back(bitmap); } else { // Push a dummy zero-length buffer, not to be copied @@ -175,14 +178,14 @@ class RecordBatchSerializer : public ArrayVisitor { } // Override this for writing dictionary metadata - virtual Status WriteMetadataMessage( - int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) { - return WriteRecordBatchMessage( - num_rows, body_length, field_nodes_, buffer_meta_, out); + virtual Status WriteMetadataMessage(int64_t num_rows, int64_t body_length, + std::shared_ptr<Buffer>* out) { + return WriteRecordBatchMessage(num_rows, body_length, field_nodes_, buffer_meta_, + out); } Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length) { + int64_t* body_length) { RETURN_NOT_OK(Assemble(batch, body_length)); #ifndef NDEBUG @@ -216,9 +219,13 @@ class RecordBatchSerializer : public ArrayVisitor { padding = BitUtil::RoundUpToMultipleOf64(size) - size; } - if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); } + if (size > 0) { + RETURN_NOT_OK(dst->Write(buffer->data(), size)); + } - if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } + if (padding > 0) { + RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); + } } #ifndef NDEBUG @@ -245,7 +252,7 @@ class RecordBatchSerializer : public ArrayVisitor { // Send padding if it's available const int64_t buffer_length = std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width), - data->size() - byte_offset); + data->size() - byte_offset); data = SliceBuffer(data, byte_offset, buffer_length); } buffers_.push_back(data); @@ -253,8 +260,8 @@ class RecordBatchSerializer : public ArrayVisitor { } template <typename ArrayType> - Status GetZeroBasedValueOffsets( - const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) { + Status GetZeroBasedValueOffsets(const ArrayType& array, + std::shared_ptr<Buffer>* value_offsets) { // Share slicing logic between ListArray and BinaryArray auto offsets = array.value_offsets(); @@ -265,8 +272,8 @@ class RecordBatchSerializer : public ArrayVisitor { // b) slice the values array accordingly std::shared_ptr<MutableBuffer> shifted_offsets; - RETURN_NOT_OK(AllocateBuffer( - pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets)); + RETURN_NOT_OK(AllocateBuffer(pool_, sizeof(int32_t) * (array.length() + 1), + &shifted_offsets)); int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data()); const int32_t start_offset = array.value_offset(0); @@ -392,13 +399,15 @@ class RecordBatchSerializer : public ArrayVisitor { const auto& type = static_cast<const UnionType&>(*array.type()); std::shared_ptr<Buffer> value_offsets; - RETURN_NOT_OK(GetTruncatedBuffer<int32_t>( - offset, length, array.value_offsets(), pool_, &value_offsets)); + RETURN_NOT_OK(GetTruncatedBuffer<int32_t>(offset, length, array.value_offsets(), + pool_, &value_offsets)); // The Union type codes are not necessary 0-indexed uint8_t max_code = 0; for (uint8_t code : type.type_codes()) { - if (code > max_code) { max_code = code; } + if (code > max_code) { + max_code = code; + } } // Allocate an array of child offsets. Set all to -1 to indicate that we @@ -424,7 +433,9 @@ class RecordBatchSerializer : public ArrayVisitor { for (int64_t i = 0; i < length; ++i) { const uint8_t code = type_ids[i]; int32_t shift = child_offsets[code]; - if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; } + if (shift == -1) { + child_offsets[code] = shift = unshifted_offsets[i]; + } shifted_offsets[i] = unshifted_offsets[i] - shift; // Update the child length to account for observed value @@ -486,14 +497,14 @@ class DictionaryWriter : public RecordBatchSerializer { public: using RecordBatchSerializer::RecordBatchSerializer; - Status WriteMetadataMessage( - int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { - return WriteDictionaryMessage( - dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out); + Status WriteMetadataMessage(int64_t num_rows, int64_t body_length, + std::shared_ptr<Buffer>* out) override { + return WriteDictionaryMessage(dictionary_id_, num_rows, body_length, field_nodes_, + buffer_meta_, out); } Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, - io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { dictionary_id_ = dictionary_id; // Make a dummy record batch. A bit tedious as we have to make a schema @@ -516,27 +527,30 @@ Status AlignStreamPosition(io::OutputStream* stream) { int64_t position; RETURN_NOT_OK(stream->Tell(&position)); int64_t remainder = PaddedLength(position) - position; - if (remainder > 0) { return stream->Write(kPaddingBytes, remainder); } + if (remainder > 0) { + return stream->Write(kPaddingBytes, remainder); + } return Status::OK(); } Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, - io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool, int max_recursion_depth, bool allow_64bit) { - RecordBatchSerializer writer( - pool, buffer_start_offset, max_recursion_depth, allow_64bit); + io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool, int max_recursion_depth, + bool allow_64bit) { + RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth, + allow_64bit); return writer.Write(batch, dst, metadata_length, body_length); } Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, - io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - MemoryPool* pool) { + io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool) { return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, body_length, - pool, kMaxNestingDepth, true); + pool, kMaxNestingDepth, true); } Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length) { + int64_t* body_length) { if (!tensor.is_contiguous()) { return Status::Invalid("No support yet for writing non-contiguous tensors"); } @@ -556,8 +570,8 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat } Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, - int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool) { + int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false); return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); } @@ -568,7 +582,7 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { int64_t body_length = 0; io::MockOutputStream dst; RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length, - default_memory_pool(), kMaxNestingDepth, true)); + default_memory_pool(), kMaxNestingDepth, true)); *size = dst.GetExtentBytesWritten(); return Status::OK(); } @@ -632,7 +646,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { } Status CheckStarted() { - if (!started_) { return Start(); } + if (!started_) { + return Start(); + } return Status::OK(); } @@ -653,7 +669,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { // Frame of reference in file format is 0, see ARROW-384 const int64_t buffer_start_offset = 0; RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_, - &block->metadata_length, &block->body_length, pool_)); + &block->metadata_length, &block->body_length, pool_)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes"; } @@ -668,9 +684,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { // Frame of reference in file format is 0, see ARROW-384 const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_, - &block->metadata_length, &block->body_length, pool_, kMaxNestingDepth, - allow_64bit)); + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( + batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length, + pool_, kMaxNestingDepth, allow_64bit)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; @@ -681,15 +697,17 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { // Push an empty FileBlock. Can be written in the footer later record_batches_.push_back({0, 0, 0}); - return WriteRecordBatch( - batch, allow_64bit, &record_batches_[record_batches_.size() - 1]); + return WriteRecordBatch(batch, allow_64bit, + &record_batches_[record_batches_.size() - 1]); } // Adds padding bytes if necessary to ensure all memory blocks are written on // 64-byte (or other alignment) boundaries. Status Align(int64_t alignment = kArrowAlignment) { int64_t remainder = PaddedLength(position_, alignment) - position_; - if (remainder > 0) { return Write(kPaddingBytes, remainder); } + if (remainder > 0) { + return Write(kPaddingBytes, remainder); + } return Status::OK(); } @@ -725,8 +743,8 @@ RecordBatchStreamWriter::RecordBatchStreamWriter() { RecordBatchStreamWriter::~RecordBatchStreamWriter() {} -Status RecordBatchStreamWriter::WriteRecordBatch( - const RecordBatch& batch, bool allow_64bit) { +Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch, + bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } @@ -735,16 +753,14 @@ void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) { } Status RecordBatchStreamWriter::Open(io::OutputStream* sink, - const std::shared_ptr<Schema>& schema, - std::shared_ptr<RecordBatchStreamWriter>* out) { + const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatchStreamWriter>* out) { // ctor is private *out = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter()); return (*out)->impl_->Open(sink, schema); } -Status RecordBatchStreamWriter::Close() { - return impl_->Close(); -} +Status RecordBatchStreamWriter::Close() { return impl_->Close(); } // ---------------------------------------------------------------------- // File writer implementation @@ -756,8 +772,8 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl Status Start() override { // It is only necessary to align to 8-byte boundary at the start of the file - RETURN_NOT_OK(Write( - reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes))); + RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes), + strlen(kArrowMagicBytes))); RETURN_NOT_OK(Align(8)); // We write the schema at the start of the file (and the end). This also @@ -768,21 +784,23 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl Status Close() override { // Write metadata int64_t initial_position = position_; - RETURN_NOT_OK(WriteFileFooter( - *schema_, dictionaries_, record_batches_, &dictionary_memo_, sink_)); + RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, + &dictionary_memo_, sink_)); RETURN_NOT_OK(UpdatePosition()); // Write footer length int32_t footer_length = static_cast<int32_t>(position_ - initial_position); - if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); } + if (footer_length <= 0) { + return Status::Invalid("Invalid file footer"); + } RETURN_NOT_OK( Write(reinterpret_cast<const uint8_t*>(&footer_length), sizeof(int32_t))); // Write magic bytes to end file - return Write( - reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)); + return Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes), + strlen(kArrowMagicBytes)); } }; @@ -793,20 +811,19 @@ RecordBatchFileWriter::RecordBatchFileWriter() { RecordBatchFileWriter::~RecordBatchFileWriter() {} Status RecordBatchFileWriter::Open(io::OutputStream* sink, - const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatchFileWriter>* out) { + const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatchFileWriter>* out) { *out = std::shared_ptr<RecordBatchFileWriter>( new RecordBatchFileWriter()); // ctor is private return (*out)->impl_->Open(sink, schema); } -Status RecordBatchFileWriter::WriteRecordBatch( - const RecordBatch& batch, bool allow_64bit) { +Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch, + bool allow_64bit) { return impl_->WriteRecordBatch(batch, allow_64bit); } -Status RecordBatchFileWriter::Close() { - return impl_->Close(); -} +Status RecordBatchFileWriter::Close() { return impl_->Close(); } } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 899a1b2..c28dfe0 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -85,7 +85,7 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<RecordBatchStreamWriter>* out); + std::shared_ptr<RecordBatchStreamWriter>* out); Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; @@ -113,7 +113,7 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param(out) out the created stream writer /// \return Status indicating success or failure static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema, - std::shared_ptr<RecordBatchFileWriter>* out); + std::shared_ptr<RecordBatchFileWriter>* out); Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override; Status Close() override; @@ -145,14 +145,16 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter { /// \param(out) body_length: the size of the contiguous buffer block plus /// padding bytes Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, - int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth, - bool allow_64bit = false); + int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool, + int max_recursion_depth = kMaxNestingDepth, + bool allow_64bit = false); // Write Array as a DictionaryBatch message Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, - int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool); + int64_t buffer_start_offset, io::OutputStream* dst, + int32_t* metadata_length, int64_t* body_length, MemoryPool* pool); // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized @@ -166,13 +168,14 @@ Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size); /// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data /// may not be readable by all Arrow implementations Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch, - int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, MemoryPool* pool); + int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool); /// EXPERIMENTAL: Write arrow::Tensor as a contiguous message /// <metadata size><metadata><tensor data> Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst, - int32_t* metadata_length, int64_t* body_length); + int32_t* metadata_length, int64_t* body_length); /// Backwards-compatibility for Arrow < 0.4.0 /// http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/memory_pool-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc index 8a185ab..52e48db 100644 --- a/cpp/src/arrow/memory_pool-test.cc +++ b/cpp/src/arrow/memory_pool-test.cc @@ -27,9 +27,7 @@ class TestDefaultMemoryPool : public ::arrow::test::TestMemoryPoolBase { ::arrow::MemoryPool* memory_pool() override { return ::arrow::default_memory_pool(); } }; -TEST_F(TestDefaultMemoryPool, MemoryTracking) { - this->TestMemoryTracking(); -} +TEST_F(TestDefaultMemoryPool, MemoryTracking) { this->TestMemoryTracking(); } TEST_F(TestDefaultMemoryPool, OOM) { #ifndef ADDRESS_SANITIZER @@ -37,9 +35,7 @@ TEST_F(TestDefaultMemoryPool, OOM) { #endif } -TEST_F(TestDefaultMemoryPool, Reallocate) { - this->TestReallocate(); -} +TEST_F(TestDefaultMemoryPool, Reallocate) { this->TestReallocate(); } // Death tests and valgrind are known to not play well 100% of the time. See // googletest documentation @@ -53,7 +49,7 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) { #ifndef NDEBUG EXPECT_EXIT(pool->Free(data, 120), ::testing::ExitedWithCode(1), - ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)"); + ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)"); #endif pool->Free(data, 100); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/memory_pool.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index e7de5c4..769fc10 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -17,12 +17,12 @@ #include "arrow/memory_pool.h" +#include <stdlib.h> #include <algorithm> #include <cstdlib> #include <iostream> #include <mutex> #include <sstream> -#include <stdlib.h> #include "arrow/status.h" #include "arrow/util/logging.h" @@ -60,8 +60,8 @@ Status AllocateAligned(int64_t size, uint8_t** out) { return Status::OutOfMemory(ss.str()); } #else - const int result = posix_memalign( - reinterpret_cast<void**>(out), kAlignment, static_cast<size_t>(size)); + const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment, + static_cast<size_t>(size)); if (result == ENOMEM) { std::stringstream ss; ss << "malloc of size " << size << " failed"; @@ -82,13 +82,9 @@ MemoryPool::MemoryPool() {} MemoryPool::~MemoryPool() {} -int64_t MemoryPool::max_memory() const { - return -1; -} +int64_t MemoryPool::max_memory() const { return -1; } -DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { - max_memory_ = 0; -} +DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; } Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) { RETURN_NOT_OK(AllocateAligned(size, out)); @@ -96,7 +92,9 @@ Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) { { std::lock_guard<std::mutex> guard(lock_); - if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); } + if (bytes_allocated_ > max_memory_) { + max_memory_ = bytes_allocated_.load(); + } } return Status::OK(); } @@ -128,15 +126,15 @@ Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t bytes_allocated_ += new_size - old_size; { std::lock_guard<std::mutex> guard(lock_); - if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); } + if (bytes_allocated_ > max_memory_) { + max_memory_ = bytes_allocated_.load(); + } } return Status::OK(); } -int64_t DefaultMemoryPool::bytes_allocated() const { - return bytes_allocated_.load(); -} +int64_t DefaultMemoryPool::bytes_allocated() const { return bytes_allocated_.load(); } void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) { DCHECK_GE(bytes_allocated_, size); @@ -150,9 +148,7 @@ void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) { bytes_allocated_ -= size; } -int64_t DefaultMemoryPool::max_memory() const { - return max_memory_.load(); -} +int64_t DefaultMemoryPool::max_memory() const { return max_memory_.load(); } DefaultMemoryPool::~DefaultMemoryPool() {} http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/pretty_print-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc index 10a91f5..049f5a5 100644 --- a/cpp/src/arrow/pretty_print-test.cc +++ b/cpp/src/arrow/pretty_print-test.cc @@ -57,7 +57,7 @@ void CheckArray(const Array& arr, int indent, const char* expected) { template <typename TYPE, typename C_TYPE> void CheckPrimitive(int indent, const std::vector<bool>& is_valid, - const std::vector<C_TYPE>& values, const char* expected) { + const std::vector<C_TYPE>& values, const char* expected) { std::shared_ptr<Array> array; ArrayFromVector<TYPE, C_TYPE>(is_valid, values, &array); CheckArray(*array, indent, expected);
