http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 91fb93e..1a4a923 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -18,9 +18,13 @@
 #ifndef ARROW_ARRAY_H
 #define ARROW_ARRAY_H
 
+#include <cmath>
 #include <cstdint>
 #include <memory>
+#include <string>
+#include <vector>
 
+#include "arrow/buffer.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/macros.h"
@@ -28,7 +32,6 @@
 
 namespace arrow {
 
-class Buffer;
 class MemoryPool;
 class MutableBuffer;
 class Status;
@@ -110,6 +113,374 @@ typedef std::shared_ptr<Array> ArrayPtr;
 Status ARROW_EXPORT GetEmptyBitmap(
     MemoryPool* pool, int32_t length, std::shared_ptr<MutableBuffer>* result);
 
+// Base class for fixed-size logical types.
+//
+// Owns a shared reference to the value buffer (data_) and keeps a raw byte
+// pointer (raw_data_) that subclasses reinterpret as their element type.
+// The constructor is protected, so only subclasses can be instantiated.
+class ARROW_EXPORT PrimitiveArray : public Array {
+ public:
+  virtual ~PrimitiveArray() {}
+
+  // Returns the buffer holding the values (shared ownership).
+  std::shared_ptr<Buffer> data() const { return data_; }
+
+  bool EqualsExact(const PrimitiveArray& other) const;
+  bool Equals(const std::shared_ptr<Array>& arr) const override;
+
+ protected:
+  PrimitiveArray(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& data,
+      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+  std::shared_ptr<Buffer> data_;
+  // NOTE(review): presumably points at data_'s bytes; the constructor is
+  // defined out of line, so confirm there.
+  const uint8_t* raw_data_;
+};
+
+// Array of fixed-width numeric values whose C type is TYPE::c_type.
+template <typename TYPE>
+class ARROW_EXPORT NumericArray : public PrimitiveArray {
+ public:
+  using TypeClass = TYPE;
+  using value_type = typename TypeClass::c_type;
+
+  // Constructs an array with a default-constructed TypeClass as its type.
+  NumericArray(int32_t length, const std::shared_ptr<Buffer>& data,
+      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
+      : PrimitiveArray(
+            std::make_shared<TypeClass>(), length, data, null_count, null_bitmap) {}
+
+  // Constructs an array with an explicitly supplied logical type.
+  NumericArray(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& data,
+      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr)
+      : PrimitiveArray(type, length, data, null_count, null_bitmap) {}
+
+  bool EqualsExact(const NumericArray<TypeClass>& other) const {
+    return PrimitiveArray::EqualsExact(static_cast<const PrimitiveArray&>(other));
+  }
+
+  // Generic fallback: exact equality. FloatType and DoubleType provide
+  // tolerance-based specializations below.
+  bool ApproxEquals(const std::shared_ptr<Array>& arr) const { return Equals(arr); }
+
+  // Compares [start_idx, end_idx) of this array against the range of the
+  // same length starting at other_start_idx in arr. No bounds checking is
+  // performed on either array.
+  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
+      const ArrayPtr& arr) const override {
+    if (!arr) { return false; }
+    // Comparing an array with itself is only trivially true when both ranges
+    // start at the same index; shifted ranges must be compared element-wise.
+    if (this == arr.get() && start_idx == other_start_idx) { return true; }
+    if (this->type_enum() != arr->type_enum()) { return false; }
+    const auto other = static_cast<NumericArray<TypeClass>*>(arr.get());
+    for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) {
+      const bool is_null = IsNull(i);
+      if (is_null != arr->IsNull(o_i) || (!is_null && Value(i) != other->Value(o_i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  const value_type* raw_data() const {
+    return reinterpret_cast<const value_type*>(raw_data_);
+  }
+
+  Status Accept(ArrayVisitor* visitor) const override;
+
+  // Returns the i-th value without checking bounds or validity.
+  value_type Value(int i) const { return raw_data()[i]; }
+};
+
+// Element-wise comparison of two float arrays with an absolute tolerance of
+// 1E-5. Null slots are skipped; the validity bitmaps must match exactly.
+template <>
+inline bool NumericArray<FloatType>::ApproxEquals(
+    const std::shared_ptr<Array>& arr) const {
+  if (this == arr.get()) { return true; }
+  if (!arr) { return false; }
+  if (this->type_enum() != arr->type_enum()) { return false; }
+
+  const auto& other = *static_cast<NumericArray<FloatType>*>(arr.get());
+
+  // Arrays of different length are never equal. Checking this up front also
+  // keeps the loop below from reading past the end of other's buffer.
+  if (length_ != other.length_) { return false; }
+  if (null_count_ != other.null_count_) { return false; }
+  if (length_ == 0) { return true; }
+
+  auto this_data = reinterpret_cast<const float*>(raw_data_);
+  auto other_data = reinterpret_cast<const float*>(other.raw_data_);
+
+  static constexpr float EPSILON = 1E-5f;
+
+  if (null_count_ > 0) {
+    bool equal_bitmap =
+        null_bitmap_->Equals(*other.null_bitmap_, BitUtil::CeilByte(length_) / 8);
+    if (!equal_bitmap) { return false; }
+  }
+
+  for (int32_t i = 0; i < length_; ++i) {
+    // Null slots hold unspecified values; only compare valid entries.
+    if (null_count_ > 0 && IsNull(i)) { continue; }
+    if (std::fabs(this_data[i] - other_data[i]) > EPSILON) { return false; }
+  }
+  return true;
+}
+
+// Element-wise comparison of two double arrays with an absolute tolerance of
+// 1E-5. Null slots are skipped; the validity bitmaps must match exactly.
+template <>
+inline bool NumericArray<DoubleType>::ApproxEquals(
+    const std::shared_ptr<Array>& arr) const {
+  if (this == arr.get()) { return true; }
+  if (!arr) { return false; }
+  if (this->type_enum() != arr->type_enum()) { return false; }
+
+  const auto& other = *static_cast<NumericArray<DoubleType>*>(arr.get());
+
+  // Arrays of different length are never equal. Checking this up front also
+  // keeps the loop below from reading past the end of other's buffer.
+  if (length_ != other.length_) { return false; }
+  if (null_count_ != other.null_count_) { return false; }
+  if (length_ == 0) { return true; }
+
+  auto this_data = reinterpret_cast<const double*>(raw_data_);
+  auto other_data = reinterpret_cast<const double*>(other.raw_data_);
+
+  static constexpr double EPSILON = 1E-5;
+
+  if (null_count_ > 0) {
+    bool equal_bitmap =
+        null_bitmap_->Equals(*other.null_bitmap_, BitUtil::CeilByte(length_) / 8);
+    if (!equal_bitmap) { return false; }
+  }
+
+  for (int32_t i = 0; i < length_; ++i) {
+    // Null slots hold unspecified values; only compare valid entries.
+    if (null_count_ > 0 && IsNull(i)) { continue; }
+    if (std::fabs(this_data[i] - other_data[i]) > EPSILON) { return false; }
+  }
+  return true;
+}
+
+// Array of booleans; Value() reads individual bits from the data buffer.
+class ARROW_EXPORT BooleanArray : public PrimitiveArray {
+ public:
+  using TypeClass = BooleanType;
+
+  BooleanArray(int32_t length, const std::shared_ptr<Buffer>& data,
+      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+  BooleanArray(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& data,
+      int32_t null_count = 0, const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+
+  bool EqualsExact(const BooleanArray& other) const;
+  bool Equals(const ArrayPtr& arr) const override;
+  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
+      const ArrayPtr& arr) const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
+
+  const uint8_t* raw_data() const { return reinterpret_cast<const uint8_t*>(raw_data_); }
+
+  // Returns bit i of the data buffer; no bounds or validity check.
+  bool Value(int i) const { return BitUtil::GetBit(raw_data(), i); }
+};
+
+// ----------------------------------------------------------------------
+// ListArray
+
+// Variable-length list array: an int32 offsets buffer indexes into a single
+// child array holding the values of all lists.
+class ARROW_EXPORT ListArray : public Array {
+ public:
+  using TypeClass = ListType;
+
+  // offsets may be null, in which case raw_offsets() is null as well.
+  ListArray(const TypePtr& type, int32_t length, std::shared_ptr<Buffer> offsets,
+      const ArrayPtr& values, int32_t null_count = 0,
+      std::shared_ptr<Buffer> null_bitmap = nullptr)
+      : Array(type, length, null_count, null_bitmap) {
+    offset_buffer_ = offsets;
+    offsets_ = offsets == nullptr ? nullptr : reinterpret_cast<const int32_t*>(
+                                                  offset_buffer_->data());
+    values_ = values;
+  }
+
+  Status Validate() const override;
+
+  virtual ~ListArray() = default;
+
+  // Return a shared pointer in case the requestor desires to share ownership
+  // with this array.
+  std::shared_ptr<Array> values() const { return values_; }
+  std::shared_ptr<Buffer> offsets() const { return offset_buffer_; }
+
+  // Type of the child array. Dereferences values_ without a null check.
+  std::shared_ptr<DataType> value_type() const { return values_->type(); }
+
+  const int32_t* raw_offsets() const { return offsets_; }
+
+  int32_t offset(int i) const { return offsets_[i]; }
+
+  // Neither of these functions will perform bounds checking
+  int32_t value_offset(int i) const { return offsets_[i]; }
+  int32_t value_length(int i) const { return offsets_[i + 1] - offsets_[i]; }
+
+  bool EqualsExact(const ListArray& other) const;
+  bool Equals(const std::shared_ptr<Array>& arr) const override;
+
+  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
+      const ArrayPtr& arr) const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
+
+ protected:
+  std::shared_ptr<Buffer> offset_buffer_;
+  const int32_t* offsets_;
+  ArrayPtr values_;
+};
+
+// ----------------------------------------------------------------------
+// Binary and String
+
+// Array of variable-length byte strings: an int32 offsets buffer indexes
+// into one contiguous data buffer.
+class ARROW_EXPORT BinaryArray : public Array {
+ public:
+  using TypeClass = BinaryType;
+
+  BinaryArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
+      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+
+  // Constructor that allows sub-classes/builders to propagate their logical type up the
+  // class hierarchy.
+  BinaryArray(const TypePtr& type, int32_t length, const std::shared_ptr<Buffer>& offsets,
+      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+
+  // Return the pointer to the given element's bytes and store its length in
+  // *out_length. Performs no bounds checking on i.
+  // TODO(emkornfield) introduce a StringPiece or something similar to capture zero-copy
+  // pointer + offset
+  const uint8_t* GetValue(int i, int32_t* out_length) const {
+    const int32_t pos = offsets_[i];
+    *out_length = offsets_[i + 1] - pos;
+    return data_ + pos;
+  }
+
+  std::shared_ptr<Buffer> data() const { return data_buffer_; }
+  std::shared_ptr<Buffer> offsets() const { return offset_buffer_; }
+
+  const int32_t* raw_offsets() const { return offsets_; }
+
+  int32_t offset(int i) const { return offsets_[i]; }
+
+  // Neither of these functions will perform bounds checking
+  int32_t value_offset(int i) const { return offsets_[i]; }
+  int32_t value_length(int i) const { return offsets_[i + 1] - offsets_[i]; }
+
+  bool EqualsExact(const BinaryArray& other) const;
+  bool Equals(const std::shared_ptr<Array>& arr) const override;
+  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
+      const ArrayPtr& arr) const override;
+
+  Status Validate() const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
+
+ private:
+  std::shared_ptr<Buffer> offset_buffer_;
+  const int32_t* offsets_;
+
+  std::shared_ptr<Buffer> data_buffer_;
+  const uint8_t* data_;
+};
+
+// BinaryArray whose elements are exposed as std::string via GetString().
+class ARROW_EXPORT StringArray : public BinaryArray {
+ public:
+  using TypeClass = StringType;
+
+  StringArray(int32_t length, const std::shared_ptr<Buffer>& offsets,
+      const std::shared_ptr<Buffer>& data, int32_t null_count = 0,
+      const std::shared_ptr<Buffer>& null_bitmap = nullptr);
+
+  // Construct a std::string holding a copy of element i's bytes.
+  // TODO: std::bad_alloc possibility
+  std::string GetString(int i) const {
+    int32_t nchars;
+    const uint8_t* str = GetValue(i, &nchars);
+    return std::string(reinterpret_cast<const char*>(str), nchars);
+  }
+
+  Status Validate() const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
+};
+
+// ----------------------------------------------------------------------
+// Struct
+
+// Array of structs, stored as one child array per field.
+class ARROW_EXPORT StructArray : public Array {
+ public:
+  using TypeClass = StructType;
+
+  // field_arrays is only copied, so it is taken by const reference; this
+  // also lets callers pass temporaries and const vectors.
+  StructArray(const TypePtr& type, int32_t length,
+      const std::vector<ArrayPtr>& field_arrays, int32_t null_count = 0,
+      std::shared_ptr<Buffer> null_bitmap = nullptr)
+      : Array(type, length, null_count, null_bitmap) {
+    type_ = type;
+    field_arrays_ = field_arrays;
+  }
+
+  Status Validate() const override;
+
+  virtual ~StructArray() {}
+
+  // Return a shared pointer in case the requestor desires to share ownership
+  // with this array.
+  std::shared_ptr<Array> field(int32_t pos) const;
+
+  const std::vector<ArrayPtr>& fields() const { return field_arrays_; }
+
+  bool EqualsExact(const StructArray& other) const;
+  bool Equals(const std::shared_ptr<Array>& arr) const override;
+  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
+      const std::shared_ptr<Array>& arr) const override;
+
+  Status Accept(ArrayVisitor* visitor) const override;
+
+ protected:
+  // The child arrays corresponding to each field of the struct data type.
+  std::vector<ArrayPtr> field_arrays_;
+};
+
+// ----------------------------------------------------------------------
+// Union
+
+// NOTE(review): the union classes below declare members only; no
+// constructors or accessors are visible here, and the raw Buffer pointers
+// are never initialized in this header.
+class UnionArray : public Array {
+ protected:
+  // The data are types encoded as int16
+  Buffer* types_;
+  std::vector<std::shared_ptr<Array>> children_;
+};
+
+class DenseUnionArray : public UnionArray {
+ protected:
+  Buffer* offset_buf_;
+};
+
+class SparseUnionArray : public UnionArray {};
+
+// ----------------------------------------------------------------------
+// extern templates and other details
+
+// gcc and clang disagree about how to handle template visibility when you have
+// explicit specializations https://llvm.org/bugs/show_bug.cgi?id=24815
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wattributes"
+#endif
+
+// Only instantiate these templates once
+extern template class ARROW_EXPORT NumericArray<Int8Type>;
+extern template class ARROW_EXPORT NumericArray<UInt8Type>;
+extern template class ARROW_EXPORT NumericArray<Int16Type>;
+extern template class ARROW_EXPORT NumericArray<UInt16Type>;
+extern template class ARROW_EXPORT NumericArray<Int32Type>;
+extern template class ARROW_EXPORT NumericArray<UInt32Type>;
+extern template class ARROW_EXPORT NumericArray<Int64Type>;
+extern template class ARROW_EXPORT NumericArray<UInt64Type>;
+extern template class ARROW_EXPORT NumericArray<HalfFloatType>;
+extern template class ARROW_EXPORT NumericArray<FloatType>;
+extern template class ARROW_EXPORT NumericArray<DoubleType>;
+extern template class ARROW_EXPORT NumericArray<TimestampType>;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+
+// ----------------------------------------------------------------------
+// Helper functions
+
+// Create new arrays for logical types that are backed by primitive arrays.
+// The new array is returned through *out.
+Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
+    int32_t length, const std::shared_ptr<Buffer>& data, int32_t null_count,
+    const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out);
+
 }  // namespace arrow
 
 #endif
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/buffer-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/buffer-test.cc b/cpp/src/arrow/buffer-test.cc new file mode 100644 index 0000000..c1d027b --- /dev/null +++ b/cpp/src/arrow/buffer-test.cc @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+using std::string;
+
+namespace arrow {
+
+class TestBuffer : public ::testing::Test {};
+
+// Buffer is immutable; MutableBuffer and PoolBuffer report themselves mutable.
+TEST_F(TestBuffer, IsMutableFlag) {
+  Buffer buf(nullptr, 0);
+
+  ASSERT_FALSE(buf.is_mutable());
+
+  MutableBuffer mbuf(nullptr, 0);
+  ASSERT_TRUE(mbuf.is_mutable());
+
+  PoolBuffer pbuf;
+  ASSERT_TRUE(pbuf.is_mutable());
+}
+
+// Resize updates the reported size both when growing and when shrinking.
+TEST_F(TestBuffer, Resize) {
+  PoolBuffer buf;
+
+  ASSERT_EQ(0, buf.size());
+  ASSERT_OK(buf.Resize(100));
+  ASSERT_EQ(100, buf.size());
+  ASSERT_OK(buf.Resize(200));
+  ASSERT_EQ(200, buf.size());
+
+  // Make it smaller, too
+  ASSERT_OK(buf.Resize(50));
+  ASSERT_EQ(50, buf.size());
+}
+
+TEST_F(TestBuffer, ResizeOOM) {
+  // realloc fails, even though there may be no explicit limit
+  PoolBuffer buf;
+  ASSERT_OK(buf.Resize(100));
+  int64_t to_alloc = std::numeric_limits<int64_t>::max();
+  ASSERT_RAISES(OutOfMemory, buf.Resize(to_alloc));
+}
+
+// Distinct allocations compare equal when their bytes match and unequal
+// when they differ.
+TEST_F(TestBuffer, EqualsWithSameContent) {
+  MemoryPool* pool = default_memory_pool();
+  const int32_t bufferSize = 128 * 1024;
+  uint8_t* rawBuffer1;
+  ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer1));
+  memset(rawBuffer1, 12, bufferSize);
+  uint8_t* rawBuffer2;
+  ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer2));
+  memset(rawBuffer2, 12, bufferSize);
+  uint8_t* rawBuffer3;
+  ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer3));
+  memset(rawBuffer3, 3, bufferSize);
+
+  Buffer buffer1(rawBuffer1, bufferSize);
+  Buffer buffer2(rawBuffer2, bufferSize);
+  Buffer buffer3(rawBuffer3, bufferSize);
+  ASSERT_TRUE(buffer1.Equals(buffer2));
+  ASSERT_FALSE(buffer1.Equals(buffer3));
+
+  pool->Free(rawBuffer1, bufferSize);
+  pool->Free(rawBuffer2, bufferSize);
+  pool->Free(rawBuffer3, bufferSize);
+}
+
+// Two Buffer objects over the same memory compare equal; the nbytes overload
+// fails when asked to compare more bytes than the shorter buffer holds.
+TEST_F(TestBuffer, EqualsWithSameBuffer) {
+  MemoryPool* pool = default_memory_pool();
+  const int32_t bufferSize = 128 * 1024;
+  uint8_t* rawBuffer;
+  ASSERT_OK(pool->Allocate(bufferSize, &rawBuffer));
+  memset(rawBuffer, 111, bufferSize);
+
+  Buffer buffer1(rawBuffer, bufferSize);
+  Buffer buffer2(rawBuffer, bufferSize);
+  ASSERT_TRUE(buffer1.Equals(buffer2));
+
+  const int64_t nbytes = bufferSize / 2;
+  Buffer buffer3(rawBuffer, nbytes);
+  ASSERT_TRUE(buffer1.Equals(buffer3, nbytes));
+  ASSERT_FALSE(buffer1.Equals(buffer3, nbytes + 1));
+
+  pool->Free(rawBuffer, bufferSize);
+}
+
+// Copy(start, nbytes) yields a new buffer holding exactly that byte range.
+TEST_F(TestBuffer, Copy) {
+  std::string data_str = "some data to copy";
+
+  auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
+
+  Buffer buf(data, data_str.size());
+
+  std::shared_ptr<Buffer> out;
+
+  ASSERT_OK(buf.Copy(5, 4, &out));
+
+  Buffer expected(data + 5, 4);
+  ASSERT_TRUE(out->Equals(expected));
+}
+
+TEST_F(TestBuffer, SliceBuffer) {
+  std::string data_str = "some data to slice";
+
+  auto data = reinterpret_cast<const uint8_t*>(data_str.c_str());
+
+  auto buf = std::make_shared<Buffer>(data, data_str.size());
+
+  std::shared_ptr<Buffer> out = SliceBuffer(buf, 5, 4);
+  Buffer expected(data + 5, 4);
+  ASSERT_TRUE(out->Equals(expected));
+
+  // The slice holds a reference to its parent, so the count goes from 1 to 2.
+  ASSERT_EQ(2, buf.use_count());
+}
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
new file mode 100644
index 0000000..6ffa03a
--- /dev/null
+++ b/cpp/src/arrow/buffer.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/buffer.h"
+
+#include <cstdint>
+#include <cstring>
+#include <limits>
+
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+// Constructs a view of `size` bytes starting `offset` bytes into parent.
+// Holding parent_ keeps the underlying memory alive for the view's lifetime.
+// Every member is initialized here; in particular is_mutable_ is set to
+// false so that is_mutable() never reads an indeterminate value on a slice.
+Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size)
+    : is_mutable_(false),
+      data_(parent->data() + offset),
+      size_(size),
+      capacity_(size),
+      parent_(parent) {}
+
+Buffer::~Buffer() {}
+
+// Copies nbytes bytes starting at `start` into a new PoolBuffer allocated
+// from pool and returns it through *out. The range is only validated by
+// DCHECKs.
+Status Buffer::Copy(
+    int64_t start, int64_t nbytes, MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
+  // Sanity checks
+  DCHECK_LT(start, size_);
+  DCHECK_LE(nbytes, size_ - start);
+
+  auto new_buffer = std::make_shared<PoolBuffer>(pool);
+  RETURN_NOT_OK(new_buffer->Resize(nbytes));
+
+  std::memcpy(new_buffer->mutable_data(), data() + start, nbytes);
+
+  *out = new_buffer;
+  return Status::OK();
+}
+
+// Same as above, allocating from the default memory pool.
+Status Buffer::Copy(int64_t start, int64_t nbytes, std::shared_ptr<Buffer>* out) const {
+  return Copy(start, nbytes, default_memory_pool(), out);
+}
+
+// Returns a zero-copy view of [offset, offset + length) that shares
+// ownership of buffer. The range is only validated by DCHECKs.
+std::shared_ptr<Buffer> SliceBuffer(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length) {
+  DCHECK_LT(offset, buffer->size());
+  DCHECK_LE(length, buffer->size() - offset);
+  return std::make_shared<Buffer>(buffer, offset, length);
+}
+
+// Relies on shared_from_this(), so this object must already be owned by a
+// std::shared_ptr when this is called.
+std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
+  return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
+}
+
+// A null pool selects the default memory pool.
+PoolBuffer::PoolBuffer(MemoryPool* pool) : ResizableBuffer(nullptr, 0) {
+  if (pool == nullptr) { pool = default_memory_pool(); }
+  pool_ = pool;
+}
+
+PoolBuffer::~PoolBuffer() {
+  if (mutable_data_ != nullptr) { pool_->Free(mutable_data_, capacity_); }
+}
+
+// Ensures at least new_capacity bytes are allocated, rounded up to a
+// multiple of 64. Existing contents (size_ bytes) are preserved. Never
+// shrinks the allocation and never changes size_.
+Status PoolBuffer::Reserve(int64_t new_capacity) {
+  // Nothing to do when memory is already allocated and large enough.
+  if (mutable_data_ != nullptr && new_capacity <= capacity_) { return Status::OK(); }
+
+  new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity);
+  uint8_t* new_data = nullptr;
+  RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
+  if (mutable_data_ != nullptr) {
+    // Move the old contents over before releasing the old allocation.
+    std::memcpy(new_data, mutable_data_, size_);
+    pool_->Free(mutable_data_, capacity_);
+  }
+  mutable_data_ = new_data;
+  data_ = mutable_data_;
+  capacity_ = new_capacity;
+  return Status::OK();
+}
+
+// Sets the reported size to new_size, growing the allocation if required.
+Status PoolBuffer::Resize(int64_t new_size) {
+  RETURN_NOT_OK(Reserve(new_size));
+  size_ = new_size;
+  return Status::OK();
+}
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
new file mode 100644
index 0000000..27437ca
--- /dev/null
+++ b/cpp/src/arrow/buffer.h
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_BUFFER_H
+#define ARROW_UTIL_BUFFER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class Status;
+
+// ----------------------------------------------------------------------
+// Buffer classes
+
+// Immutable API for a chunk of bytes which may or may not be owned by the
+// class instance.  Buffers have two related notions of length: size and
+// capacity.  Size is the number of bytes that might have valid data.
+// Capacity is the number of bytes that were allocated for the buffer in
+// total.
+// The following invariant is always true: Size <= Capacity
+// (both constructors below start with Size == Capacity).
+class ARROW_EXPORT Buffer : public std::enable_shared_from_this<Buffer> {
+ public:
+  // Wraps `size` bytes at `data` without taking ownership of them.
+  Buffer(const uint8_t* data, int64_t size)
+      : is_mutable_(false), data_(data), size_(size), capacity_(size) {}
+  virtual ~Buffer();
+
+  // An offset into data that is owned by another buffer, but we want to be
+  // able to retain a valid pointer to it even after other shared_ptr's to the
+  // parent buffer have been destroyed
+  //
+  // This method makes no assertions about alignment or padding of the buffer but
+  // in general we expected buffers to be aligned and padded to 64 bytes.  In the future
+  // we might add utility methods to help determine if a buffer satisfies this contract.
+  Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
+
+  // Only valid when this object is already owned by a std::shared_ptr.
+  std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); }
+
+  bool is_mutable() const { return is_mutable_; }
+
+  // Return true if other is this same object, or if both buffers hold at
+  // least nbytes bytes and their first nbytes bytes are identical. The
+  // buffers do not need to have the same size.
+  bool Equals(const Buffer& other, int64_t nbytes) const {
+    return this == &other ||
+           (size_ >= nbytes && other.size_ >= nbytes &&
+               (data_ == other.data_ || !memcmp(data_, other.data_, nbytes)));
+  }
+
+  // Return true if both buffers are the same size and contain the same bytes.
+  bool Equals(const Buffer& other) const {
+    return this == &other ||
+           (size_ == other.size_ &&
+               (data_ == other.data_ || !memcmp(data_, other.data_, size_)));
+  }
+
+  // Copy section of buffer into a new Buffer
+  Status Copy(int64_t start, int64_t nbytes, MemoryPool* pool,
+      std::shared_ptr<Buffer>* out) const;
+
+  // Default memory pool
+  Status Copy(int64_t start, int64_t nbytes, std::shared_ptr<Buffer>* out) const;
+
+  int64_t capacity() const { return capacity_; }
+  const uint8_t* data() const { return data_; }
+
+  int64_t size() const { return size_; }
+
+  std::shared_ptr<Buffer> parent() const { return parent_; }
+
+ protected:
+  bool is_mutable_;
+  const uint8_t* data_;
+  int64_t size_;
+  int64_t capacity_;
+
+  // nullptr by default, but may be set
+  std::shared_ptr<Buffer> parent_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Buffer);
+};
+
+// Construct a view on passed buffer at the indicated offset and length. This
+// function cannot fail and does no error checking (except in debug builds)
+ARROW_EXPORT std::shared_ptr<Buffer> SliceBuffer(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length);
+
+// A Buffer whose contents can be mutated. May or may not own its data.
+class ARROW_EXPORT MutableBuffer : public Buffer {
+ public:
+  MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) {
+    is_mutable_ = true;
+    mutable_data_ = data;
+  }
+
+  uint8_t* mutable_data() { return mutable_data_; }
+
+  // Get a read-only view of this buffer
+  std::shared_ptr<Buffer> GetImmutableView();
+
+ protected:
+  // The Buffer base constructor marks the buffer immutable, so the flag is
+  // set here as well to keep both constructors consistent.
+  MutableBuffer() : Buffer(nullptr, 0), mutable_data_(nullptr) { is_mutable_ = true; }
+
+  uint8_t* mutable_data_;
+};
+
+// Interface for mutable buffers whose size and capacity can be changed.
+class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
+ public:
+  // Change buffer reported size to indicated size, allocating memory if
+  // necessary.  This will ensure that the capacity of the buffer is a multiple
+  // of 64 bytes as defined in Layout.md.
+  virtual Status Resize(int64_t new_size) = 0;
+
+  // Ensure that buffer has enough memory allocated to fit the indicated
+  // capacity (and meets the 64 byte padding requirement in Layout.md).
+  // It does not change buffer's reported size.
+  virtual Status Reserve(int64_t new_capacity) = 0;
+
+ protected:
+  ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
+};
+
+// A Buffer whose lifetime is tied to a particular MemoryPool
+class ARROW_EXPORT PoolBuffer : public ResizableBuffer {
+ public:
+  explicit PoolBuffer(MemoryPool* pool = nullptr);
+  virtual ~PoolBuffer();
+
+  Status Resize(int64_t new_size) override;
+  Status Reserve(int64_t new_capacity) override;
+
+ private:
+  MemoryPool* pool_;
+};
+
+// Incrementally builds a PoolBuffer by appending raw bytes or arithmetic
+// values. Finish() hands the buffer over and resets the builder for reuse.
+class ARROW_EXPORT BufferBuilder {
+ public:
+  explicit BufferBuilder(MemoryPool* pool)
+      : pool_(pool), data_(nullptr), capacity_(0), size_(0) {}
+
+  // Resizes the buffer to the nearest multiple of 64 bytes per Layout.md.
+  // `elements` is a byte count. It is a 64-bit value so that the int64_t and
+  // size_t byte counts callers pass in are not truncated.
+  Status Resize(int64_t elements) {
+    if (buffer_ == nullptr) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
+    RETURN_NOT_OK(buffer_->Resize(elements));
+    capacity_ = buffer_->capacity();
+    data_ = buffer_->mutable_data();
+    return Status::OK();
+  }
+
+  // Appends `length` bytes, growing the buffer first when they do not fit.
+  Status Append(const uint8_t* data, int length) {
+    if (capacity_ < length + size_) { RETURN_NOT_OK(Resize(length + size_)); }
+    UnsafeAppend(data, length);
+    return Status::OK();
+  }
+
+  // Appends the object representation of a single arithmetic value.
+  template <typename T>
+  Status Append(T arithmetic_value) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    return Append(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+  }
+
+  // Appends num_elements arithmetic values stored contiguously.
+  template <typename T>
+  Status Append(const T* arithmetic_values, int num_elements) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    return Append(
+        reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
+  }
+
+  // Unsafe methods don't check existing size
+  void UnsafeAppend(const uint8_t* data, int length) {
+    memcpy(data_ + size_, data, length);
+    size_ += length;
+  }
+
+  template <typename T>
+  void UnsafeAppend(T arithmetic_value) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    UnsafeAppend(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+  }
+
+  template <typename T>
+  void UnsafeAppend(const T* arithmetic_values, int num_elements) {
+    static_assert(std::is_arithmetic<T>::value,
+        "Convenience buffer append only supports arithmetic types");
+    UnsafeAppend(
+        reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
+  }
+
+  // Returns the accumulated buffer and resets the builder. The returned
+  // buffer's size() is whatever the last Resize() set it to, which may be
+  // larger than the number of bytes actually appended.
+  std::shared_ptr<Buffer> Finish() {
+    auto result = buffer_;
+    buffer_ = nullptr;
+    capacity_ = size_ = 0;
+    return result;
+  }
+
+  // NOTE: both accessors narrow the 64-bit counters to int.
+  int capacity() const { return static_cast<int>(capacity_); }
+  int length() const { return static_cast<int>(size_); }
+
+ private:
+  std::shared_ptr<PoolBuffer> buffer_;
+  MemoryPool* pool_;
+  uint8_t* data_;
+  int64_t capacity_;
+  int64_t size_;
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_BUFFER_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index
151b257..493b5e7 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -17,11 +17,17 @@ #include "arrow/builder.h" +#include <cstdint> #include <cstring> +#include <limits> +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" #include "arrow/util/bit-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" +#include "arrow/util/logging.h" namespace arrow { @@ -123,4 +129,323 @@ void ArrayBuilder::UnsafeSetNotNull(int32_t length) { length_ = new_length; } +template <typename T> +Status PrimitiveBuilder<T>::Init(int32_t capacity) { + RETURN_NOT_OK(ArrayBuilder::Init(capacity)); + data_ = std::make_shared<PoolBuffer>(pool_); + + int64_t nbytes = TypeTraits<T>::bytes_required(capacity); + RETURN_NOT_OK(data_->Resize(nbytes)); + // TODO(emkornfield) valgrind complains without this + memset(data_->mutable_data(), 0, nbytes); + + raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data()); + return Status::OK(); +} + +template <typename T> +Status PrimitiveBuilder<T>::Resize(int32_t capacity) { + // XXX: Set floor size for now + if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + + if (capacity_ == 0) { + RETURN_NOT_OK(Init(capacity)); + } else { + RETURN_NOT_OK(ArrayBuilder::Resize(capacity)); + const int64_t old_bytes = data_->size(); + const int64_t new_bytes = TypeTraits<T>::bytes_required(capacity); + RETURN_NOT_OK(data_->Resize(new_bytes)); + raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data()); + memset(data_->mutable_data() + old_bytes, 0, new_bytes - old_bytes); + } + return Status::OK(); +} + +template <typename T> +Status PrimitiveBuilder<T>::Append( + const value_type* values, int32_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); + + if (length > 0) { + memcpy(raw_data_ + length_, values, TypeTraits<T>::bytes_required(length)); + } + + // length_ is update by these + 
ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + + return Status::OK(); +} + +template <typename T> +Status PrimitiveBuilder<T>::Finish(std::shared_ptr<Array>* out) { + const int64_t bytes_required = TypeTraits<T>::bytes_required(length_); + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + *out = std::make_shared<typename TypeTraits<T>::ArrayType>( + type_, length_, data_, null_count_, null_bitmap_); + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +template class PrimitiveBuilder<UInt8Type>; +template class PrimitiveBuilder<UInt16Type>; +template class PrimitiveBuilder<UInt32Type>; +template class PrimitiveBuilder<UInt64Type>; +template class PrimitiveBuilder<Int8Type>; +template class PrimitiveBuilder<Int16Type>; +template class PrimitiveBuilder<Int32Type>; +template class PrimitiveBuilder<Int64Type>; +template class PrimitiveBuilder<TimestampType>; +template class PrimitiveBuilder<HalfFloatType>; +template class PrimitiveBuilder<FloatType>; +template class PrimitiveBuilder<DoubleType>; + +Status BooleanBuilder::Init(int32_t capacity) { + RETURN_NOT_OK(ArrayBuilder::Init(capacity)); + data_ = std::make_shared<PoolBuffer>(pool_); + + int64_t nbytes = BitUtil::BytesForBits(capacity); + RETURN_NOT_OK(data_->Resize(nbytes)); + // TODO(emkornfield) valgrind complains without this + memset(data_->mutable_data(), 0, nbytes); + + raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); + return Status::OK(); +} + +Status BooleanBuilder::Resize(int32_t capacity) { + // XXX: Set floor size for now + if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + + if (capacity_ == 0) { + RETURN_NOT_OK(Init(capacity)); + } else { + RETURN_NOT_OK(ArrayBuilder::Resize(capacity)); + const int64_t old_bytes = data_->size(); + const int64_t new_bytes = BitUtil::BytesForBits(capacity); + + 
RETURN_NOT_OK(data_->Resize(new_bytes)); + raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); + memset(data_->mutable_data() + old_bytes, 0, new_bytes - old_bytes); + } + return Status::OK(); +} + +Status BooleanBuilder::Finish(std::shared_ptr<Array>* out) { + const int64_t bytes_required = BitUtil::BytesForBits(length_); + + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + *out = std::make_shared<BooleanArray>(type_, length_, data_, null_count_, null_bitmap_); + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +Status BooleanBuilder::Append( + const uint8_t* values, int32_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); + + for (int i = 0; i < length; ++i) { + // Skip reading from unitialised memory + // TODO: This actually is only to keep valgrind happy but may or may not + // have a performance impact. + if ((valid_bytes != nullptr) && !valid_bytes[i]) continue; + + if (values[i] > 0) { + BitUtil::SetBit(raw_data_, length_ + i); + } else { + BitUtil::ClearBit(raw_data_, length_ + i); + } + } + + // this updates length_ + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// ListBuilder + +ListBuilder::ListBuilder( + MemoryPool* pool, std::shared_ptr<ArrayBuilder> value_builder, const TypePtr& type) + : ArrayBuilder( + pool, type ? type : std::static_pointer_cast<DataType>( + std::make_shared<ListType>(value_builder->type()))), + offset_builder_(pool), + value_builder_(value_builder) {} + +ListBuilder::ListBuilder( + MemoryPool* pool, std::shared_ptr<Array> values, const TypePtr& type) + : ArrayBuilder(pool, type ? 
type : std::static_pointer_cast<DataType>( + std::make_shared<ListType>(values->type()))), + offset_builder_(pool), + values_(values) {} + +Status ListBuilder::Init(int32_t elements) { + DCHECK_LT(elements, std::numeric_limits<int32_t>::max()); + RETURN_NOT_OK(ArrayBuilder::Init(elements)); + // one more than requested for offsets + return offset_builder_.Resize((elements + 1) * sizeof(int32_t)); +} + +Status ListBuilder::Resize(int32_t capacity) { + DCHECK_LT(capacity, std::numeric_limits<int32_t>::max()); + // one more than requested for offsets + RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t))); + return ArrayBuilder::Resize(capacity); +} + +// Uses the pre-existing values array when one was supplied, otherwise finishes the value +// builder; then appends the closing offset (the values length), builds the ListArray and +// resets this builder. +Status ListBuilder::Finish(std::shared_ptr<Array>* out) { + std::shared_ptr<Array> items = values_; + if (!items) { RETURN_NOT_OK(value_builder_->Finish(&items)); } + + RETURN_NOT_OK(offset_builder_.Append<int32_t>(items->length())); + std::shared_ptr<Buffer> offsets = offset_builder_.Finish(); + + *out = std::make_shared<ListArray>( + type_, length_, offsets, items, null_count_, null_bitmap_); + + Reset(); + + return Status::OK(); +} + +// Zeroes the counters and drops the null bitmap; the offset builder and the +// value builder / values array are not touched here. +void ListBuilder::Reset() { + capacity_ = length_ = null_count_ = 0; + null_bitmap_ = nullptr; +} + +std::shared_ptr<ArrayBuilder> ListBuilder::value_builder() const { + DCHECK(!values_) << "Using value builder is pointless when values_ is set"; + return value_builder_; +} + +// ---------------------------------------------------------------------- +// String and binary + +// This used to be a static member variable of BinaryBuilder, but it can cause +// valgrind to report a (spurious?) memory leak when needed in other shared +// libraries.
The problem came up while adding explicit visibility to libarrow +// and libparquet_arrow +static TypePtr kBinaryValueType = TypePtr(new UInt8Type()); + +// Builds on a ListBuilder whose value builder is a UInt8Builder, and caches a typed raw +// pointer to that value builder in byte_builder_. +BinaryBuilder::BinaryBuilder(MemoryPool* pool, const TypePtr& type) + : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, kBinaryValueType), type) { + byte_builder_ = static_cast<UInt8Builder*>(value_builder_.get()); +} + +// Finishes the underlying list, then re-wraps its offsets and the uint8 values buffer as +// a BinaryArray. +// NOTE(review): the dynamic_pointer_cast results are dereferenced without a null check. +Status BinaryBuilder::Finish(std::shared_ptr<Array>* out) { + std::shared_ptr<Array> result; + RETURN_NOT_OK(ListBuilder::Finish(&result)); + + const auto list = std::dynamic_pointer_cast<ListArray>(result); + auto values = std::dynamic_pointer_cast<UInt8Array>(list->values()); + + *out = std::make_shared<BinaryArray>(list->length(), list->offsets(), values->data(), + list->null_count(), list->null_bitmap()); + return Status::OK(); +} + +// Same steps as BinaryBuilder::Finish, but the result is wrapped as a StringArray. +Status StringBuilder::Finish(std::shared_ptr<Array>* out) { + std::shared_ptr<Array> result; + RETURN_NOT_OK(ListBuilder::Finish(&result)); + + const auto list = std::dynamic_pointer_cast<ListArray>(result); + auto values = std::dynamic_pointer_cast<UInt8Array>(list->values()); + + *out = std::make_shared<StringArray>(list->length(), list->offsets(), values->data(), + list->null_count(), list->null_bitmap()); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Struct + +// Finishes every field builder in order, then assembles the StructArray and resets this +// builder's bitmap and counters. Stops at the first child builder that fails. +Status StructBuilder::Finish(std::shared_ptr<Array>* out) { + std::vector<std::shared_ptr<Array>> fields(field_builders_.size()); + for (size_t i = 0; i < field_builders_.size(); ++i) { + RETURN_NOT_OK(field_builders_[i]->Finish(&fields[i])); + } + + *out = std::make_shared<StructArray>(type_, length_, fields, null_count_, null_bitmap_); + + null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + + return Status::OK(); +} + +// Returns the child builder at `pos`. The DCHECK only verifies that at least one child +// exists; `pos` itself is not range-checked. +std::shared_ptr<ArrayBuilder> StructBuilder::field_builder(int pos) const { + DCHECK_GT(field_builders_.size(), 0); + return field_builders_[pos]; +} + +//
---------------------------------------------------------------------- +// Helper functions + +// Expands to a switch case that constructs BuilderType for Type::ENUM into `out` and +// returns OK. Relies on `pool`, `type` and `out` being in scope at the expansion site. +#define BUILDER_CASE(ENUM, BuilderType) \ + case Type::ENUM: \ + out->reset(new BuilderType(pool, type)); \ + return Status::OK(); + +// Creates the ArrayBuilder matching `type` and stores it in `*out`. List and struct types +// recurse to create their child builders first; the first child failure is returned +// as-is. Types without a case below yield NotImplemented carrying type->ToString(). +// +// Initially looked at doing this with vtables, but shared pointers make it +// difficult +// +// TODO(wesm): come up with a less monolithic strategy +Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, + std::shared_ptr<ArrayBuilder>* out) { + switch (type->type) { + BUILDER_CASE(UINT8, UInt8Builder); + BUILDER_CASE(INT8, Int8Builder); + BUILDER_CASE(UINT16, UInt16Builder); + BUILDER_CASE(INT16, Int16Builder); + BUILDER_CASE(UINT32, UInt32Builder); + BUILDER_CASE(INT32, Int32Builder); + BUILDER_CASE(UINT64, UInt64Builder); + BUILDER_CASE(INT64, Int64Builder); + BUILDER_CASE(TIMESTAMP, TimestampBuilder); + + BUILDER_CASE(BOOL, BooleanBuilder); + + BUILDER_CASE(FLOAT, FloatBuilder); + BUILDER_CASE(DOUBLE, DoubleBuilder); + + BUILDER_CASE(STRING, StringBuilder); + BUILDER_CASE(BINARY, BinaryBuilder); + + case Type::LIST: { + std::shared_ptr<ArrayBuilder> value_builder; + std::shared_ptr<DataType> value_type = + static_cast<ListType*>(type.get())->value_type(); + RETURN_NOT_OK(MakeBuilder(pool, value_type, &value_builder)); + out->reset(new ListBuilder(pool, value_builder)); + return Status::OK(); + } + + case Type::STRUCT: { + const std::vector<FieldPtr>& fields = type->children_; + std::vector<std::shared_ptr<ArrayBuilder>> values_builder; + values_builder.reserve(fields.size()); + + // Iterate by const reference: copying each FieldPtr (presumably a shared_ptr) + // per iteration would only add reference-count traffic. + for (const auto& field : fields) { + std::shared_ptr<ArrayBuilder> builder; + RETURN_NOT_OK(MakeBuilder(pool, field->type, &builder)); + values_builder.push_back(builder); + } + out->reset(new StructBuilder(pool, type, values_builder)); + return Status::OK(); + } + + default: + return Status::NotImplemented(type->ToString()); + } +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/builder.h
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 73e49c0..7162d31 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -20,18 +20,20 @@ #include <cstdint> #include <memory> +#include <string> #include <vector> +#include "arrow/buffer.h" +#include "arrow/status.h" #include "arrow/type.h" +#include "arrow/util/bit-util.h" #include "arrow/util/macros.h" -#include "arrow/util/status.h" #include "arrow/util/visibility.h" namespace arrow { class Array; class MemoryPool; -class PoolBuffer; static constexpr int32_t kMinBuilderCapacity = 1 << 5; @@ -130,6 +132,315 @@ class ARROW_EXPORT ArrayBuilder { DISALLOW_COPY_AND_ASSIGN(ArrayBuilder); }; +// Builder for arrays whose values have the fixed-size C type Type::c_type. Values are +// accumulated in the pool-backed buffer data_, with raw_data_ as a typed view of it. +template <typename Type> +class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder { + public: + using value_type = typename Type::c_type; + + // Both data_ and raw_data_ start out null so that no member is left indeterminate + // before the first Init/Resize. + explicit PrimitiveBuilder(MemoryPool* pool, const TypePtr& type) + : ArrayBuilder(pool, type), data_(nullptr), raw_data_(nullptr) {} + + virtual ~PrimitiveBuilder() {} + + using ArrayBuilder::Advance; + + // Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory + Status AppendNulls(const uint8_t* valid_bytes, int32_t length) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); + } + + // Appends a single null slot + Status AppendNull() { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(false); + return Status::OK(); + } + + std::shared_ptr<Buffer> data() const { return data_; } + + // Vector append + // + // If passed, valid_bytes is of equal length to values, and any zero byte + // will be considered as a null for that slot + Status Append( + const value_type* values, int32_t length, const uint8_t* valid_bytes = nullptr); + + Status Finish(std::shared_ptr<Array>* out) override; + Status Init(int32_t capacity) override; + + // Increase the capacity of the builder to accommodate at least the indicated + // number of elements + Status Resize(int32_t capacity)
override; + + protected: + std::shared_ptr<PoolBuffer> data_; + value_type* raw_data_; +}; + +// PrimitiveBuilder with scalar append operations for numeric value types +template <typename T> +class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> { + public: + using typename PrimitiveBuilder<T>::value_type; + using PrimitiveBuilder<T>::PrimitiveBuilder; + + using PrimitiveBuilder<T>::Append; + using PrimitiveBuilder<T>::Init; + using PrimitiveBuilder<T>::Resize; + using PrimitiveBuilder<T>::Reserve; + + // Scalar append. + Status Append(value_type val) { + RETURN_NOT_OK(ArrayBuilder::Reserve(1)); + UnsafeAppend(val); + return Status::OK(); + } + + // Does not capacity-check; make sure to call Reserve beforehand + void UnsafeAppend(value_type val) { + BitUtil::SetBit(null_bitmap_data_, length_); + raw_data_[length_++] = val; + } + + protected: + using PrimitiveBuilder<T>::length_; + using PrimitiveBuilder<T>::null_bitmap_data_; + using PrimitiveBuilder<T>::raw_data_; +}; + +// Builders + +using UInt8Builder = NumericBuilder<UInt8Type>; +using UInt16Builder = NumericBuilder<UInt16Type>; +using UInt32Builder = NumericBuilder<UInt32Type>; +using UInt64Builder = NumericBuilder<UInt64Type>; + +using Int8Builder = NumericBuilder<Int8Type>; +using Int16Builder = NumericBuilder<Int16Type>; +using Int32Builder = NumericBuilder<Int32Type>; +using Int64Builder = NumericBuilder<Int64Type>; +using TimestampBuilder = NumericBuilder<TimestampType>; + +using HalfFloatBuilder = NumericBuilder<HalfFloatType>; +using FloatBuilder = NumericBuilder<FloatType>; +using DoubleBuilder = NumericBuilder<DoubleType>; + +// Builder for boolean arrays; values are written as individual bits through raw_data_ +class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { + public: + // Both data_ and raw_data_ start out null so that no member is left indeterminate + // before the first Init/Resize. + explicit BooleanBuilder(MemoryPool* pool, const TypePtr& type) + : ArrayBuilder(pool, type), data_(nullptr), raw_data_(nullptr) {} + + virtual ~BooleanBuilder() {} + + using ArrayBuilder::Advance; + + // Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory + Status AppendNulls(const uint8_t* valid_bytes, int32_t length) { + RETURN_NOT_OK(Reserve(length)); +
UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); + } + + // Appends a single null slot + Status AppendNull() { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(false); + return Status::OK(); + } + + std::shared_ptr<Buffer> data() const { return data_; } + + // Scalar append + // + // Marks the slot as valid and writes `val` as its data bit. A failed Reserve is + // returned to the caller before either buffer is written. + Status Append(bool val) { + RETURN_NOT_OK(Reserve(1)); + BitUtil::SetBit(null_bitmap_data_, length_); + if (val) { + BitUtil::SetBit(raw_data_, length_); + } else { + BitUtil::ClearBit(raw_data_, length_); + } + ++length_; + return Status::OK(); + } + + // Vector append + // + // If passed, valid_bytes is of equal length to values, and any zero byte + // will be considered as a null for that slot + Status Append( + const uint8_t* values, int32_t length, const uint8_t* valid_bytes = nullptr); + + Status Finish(std::shared_ptr<Array>* out) override; + Status Init(int32_t capacity) override; + + // Increase the capacity of the builder to accommodate at least the indicated + // number of elements + Status Resize(int32_t capacity) override; + + protected: + std::shared_ptr<PoolBuffer> data_; + uint8_t* raw_data_; +}; + +// ---------------------------------------------------------------------- +// List builder + +// Builder class for variable-length list array value types +// +// To use this class, you must append values to the child array builder and use +// the Append function to delimit each distinct list value (once the values +// have been appended to the child array) or use the bulk API to append +// a sequence of offsets and null values. +// +// A note on types. Per arrow/type.h all types in the c++ implementation are +// logical so even though this class always builds list array, this can +// represent multiple different logical types. If no logical type is provided +// at construction time, the class defaults to List<T> where T is taken from the +// value_builder/values that the object is constructed with.
+class ARROW_EXPORT ListBuilder : public ArrayBuilder { + public: + // Use this constructor to incrementally build the value array along with offsets and + // null bitmap. + ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> value_builder, + const TypePtr& type = nullptr); + + // Use this constructor to build the list with a pre-existing values array + ListBuilder( + MemoryPool* pool, std::shared_ptr<Array> values, const TypePtr& type = nullptr); + + virtual ~ListBuilder() {} + + Status Init(int32_t elements) override; + Status Resize(int32_t capacity) override; + Status Finish(std::shared_ptr<Array>* out) override; + + // Vector append + // + // If passed, valid_bytes is of equal length to offsets, and any zero byte + // will be considered as a null for that slot + Status Append( + const int32_t* offsets, int32_t length, const uint8_t* valid_bytes = nullptr) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + offset_builder_.UnsafeAppend<int32_t>(offsets, length); + return Status::OK(); + } + + // Start a new variable-length list slot + // + // This function should be called before beginning to append elements to the + // value builder + // + // NOTE(review): this reads value_builder_->length(); the constructor taking a + // pre-existing values array does not set value_builder_ -- confirm that callers + // only use the vector Append above in that mode. + Status Append(bool is_valid = true) { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(is_valid); + RETURN_NOT_OK(offset_builder_.Append<int32_t>(value_builder_->length())); + return Status::OK(); + } + + Status AppendNull() { return Append(false); } + + std::shared_ptr<ArrayBuilder> value_builder() const; + + protected: + // Accumulates the int32 list offsets + BufferBuilder offset_builder_; + // Child builder; set by the value-builder constructor + std::shared_ptr<ArrayBuilder> value_builder_; + // Pre-existing values; set by the values-array constructor + std::shared_ptr<Array> values_; + + void Reset(); +}; + +// ---------------------------------------------------------------------- +// Binary and String + +// BinaryBuilder : public ListBuilder +class ARROW_EXPORT BinaryBuilder : public ListBuilder { + public: + explicit BinaryBuilder(MemoryPool* pool, const TypePtr& type); + virtual ~BinaryBuilder() {} + + // Starts a new list slot, then appends `length` bytes from `value` to the byte builder + Status Append(const uint8_t*
value, int32_t length) { + RETURN_NOT_OK(ListBuilder::Append()); + return byte_builder_->Append(value, length); + } + + // Same as above for character data + Status Append(const char* value, int32_t length) { + return Append(reinterpret_cast<const uint8_t*>(value), length); + } + + // Appends the bytes of `value`. The size is narrowed to the int32_t length used by + // the other overloads; the cast makes that conversion explicit. + Status Append(const std::string& value) { + return Append(value.c_str(), static_cast<int32_t>(value.size())); + } + + Status Finish(std::shared_ptr<Array>* out) override; + + protected: + // Typed view of the value builder + UInt8Builder* byte_builder_; +}; + +// String builder +class ARROW_EXPORT StringBuilder : public BinaryBuilder { + public: + explicit StringBuilder(MemoryPool* pool, const TypePtr& type) + : BinaryBuilder(pool, type) {} + + using BinaryBuilder::Append; + + Status Finish(std::shared_ptr<Array>* out) override; + + Status Append(const std::vector<std::string>& values, uint8_t* null_bytes); +}; + +// ---------------------------------------------------------------------- +// Struct + +// --------------------------------------------------------------------------------- +// StructArray builder +// The Append, Resize and Reserve methods act on the StructBuilder itself. +// Please make sure the corresponding methods of all child builders are called +// consistently to maintain data-structure consistency. +class ARROW_EXPORT StructBuilder : public ArrayBuilder { + public: + // Stores a copy of `field_builders`, one child builder per struct field + StructBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, + const std::vector<std::shared_ptr<ArrayBuilder>>& field_builders) + : ArrayBuilder(pool, type), field_builders_(field_builders) {} + + Status Finish(std::shared_ptr<Array>* out) override; + + // Null bitmap is of equal length to every child field, and any zero byte + // will be considered as a null for that field, but users must call the append + // or advance methods of the child builders independently to + // insert data. + Status Append(int32_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); + } + + // Append an element to the Struct.
All child-builders' Append method must + // be called independently to maintain data-structure consistency. + Status Append(bool is_valid = true) { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(is_valid); + return Status::OK(); + } + + // Appends a null struct slot; child builders are not advanced here + Status AppendNull() { return Append(false); } + + std::shared_ptr<ArrayBuilder> field_builder(int pos) const; + + const std::vector<std::shared_ptr<ArrayBuilder>>& field_builders() const { + return field_builders_; + } + + protected: + // One child builder per struct field + std::vector<std::shared_ptr<ArrayBuilder>> field_builders_; +}; + +// ---------------------------------------------------------------------- +// Helper functions + +// Creates the ArrayBuilder matching `type` and stores it in `*out`. +Status ARROW_EXPORT MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, + std::shared_ptr<ArrayBuilder>* out); + } // namespace arrow #endif // ARROW_BUILDER_H_ http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/column-benchmark.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column-benchmark.cc b/cpp/src/arrow/column-benchmark.cc index f429a81..650ec90 100644 --- a/cpp/src/arrow/column-benchmark.cc +++ b/cpp/src/arrow/column-benchmark.cc @@ -17,9 +17,9 @@ #include "benchmark/benchmark.h" +#include "arrow/array.h" +#include "arrow/memory_pool.h" #include "arrow/test-util.h" -#include "arrow/types/primitive.h" -#include "arrow/util/memory-pool.h" namespace arrow { namespace { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/column-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column-test.cc b/cpp/src/arrow/column-test.cc index ac3636d..9005245 100644 --- a/cpp/src/arrow/column-test.cc +++ b/cpp/src/arrow/column-test.cc @@ -27,7 +27,6 @@ #include "arrow/schema.h" #include "arrow/test-util.h" #include "arrow/type.h" -#include "arrow/types/primitive.h" using std::shared_ptr; using std::vector;
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/column.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc index eca5f4d..1d136e7 100644 --- a/cpp/src/arrow/column.cc +++ b/cpp/src/arrow/column.cc @@ -21,8 +21,8 @@ #include <sstream> #include "arrow/array.h" +#include "arrow/status.h" #include "arrow/type.h" -#include "arrow/util/status.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 05fa666..c50a9bb 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -107,9 +107,9 @@ #include "arrow/io/interfaces.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 8c6d49f..b8e2120 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -22,10 +22,10 @@ #include <sstream> #include <string> +#include "arrow/buffer.h" #include "arrow/io/hdfs.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/interfaces.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 44986ce..68c1ac3 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ 
b/cpp/src/arrow/io/interfaces.cc @@ -20,8 +20,8 @@ #include <cstdint> #include <memory> -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" +#include "arrow/buffer.h" +#include "arrow/status.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/io-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index fad49ce..f0ea7ec 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -30,7 +30,7 @@ #include "arrow/io/file.h" #include "arrow/io/test-common.h" -#include "arrow/util/memory-pool.h" +#include "arrow/memory_pool.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index 8338de6..e07eaa3 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -25,8 +25,8 @@ #include <boost/filesystem.hpp> // NOLINT #include "arrow/io/hdfs.h" +#include "arrow/status.h" #include "arrow/test-util.h" -#include "arrow/util/status.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/libhdfs_shim.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc index 36b8a4e..3715376 100644 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -53,7 +53,7 @@ extern "C" { #include <boost/filesystem.hpp> // NOLINT -#include "arrow/util/status.h" +#include "arrow/status.h" #include "arrow/util/visibility.h" namespace fs = boost::filesystem; http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/memory.cc 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index af495e2..b5cf4b7 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -38,10 +38,9 @@ #include <sstream> #include <string> +#include "arrow/buffer.h" #include "arrow/io/interfaces.h" - -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" +#include "arrow/status.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/io/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h index f8fed88..1468083 100644 --- a/cpp/src/arrow/io/test-common.h +++ b/cpp/src/arrow/io/test-common.h @@ -32,10 +32,10 @@ // nothing #endif +#include "arrow/buffer.h" #include "arrow/io/memory.h" +#include "arrow/memory_pool.h" #include "arrow/test-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" namespace arrow { namespace io { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index edf716f..f813c1d 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -23,6 +23,7 @@ #include <vector> #include "arrow/array.h" +#include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/Message_generated.h" @@ -30,17 +31,11 @@ #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" -#include "arrow/types/construct.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" #include "arrow/util/bit-util.h" -#include 
"arrow/util/buffer.h" #include "arrow/util/logging.h" -#include "arrow/util/status.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index fa50058..d7d2e61 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -22,14 +22,14 @@ #include <sstream> #include <vector> +#include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" -#include "arrow/util/buffer.h" +#include "arrow/status.h" #include "arrow/util/logging.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 1accfde..f309b85 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -30,15 +30,11 @@ #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" #include "arrow/test-util.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" #include "arrow/util/bit-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/ipc-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc index a1feac4..0a9f677 100644 --- 
a/cpp/src/arrow/ipc/ipc-file-test.cc +++ b/cpp/src/arrow/ipc/ipc-file-test.cc @@ -24,6 +24,7 @@ #include "gtest/gtest.h" +#include "arrow/array.h" #include "arrow/io/memory.h" #include "arrow/io/test-common.h" #include "arrow/ipc/adapter.h" @@ -31,15 +32,11 @@ #include "arrow/ipc/test-common.h" #include "arrow/ipc/util.h" +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" #include "arrow/test-util.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" #include "arrow/util/bit-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index ba4d9ca..f793a26 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -26,17 +26,15 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/builder.h" #include "arrow/ipc/json-internal.h" #include "arrow/ipc/json.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/type_traits.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { @@ -147,7 +145,7 @@ TEST(TestJsonArrayWriter, NestedTypes) { std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6}; std::shared_ptr<Array> values_array; - MakeArray<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array); + ArrayFromVector<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array); // List std::vector<bool> list_is_valid 
= {true, false, true, true, true}; @@ -188,10 +186,10 @@ void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows, test::randint<int32_t>(num_rows, 0, 100, &v2_values); std::shared_ptr<Array> v1; - MakeArray<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1); + ArrayFromVector<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1); std::shared_ptr<Array> v2; - MakeArray<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2); + ArrayFromVector<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2); static const int kBufferSize = 10; static uint8_t buffer[kBufferSize]; @@ -323,13 +321,13 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) { std::vector<bool> foo_valid = {true, false, true, true, true}; std::vector<int32_t> foo_values = {1, 2, 3, 4, 5}; std::shared_ptr<Array> foo; - MakeArray<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo); + ArrayFromVector<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo); ASSERT_TRUE(batch->column(0)->Equals(foo)); std::vector<bool> bar_valid = {true, false, false, true, true}; std::vector<double> bar_values = {1, 2, 3, 4, 5}; std::shared_ptr<Array> bar; - MakeArray<DoubleType, double>(float64(), bar_valid, bar_values, &bar); + ArrayFromVector<DoubleType, double>(float64(), bar_valid, bar_values, &bar); ASSERT_TRUE(batch->column(1)->Equals(bar)); } http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/ipc-metadata-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index de08e6d..7c5744a 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -24,9 +24,9 @@ #include "arrow/io/memory.h" #include "arrow/ipc/metadata.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/test-util.h" #include "arrow/type.h" -#include 
"arrow/util/status.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 291a719..5e59356 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -33,9 +33,9 @@ #include "arrow/ipc/json.h" #include "arrow/pretty_print.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/test-util.h" -#include "arrow/util/status.h" DEFINE_string(arrow, "", "Arrow file name"); DEFINE_string(json, "", "JSON file name"); http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/json-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index ff9f598..db11b7d 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -28,16 +28,14 @@ #include "rapidjson/writer.h" #include "arrow/array.h" +#include "arrow/builder.h" +#include "arrow/memory_pool.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/type.h" #include "arrow/type_traits.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" #include "arrow/util/bit-util.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" +#include "arrow/util/logging.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/json.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index 2281611..6e3a993 100644 --- a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ -23,14 +23,14 @@ #include 
<vector> #include "arrow/array.h" +#include "arrow/buffer.h" #include "arrow/ipc/json-internal.h" +#include "arrow/memory_pool.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" -#include "arrow/util/buffer.h" #include "arrow/util/logging.h" -#include "arrow/util/memory-pool.h" -#include "arrow/util/status.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 5a27589..16069a8 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -25,11 +25,11 @@ #include "flatbuffers/flatbuffers.h" +#include "arrow/buffer.h" #include "arrow/ipc/Message_generated.h" #include "arrow/schema.h" +#include "arrow/status.h" #include "arrow/type.h" -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 44d3939..f0674ff 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -28,9 +28,9 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/metadata-internal.h" +#include "arrow/buffer.h" #include "arrow/schema.h" -#include "arrow/util/buffer.h" -#include "arrow/util/status.h" +#include "arrow/status.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 65b3782..8416f0d 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ 
b/cpp/src/arrow/ipc/test-common.h @@ -25,16 +25,13 @@ #include <vector> #include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/builder.h" +#include "arrow/memory_pool.h" #include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" #include "arrow/util/bit-util.h" -#include "arrow/util/buffer.h" -#include "arrow/util/memory-pool.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/ipc/util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h index 242d662..2000c61 100644 --- a/cpp/src/arrow/ipc/util.h +++ b/cpp/src/arrow/ipc/util.h @@ -22,7 +22,7 @@ #include "arrow/array.h" #include "arrow/io/interfaces.h" -#include "arrow/util/status.h" +#include "arrow/status.h" namespace arrow { namespace ipc { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/memory_pool-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc new file mode 100644 index 0000000..d6f323d --- /dev/null +++ b/cpp/src/arrow/memory_pool-test.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+
+#include <cstdint>
+#include <limits>
+
+#include "gtest/gtest.h"
+
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+namespace arrow {
+
+// A 100-byte allocation from the default pool must come back with an address
+// that is a multiple of 64, must be reflected in bytes_allocated(), and must
+// bring the counter back to zero once it is freed with the same size.
+TEST(DefaultMemoryPool, MemoryTracking) {
+  MemoryPool* pool = default_memory_pool();
+
+  uint8_t* data;
+  ASSERT_OK(pool->Allocate(100, &data));
+  EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data) % 64);
+  ASSERT_EQ(100, pool->bytes_allocated());
+
+  pool->Free(data, 100);
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+// Requesting the largest representable int64_t number of bytes must be
+// reported as an OutOfMemory status rather than succeeding.
+TEST(DefaultMemoryPool, OOM) {
+  MemoryPool* pool = default_memory_pool();
+
+  uint8_t* data;
+  int64_t to_alloc = std::numeric_limits<int64_t>::max();
+  ASSERT_RAISES(OutOfMemory, pool->Allocate(to_alloc, &data));
+}
+
+// Death tests and valgrind are known to not play well 100% of the time. See
+// googletest documentation
+#ifndef ARROW_VALGRIND
+
+// When NDEBUG is not defined, freeing with a size larger than what the pool
+// has recorded (120 vs. 100) is expected to terminate the process with exit
+// code 1 and a "Check failed" message. The buffer is then released with its
+// real size so the test does not leak it.
+TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
+  MemoryPool* pool = default_memory_pool();
+
+  uint8_t* data;
+  ASSERT_OK(pool->Allocate(100, &data));
+
+#ifndef NDEBUG
+  EXPECT_EXIT(pool->Free(data, 120), ::testing::ExitedWithCode(1),
+      ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+#endif
+
+  pool->Free(data, 100);
+}
+
+#endif  // ARROW_VALGRIND
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/memory_pool.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc new file mode 100644 index 0000000..f55b1ac --- /dev/null +++ b/cpp/src/arrow/memory_pool.cc @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+
+#include "arrow/memory_pool.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <limits>
+#include <mutex>
+#include <sstream>
+#include <stdlib.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+namespace {
+
+// Allocates `size` bytes aligned to the 64-byte boundary Arrow requires
+// (as of May 2016) and stores the address in *out.
+//
+// Returns Status::Invalid for a negative size or a rejected alignment, and
+// Status::OutOfMemory when the request cannot be satisfied. *out is only
+// meaningful when the returned status is OK.
+Status AllocateAligned(int64_t size, uint8_t** out) {
+  constexpr size_t kAlignment = 64;
+
+  // Validate before converting to size_t: a negative value would wrap around
+  // to a huge request, and where size_t is narrower than int64_t a large value
+  // would be silently truncated to a smaller allocation than was asked for.
+  if (size < 0) {
+    std::stringstream ss;
+    ss << "negative malloc size: " << size;
+    return Status::Invalid(ss.str());
+  }
+  if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) {
+    std::stringstream ss;
+    ss << "malloc of size " << size << " failed";
+    return Status::OutOfMemory(ss.str());
+  }
+  const size_t num_bytes = static_cast<size_t>(size);
+
+#ifdef _MSC_VER
+  // MSVC has no posix_memalign. Memory obtained from _aligned_malloc has to
+  // be released with _aligned_free, which InternalMemoryPool::Free does.
+  *out = reinterpret_cast<uint8_t*>(_aligned_malloc(num_bytes, kAlignment));
+  if (!*out) {
+    std::stringstream ss;
+    ss << "malloc of size " << size << " failed";
+    return Status::OutOfMemory(ss.str());
+  }
+#else
+  const int result =
+      posix_memalign(reinterpret_cast<void**>(out), kAlignment, num_bytes);
+  if (result == EINVAL) {
+    std::stringstream ss;
+    ss << "invalid alignment parameter: " << kAlignment;
+    return Status::Invalid(ss.str());
+  }
+  // ENOMEM is the only other documented failure; treating every remaining
+  // non-zero code the same way guarantees OK is never returned with *out unset.
+  if (result != 0) {
+    std::stringstream ss;
+    ss << "malloc of size " << size << " failed";
+    return Status::OutOfMemory(ss.str());
+  }
+#endif
+  return Status::OK();
+}
+
+}  // namespace
+
+MemoryPool::~MemoryPool() {}
+
+// MemoryPool backed by the system allocator. It keeps a running total of the
+// bytes handed out; the total is guarded by a mutex.
+class InternalMemoryPool : public MemoryPool {
+ public:
+  InternalMemoryPool() : bytes_allocated_(0) {}
+  virtual ~InternalMemoryPool();
+
+  Status Allocate(int64_t size, uint8_t** out) override;
+
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  int64_t bytes_allocated() const override;
+
+ private:
+  // mutable so that the const accessor bytes_allocated() can lock it.
+  mutable std::mutex pool_lock_;
+  int64_t bytes_allocated_;
+};
+
+// Allocates an aligned buffer and, on success, adds `size` to the running
+// total. On failure the total is left untouched and the error is returned.
+Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
+  // The allocation itself touches no pool state, so do it before taking the
+  // lock; callers then only serialize on the counter update.
+  RETURN_NOT_OK(AllocateAligned(size, out));
+
+  std::lock_guard<std::mutex> guard(pool_lock_);
+  bytes_allocated_ += size;
+  return Status::OK();
+}
+
+// Returns the number of bytes currently recorded as allocated.
+int64_t InternalMemoryPool::bytes_allocated() const {
+  std::lock_guard<std::mutex> guard(pool_lock_);
+  return bytes_allocated_;
+}
+
+// Releases `buffer` and subtracts `size` from the running total. In builds
+// where DCHECKs are active, passing a size larger than the recorded total
+// fails the check before the buffer is released.
+void InternalMemoryPool::Free(uint8_t* buffer, int64_t size) {
+  {
+    std::lock_guard<std::mutex> guard(pool_lock_);
+    DCHECK_GE(bytes_allocated_, size);
+    bytes_allocated_ -= size;
+  }
+  // Give the memory back outside the critical section, using the deallocator
+  // that matches the allocator chosen in AllocateAligned.
+#ifdef _MSC_VER
+  _aligned_free(buffer);
+#else
+  std::free(buffer);
+#endif
+}
+
+InternalMemoryPool::~InternalMemoryPool() {}
+
+// Returns the process-wide pool; the instance is a function-local static and
+// is therefore constructed on first use.
+MemoryPool* default_memory_pool() {
+  static InternalMemoryPool default_memory_pool_;
+  return &default_memory_pool_;
+}
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/memory_pool.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h new file mode 100644 index 0000000..4c1d699 --- /dev/null +++ b/cpp/src/arrow/memory_pool.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+
+#ifndef ARROW_MEMORY_POOL_H
+#define ARROW_MEMORY_POOL_H
+
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Status;
+
+// Abstract interface for an allocator that also reports how many bytes it
+// currently has outstanding.
+class ARROW_EXPORT MemoryPool {
+ public:
+  virtual ~MemoryPool();
+
+  // Requests a buffer of `size` bytes. On success the buffer address is
+  // written to *out; on failure a non-OK Status is returned.
+  virtual Status Allocate(int64_t size, uint8_t** out) = 0;
+
+  // Releases a buffer previously obtained from Allocate. `size` is the
+  // buffer's size in bytes.
+  // NOTE(review): presumably must equal the size passed to Allocate so that
+  // bytes_allocated() stays accurate -- confirm against the implementations.
+  virtual void Free(uint8_t* buffer, int64_t size) = 0;
+
+  // Number of bytes this pool currently accounts for as allocated.
+  virtual int64_t bytes_allocated() const = 0;
+};
+
+// Returns the pool to use when the caller has no specific pool of its own.
+// The pool is not owned by the caller.
+ARROW_EXPORT MemoryPool* default_memory_pool();
+
+}  // namespace arrow
+
+#endif  // ARROW_MEMORY_POOL_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/pretty_print-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc index b1e6a11..c22d3aa 100644 --- a/cpp/src/arrow/pretty_print-test.cc +++ b/cpp/src/arrow/pretty_print-test.cc @@ -26,14 +26,11 @@ #include "gtest/gtest.h" #include "arrow/array.h" +#include "arrow/builder.h" #include "arrow/pretty_print.h" #include "arrow/test-util.h" #include "arrow/type.h" #include "arrow/type_traits.h" -#include "arrow/types/list.h" -#include "arrow/types/primitive.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" namespace arrow { http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/pretty_print.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index c63a9e9..9c439c4 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -22,13 +22,10 @@ #include "arrow/array.h" #include "arrow/pretty_print.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" #include "arrow/type_traits.h" -#include "arrow/types/list.h" -#include "arrow/types/string.h" -#include "arrow/types/struct.h" -#include "arrow/util/status.h" namespace arrow {
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/status-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/status-test.cc b/cpp/src/arrow/status-test.cc new file mode 100644 index 0000000..969ba97 --- /dev/null +++ b/cpp/src/arrow/status-test.cc @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "gtest/gtest.h"
+
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+namespace arrow {
+
+// Status::OK() must report StatusCode::OK, and a Status built with
+// Status::IOError must report StatusCode::IOError together with the exact
+// message it was constructed with.
+TEST(StatusTest, TestCodeAndMessage) {
+  Status ok = Status::OK();
+  ASSERT_EQ(StatusCode::OK, ok.code());
+  Status file_error = Status::IOError("file error");
+  ASSERT_EQ(StatusCode::IOError, file_error.code());
+  ASSERT_EQ("file error", file_error.message());
+}
+
+// ToString() must render an error as "<code name>: <message>".
+TEST(StatusTest, TestToString) {
+  Status file_error = Status::IOError("file error");
+  ASSERT_EQ("IOError: file error", file_error.ToString());
+}
+
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c10d7cc/cpp/src/arrow/status.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc new file mode 100644 index 0000000..e1a2427 --- /dev/null +++ b/cpp/src/arrow/status.cc @@ -0,0 +1,86 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A Status encapsulates the result of an operation. It may indicate success, +// or it may indicate an error with an associated error message. +// +// Multiple threads can invoke const methods on a Status without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Status must use +// external synchronization.
+
+#include "arrow/status.h"
+
+#include <assert.h>
+
+namespace arrow {
+
+namespace {
+
+// Layout of the heap buffer that state_ points to for a non-OK Status:
+//   bytes [0, 4)   message length (uint32_t, host byte order)
+//   byte  4        StatusCode
+//   bytes [5, 7)   POSIX error code (int16_t, host byte order)
+//   bytes [7, ...) message text, not NUL-terminated
+constexpr size_t kCodeOffset = 4;
+constexpr size_t kPosixCodeOffset = 5;
+constexpr size_t kHeaderSize = 7;
+
+static_assert(kHeaderSize == sizeof(uint32_t) + sizeof(char) + sizeof(int16_t),
+    "Status header must hold the length, the code and the POSIX code");
+
+// Longest message that can be stored: the length field is 32 bits wide and
+// header + message must not overflow a 32-bit size_t either.
+constexpr uint32_t kMaxMessageSize = static_cast<uint32_t>(0xFFFFFFFFu - kHeaderSize);
+
+}  // namespace
+
+// Builds a non-OK status. `code` must not be StatusCode::OK (asserted).
+// Messages longer than kMaxMessageSize bytes are truncated to that length so
+// that the stored length always matches the number of bytes actually copied.
+Status::Status(StatusCode code, const std::string& msg, int16_t posix_code) {
+  assert(code != StatusCode::OK);
+  const uint32_t size = msg.size() > kMaxMessageSize
+                            ? kMaxMessageSize
+                            : static_cast<uint32_t>(msg.size());
+  char* result = new char[kHeaderSize + size];
+  memcpy(result, &size, sizeof(size));
+  result[kCodeOffset] = static_cast<char>(code);
+  memcpy(result + kPosixCodeOffset, &posix_code, sizeof(posix_code));
+  // Copy exactly `size` bytes -- the amount the buffer was sized for -- rather
+  // than msg.size(), which may be larger than the 32-bit length field allows.
+  memcpy(result + kHeaderSize, msg.data(), size);
+  state_ = result;
+}
+
+// Returns a newly allocated copy of `state`, which must be a non-null buffer
+// in the layout described above. The caller owns the returned array.
+const char* Status::CopyState(const char* state) {
+  uint32_t size;
+  memcpy(&size, state, sizeof(size));
+  // Do the addition in size_t so a large length cannot wrap in 32 bits.
+  const size_t total = kHeaderSize + static_cast<size_t>(size);
+  char* result = new char[total];
+  memcpy(result, state, total);
+  return result;
+}
+
+// Returns the human-readable name of this status' code; "OK" when there is
+// no state, "Unknown" for a code this function does not recognize.
+std::string Status::CodeAsString() const {
+  if (state_ == nullptr) { return "OK"; }
+
+  const char* type;
+  switch (code()) {
+    case StatusCode::OK:
+      type = "OK";
+      break;
+    case StatusCode::OutOfMemory:
+      type = "Out of memory";
+      break;
+    case StatusCode::KeyError:
+      type = "Key error";
+      break;
+    case StatusCode::TypeError:
+      type = "Type error";
+      break;
+    case StatusCode::Invalid:
+      type = "Invalid";
+      break;
+    case StatusCode::IOError:
+      type = "IOError";
+      break;
+    case StatusCode::UnknownError:
+      type = "Unknown error";
+      break;
+    case StatusCode::NotImplemented:
+      type = "NotImplemented";
+      break;
+    default:
+      type = "Unknown";
+      break;
+  }
+  return std::string(type);
+}
+
+// Returns "<code name>: <message>" for an error and just "OK" otherwise.
+std::string Status::ToString() const {
+  std::string result(CodeAsString());
+  if (state_ == nullptr) { return result; }
+
+  result.append(": ");
+
+  // The message is not NUL-terminated, so append by explicit length.
+  uint32_t length;
+  memcpy(&length, state_, sizeof(length));
+  result.append(state_ + kHeaderSize, length);
+  return result;
+}
+
+}  // namespace arrow
