http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/compare.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index d039bba..27fad71 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -76,10 +76,10 @@ class RangeEqualsVisitor : public ArrayVisitor { const bool is_null = left.IsNull(i); if (is_null != right.IsNull(o_i)) { return false; } if (is_null) continue; - const int32_t begin_offset = left.offset(i); - const int32_t end_offset = left.offset(i + 1); - const int32_t right_begin_offset = right.offset(o_i); - const int32_t right_end_offset = right.offset(o_i + 1); + const int32_t begin_offset = left.value_offset(i); + const int32_t end_offset = left.value_offset(i + 1); + const int32_t right_begin_offset = right.value_offset(o_i); + const int32_t right_end_offset = right.value_offset(o_i + 1); // Underlying can't be equal if the size isn't equal if (end_offset - begin_offset != right_end_offset - right_begin_offset) { return false; @@ -169,10 +169,10 @@ class RangeEqualsVisitor : public ArrayVisitor { const bool is_null = left.IsNull(i); if (is_null != right.IsNull(o_i)) { return false; } if (is_null) continue; - const int32_t begin_offset = left.offset(i); - const int32_t end_offset = left.offset(i + 1); - const int32_t right_begin_offset = right.offset(o_i); - const int32_t right_end_offset = right.offset(o_i + 1); + const int32_t begin_offset = left.value_offset(i); + const int32_t end_offset = left.value_offset(i + 1); + const int32_t right_begin_offset = right.value_offset(o_i); + const int32_t right_end_offset = right.value_offset(o_i + 1); // Underlying can't be equal if the size isn't equal if (end_offset - begin_offset != right_end_offset - right_begin_offset) { return false; @@ -200,7 +200,11 @@ class RangeEqualsVisitor : public ArrayVisitor { for (size_t j = 0; j < left.fields().size(); ++j) { // TODO: really we should be 
comparing stretches of non-null data rather // than looking at one value at a time. - equal_fields = left.field(j)->RangeEquals(i, i + 1, o_i, right.field(j)); + const int left_abs_index = i + left.offset(); + const int right_abs_index = o_i + right.offset(); + + equal_fields = left.field(j)->RangeEquals( + left_abs_index, left_abs_index + 1, right_abs_index, right.field(j)); if (!equal_fields) { return false; } } } @@ -223,7 +227,7 @@ class RangeEqualsVisitor : public ArrayVisitor { // Define a mapping from the type id to child number uint8_t max_code = 0; - const std::vector<uint8_t> type_codes = left_type.type_ids; + const std::vector<uint8_t> type_codes = left_type.type_codes; for (size_t i = 0; i < type_codes.size(); ++i) { const uint8_t code = type_codes[i]; if (code > max_code) { max_code = code; } @@ -248,15 +252,19 @@ class RangeEqualsVisitor : public ArrayVisitor { id = left_ids[i]; child_num = type_id_to_child_num[id]; + const int left_abs_index = i + left.offset(); + const int right_abs_index = o_i + right.offset(); + // TODO(wesm): really we should be comparing stretches of non-null data // rather than looking at one value at a time. 
if (union_mode == UnionMode::SPARSE) { - if (!left.child(child_num)->RangeEquals(i, i + 1, o_i, right.child(child_num))) { + if (!left.child(child_num)->RangeEquals(left_abs_index, left_abs_index + 1, + right_abs_index, right.child(child_num))) { return false; } } else { - const int32_t offset = left.raw_offsets()[i]; - const int32_t o_offset = right.raw_offsets()[i]; + const int32_t offset = left.raw_value_offsets()[i]; + const int32_t o_offset = right.raw_value_offsets()[i]; if (!left.child(child_num)->RangeEquals( offset, offset + 1, o_offset, right.child(child_num))) { return false; @@ -315,20 +323,29 @@ class EqualsVisitor : public RangeEqualsVisitor { } result_ = true; } else { - result_ = left.data()->Equals(*right.data(), BitUtil::BytesForBits(left.length())); + result_ = BitmapEquals(left.data()->data(), left.offset(), right.data()->data(), + right.offset(), left.length()); } return Status::OK(); } bool IsEqualPrimitive(const PrimitiveArray& left) { const auto& right = static_cast<const PrimitiveArray&>(right_); - if (left.null_count() > 0) { - const uint8_t* left_data = left.data()->data(); - const uint8_t* right_data = right.data()->data(); - const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type()); - const int value_byte_size = size_meta.bit_width() / 8; - DCHECK_GT(value_byte_size, 0); + const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type()); + const int value_byte_size = size_meta.bit_width() / 8; + DCHECK_GT(value_byte_size, 0); + + const uint8_t* left_data = nullptr; + if (left.length() > 0) { + left_data = left.data()->data() + left.offset() * value_byte_size; + } + + const uint8_t* right_data = nullptr; + if (right.length() > 0) { + right_data = right.data()->data() + right.offset() * value_byte_size; + } + if (left.null_count() > 0) { for (int i = 0; i < left.length(); ++i) { if (!left.IsNull(i) && memcmp(left_data, right_data, value_byte_size)) { return false; @@ -339,7 +356,7 @@ class EqualsVisitor : public 
RangeEqualsVisitor { return true; } else { if (left.length() == 0) { return true; } - return left.data()->Equals(*right.data(), left.length()); + return memcmp(left_data, right_data, value_byte_size * left.length()) == 0; } } @@ -376,13 +393,46 @@ class EqualsVisitor : public RangeEqualsVisitor { Status Visit(const IntervalArray& left) override { return ComparePrimitive(left); } + template <typename ArrayType> + bool ValueOffsetsEqual(const ArrayType& left) { + const auto& right = static_cast<const ArrayType&>(right_); + + if (left.offset() == 0 && right.offset() == 0) { + return left.value_offsets()->Equals( + *right.value_offsets(), (left.length() + 1) * sizeof(int32_t)); + } else { + // One of the arrays is sliced; logic is more complicated because the + // value offsets are not both 0-based + auto left_offsets = + reinterpret_cast<const int32_t*>(left.value_offsets()->data()) + left.offset(); + auto right_offsets = + reinterpret_cast<const int32_t*>(right.value_offsets()->data()) + + right.offset(); + + for (int32_t i = 0; i < left.length() + 1; ++i) { + if (left_offsets[i] - left_offsets[0] != right_offsets[i] - right_offsets[0]) { + return false; + } + } + return true; + } + } + bool CompareBinary(const BinaryArray& left) { const auto& right = static_cast<const BinaryArray&>(right_); - bool equal_offsets = - left.offsets()->Equals(*right.offsets(), (left.length() + 1) * sizeof(int32_t)); + + bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left); if (!equal_offsets) { return false; } - if (!left.data() && !(right.data())) { return true; } - return left.data()->Equals(*right.data(), left.raw_offsets()[left.length()]); + + if (left.offset() == 0 && right.offset() == 0) { + if (!left.data() && !(right.data())) { return true; } + return left.data()->Equals(*right.data(), left.raw_value_offsets()[left.length()]); + } else { + // Compare the corresponding data range + const int64_t total_bytes = left.value_offset(left.length()) - left.value_offset(0); + return 
std::memcmp(left.data()->data() + left.value_offset(0), + right.data()->data() + right.value_offset(0), total_bytes) == 0; + } } Status Visit(const StringArray& left) override { @@ -397,12 +447,20 @@ class EqualsVisitor : public RangeEqualsVisitor { Status Visit(const ListArray& left) override { const auto& right = static_cast<const ListArray&>(right_); - if (!left.offsets()->Equals( - *right.offsets(), (left.length() + 1) * sizeof(int32_t))) { + bool equal_offsets = ValueOffsetsEqual<ListArray>(left); + if (!equal_offsets) { result_ = false; - } else { + return Status::OK(); + } + + if (left.offset() == 0 && right.offset() == 0) { result_ = left.values()->Equals(right.values()); + } else { + // One of the arrays is sliced + result_ = left.values()->RangeEquals(left.value_offset(0), + left.value_offset(left.length()), right.value_offset(0), right.values()); } + return Status::OK(); } @@ -422,8 +480,8 @@ inline bool FloatingApproxEquals( const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) { using T = typename TYPE::c_type; - auto left_data = reinterpret_cast<const T*>(left.data()->data()); - auto right_data = reinterpret_cast<const T*>(right.data()->data()); + const T* left_data = left.raw_data(); + const T* right_data = right.raw_data(); static constexpr T EPSILON = 1E-5; @@ -465,8 +523,8 @@ static bool BaseDataEquals(const Array& left, const Array& right) { return false; } if (left.null_count() > 0) { - return left.null_bitmap()->Equals( - *right.null_bitmap(), BitUtil::BytesForBits(left.length())); + return BitmapEquals(left.null_bitmap()->data(), left.offset(), + right.null_bitmap()->data(), right.offset(), left.length()); } return true; }
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index ff58e53..c1f0ea4 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -401,8 +401,8 @@ class ReadableFile::ReadableFileImpl : public OSFile { Status Open(const std::string& path) { return OpenReadable(path); } Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) { - auto buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(nbytes)); + std::shared_ptr<ResizableBuffer> buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 2845b0d..5682f44 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -125,8 +125,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) { - auto buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(nbytes)); + std::shared_ptr<ResizableBuffer> buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); int64_t bytes_read = 0; RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); @@ -152,8 +152,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { - auto buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(nbytes)); + std::shared_ptr<ResizableBuffer> buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); int64_t bytes_read = 0; 
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index 72e0ba8..f0e5a28 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -336,8 +336,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) { std::shared_ptr<HdfsReadableFile> file; ASSERT_OK(this->client_->OpenReadable(path, &file)); - auto buffer = std::make_shared<PoolBuffer>(); - ASSERT_OK(buffer->Resize(size)); + std::shared_ptr<MutableBuffer> buffer; + ASSERT_OK(AllocateBuffer(nullptr, size, &buffer)); + int64_t bytes_read = 0; ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data())); @@ -348,8 +349,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) { std::shared_ptr<HdfsReadableFile> file2; ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2)); - auto buffer2 = std::make_shared<PoolBuffer>(); - ASSERT_OK(buffer2->Resize(size)); + std::shared_ptr<MutableBuffer> buffer2; + ASSERT_OK(AllocateBuffer(nullptr, size, &buffer2)); + ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data())); ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size)); ASSERT_EQ(size, bytes_read); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-memory-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc index c0b0165..442cd0c 100644 --- a/cpp/src/arrow/io/io-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -73,8 +73,8 @@ TEST(TestBufferReader, RetainParentReference) { std::shared_ptr<Buffer> slice1; std::shared_ptr<Buffer> slice2; { - auto buffer = std::make_shared<PoolBuffer>(); - ASSERT_OK(buffer->Resize(static_cast<int64_t>(data.size()))); + std::shared_ptr<MutableBuffer> buffer; + 
ASSERT_OK(AllocateBuffer(nullptr, static_cast<int64_t>(data.size()), &buffer)); std::memcpy(buffer->mutable_data(), data.c_str(), data.size()); BufferReader reader(buffer); ASSERT_OK(reader.Read(4, &slice1)); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index c8e631c..3613ccb 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -17,6 +17,7 @@ #include "arrow/ipc/adapter.h" +#include <algorithm> #include <cstdint> #include <cstring> #include <sstream> @@ -30,6 +31,7 @@ #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" +#include "arrow/memory_pool.h" #include "arrow/schema.h" #include "arrow/status.h" #include "arrow/table.h" @@ -49,9 +51,10 @@ namespace ipc { class RecordBatchWriter : public ArrayVisitor { public: - RecordBatchWriter( - const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth) - : batch_(batch), + RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch, + int64_t buffer_start_offset, int max_recursion_depth) + : pool_(pool), + batch_(batch), max_recursion_depth_(max_recursion_depth), buffer_start_offset_(buffer_start_offset) {} @@ -62,7 +65,15 @@ class RecordBatchWriter : public ArrayVisitor { // push back all common elements field_nodes_.push_back(flatbuf::FieldNode(arr.length(), arr.null_count())); if (arr.null_count() > 0) { - buffers_.push_back(arr.null_bitmap()); + std::shared_ptr<Buffer> bitmap = arr.null_bitmap(); + + if (arr.offset() != 0) { + // With a sliced array / non-zero offset, we must copy the bitmap + RETURN_NOT_OK( + CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap)); + } + + buffers_.push_back(bitmap); } else { // Push a dummy zero-length buffer, not to be copied buffers_.push_back(std::make_shared<Buffer>(nullptr, 0)); @@ 
-208,50 +219,136 @@ class RecordBatchWriter : public ArrayVisitor { private: Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); } - Status VisitPrimitive(const PrimitiveArray& array) { - buffers_.push_back(array.data()); + template <typename ArrayType> + Status VisitFixedWidth(const ArrayType& array) { + std::shared_ptr<Buffer> data_buffer = array.data(); + + if (array.offset() != 0) { + // Non-zero offset, slice the buffer + const auto& fw_type = static_cast<const FixedWidthType&>(*array.type()); + const int type_width = fw_type.bit_width() / 8; + const int64_t byte_offset = array.offset() * type_width; + + // Send padding if it's available + const int64_t buffer_length = + std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width), + data_buffer->size() - byte_offset); + data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length); + } + buffers_.push_back(data_buffer); + return Status::OK(); + } + + template <typename ArrayType> + Status GetZeroBasedValueOffsets( + const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) { + // Share slicing logic between ListArray and BinaryArray + + auto offsets = array.value_offsets(); + + if (array.offset() != 0) { + // If we have a non-zero offset, then the value offsets do not start at + // zero. 
We must a) create a new offsets array with shifted offsets and + // b) slice the values array accordingly + + std::shared_ptr<MutableBuffer> shifted_offsets; + RETURN_NOT_OK(AllocateBuffer( + pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets)); + + int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data()); + const int32_t start_offset = array.value_offset(0); + + for (int i = 0; i < array.length(); ++i) { + dest_offsets[i] = array.value_offset(i) - start_offset; + } + // Final offset + dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset; + offsets = shifted_offsets; + } + + *value_offsets = offsets; return Status::OK(); } Status VisitBinary(const BinaryArray& array) { - buffers_.push_back(array.offsets()); - buffers_.push_back(array.data()); + std::shared_ptr<Buffer> value_offsets; + RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets)); + auto data = array.data(); + + if (array.offset() != 0) { + // Slice the data buffer to include only the range we need now + data = SliceBuffer(data, array.value_offset(0), array.value_offset(array.length())); + } + + buffers_.push_back(value_offsets); + buffers_.push_back(data); return Status::OK(); } - Status Visit(const BooleanArray& array) override { return VisitPrimitive(array); } + Status Visit(const BooleanArray& array) override { + buffers_.push_back(array.data()); + return Status::OK(); + } - Status Visit(const Int8Array& array) override { return VisitPrimitive(array); } + Status Visit(const Int8Array& array) override { + return VisitFixedWidth<Int8Array>(array); + } - Status Visit(const Int16Array& array) override { return VisitPrimitive(array); } + Status Visit(const Int16Array& array) override { + return VisitFixedWidth<Int16Array>(array); + } - Status Visit(const Int32Array& array) override { return VisitPrimitive(array); } + Status Visit(const Int32Array& array) override { + return VisitFixedWidth<Int32Array>(array); + } - 
Status Visit(const Int64Array& array) override { return VisitPrimitive(array); } + Status Visit(const Int64Array& array) override { + return VisitFixedWidth<Int64Array>(array); + } - Status Visit(const UInt8Array& array) override { return VisitPrimitive(array); } + Status Visit(const UInt8Array& array) override { + return VisitFixedWidth<UInt8Array>(array); + } - Status Visit(const UInt16Array& array) override { return VisitPrimitive(array); } + Status Visit(const UInt16Array& array) override { + return VisitFixedWidth<UInt16Array>(array); + } - Status Visit(const UInt32Array& array) override { return VisitPrimitive(array); } + Status Visit(const UInt32Array& array) override { + return VisitFixedWidth<UInt32Array>(array); + } - Status Visit(const UInt64Array& array) override { return VisitPrimitive(array); } + Status Visit(const UInt64Array& array) override { + return VisitFixedWidth<UInt64Array>(array); + } - Status Visit(const HalfFloatArray& array) override { return VisitPrimitive(array); } + Status Visit(const HalfFloatArray& array) override { + return VisitFixedWidth<HalfFloatArray>(array); + } - Status Visit(const FloatArray& array) override { return VisitPrimitive(array); } + Status Visit(const FloatArray& array) override { + return VisitFixedWidth<FloatArray>(array); + } - Status Visit(const DoubleArray& array) override { return VisitPrimitive(array); } + Status Visit(const DoubleArray& array) override { + return VisitFixedWidth<DoubleArray>(array); + } Status Visit(const StringArray& array) override { return VisitBinary(array); } Status Visit(const BinaryArray& array) override { return VisitBinary(array); } - Status Visit(const DateArray& array) override { return VisitPrimitive(array); } + Status Visit(const DateArray& array) override { + return VisitFixedWidth<DateArray>(array); + } - Status Visit(const TimeArray& array) override { return VisitPrimitive(array); } + Status Visit(const TimeArray& array) override { + return VisitFixedWidth<TimeArray>(array); 
+ } - Status Visit(const TimestampArray& array) override { return VisitPrimitive(array); } + Status Visit(const TimestampArray& array) override { + return VisitFixedWidth<TimestampArray>(array); + } Status Visit(const IntervalArray& array) override { return Status::NotImplemented("interval"); @@ -262,30 +359,109 @@ class RecordBatchWriter : public ArrayVisitor { } Status Visit(const ListArray& array) override { - buffers_.push_back(array.offsets()); + std::shared_ptr<Buffer> value_offsets; + RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets)); + buffers_.push_back(value_offsets); + --max_recursion_depth_; - RETURN_NOT_OK(VisitArray(*array.values().get())); + std::shared_ptr<Array> values = array.values(); + + if (array.offset() != 0) { + // For non-zero offset, we slice the values array accordingly + const int32_t offset = array.value_offset(0); + const int32_t length = array.value_offset(array.length()) - offset; + values = values->Slice(offset, length); + } + RETURN_NOT_OK(VisitArray(*values)); ++max_recursion_depth_; return Status::OK(); } Status Visit(const StructArray& array) override { --max_recursion_depth_; - for (const auto& field : array.fields()) { - RETURN_NOT_OK(VisitArray(*field.get())); + for (std::shared_ptr<Array> field : array.fields()) { + if (array.offset() != 0) { + // If offset is non-zero, slice the child array + field = field->Slice(array.offset(), array.length()); + } + RETURN_NOT_OK(VisitArray(*field)); } ++max_recursion_depth_; return Status::OK(); } Status Visit(const UnionArray& array) override { - buffers_.push_back(array.type_ids()); + auto type_ids = array.type_ids(); + if (array.offset() != 0) { + type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t), + array.length() * sizeof(UnionArray::type_id_t)); + } - if (array.mode() == UnionMode::DENSE) { buffers_.push_back(array.offsets()); } + buffers_.push_back(type_ids); --max_recursion_depth_; - for (const auto& field : array.children()) { 
- RETURN_NOT_OK(VisitArray(*field.get())); + if (array.mode() == UnionMode::DENSE) { + const auto& type = static_cast<const UnionType&>(*array.type()); + auto value_offsets = array.value_offsets(); + + // The Union type codes are not necessary 0-indexed + uint8_t max_code = 0; + for (uint8_t code : type.type_codes) { + if (code > max_code) { max_code = code; } + } + + // Allocate an array of child offsets. Set all to -1 to indicate that we + // haven't observed a first occurrence of a particular child yet + std::vector<int32_t> child_offsets(max_code + 1); + std::vector<int32_t> child_lengths(max_code + 1, 0); + + if (array.offset() != 0) { + // This is an unpleasant case. Because the offsets are different for + // each child array, when we have a sliced array, we need to "rebase" + // the value_offsets for each array + + const int32_t* unshifted_offsets = array.raw_value_offsets(); + const uint8_t* type_ids = array.raw_type_ids(); + + // Allocate the shifted offsets + std::shared_ptr<MutableBuffer> shifted_offsets_buffer; + RETURN_NOT_OK(AllocateBuffer( + pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer)); + int32_t* shifted_offsets = + reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data()); + + for (int32_t i = 0; i < array.length(); ++i) { + const uint8_t code = type_ids[i]; + int32_t shift = child_offsets[code]; + if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; } + shifted_offsets[i] = unshifted_offsets[i] - shift; + + // Update the child length to account for observed value + ++child_lengths[code]; + } + + value_offsets = shifted_offsets_buffer; + } + buffers_.push_back(value_offsets); + + // Visit children and slice accordingly + for (int i = 0; i < type.num_children(); ++i) { + std::shared_ptr<Array> child = array.child(i); + if (array.offset() != 0) { + const uint8_t code = type.type_codes[i]; + child = child->Slice(child_offsets[code], child_lengths[code]); + } + RETURN_NOT_OK(VisitArray(*child)); + } + } 
else { + for (std::shared_ptr<Array> child : array.children()) { + // Sparse union, slicing is simpler + if (array.offset() != 0) { + // If offset is non-zero, slice the child array + child = child->Slice(array.offset(), array.length()); + } + RETURN_NOT_OK(VisitArray(*child)); + } } ++max_recursion_depth_; return Status::OK(); @@ -298,6 +474,8 @@ class RecordBatchWriter : public ArrayVisitor { return Status::OK(); } + // In some cases, intermediate buffers may need to be allocated (with sliced arrays) + MemoryPool* pool_; const RecordBatch& batch_; std::vector<flatbuf::FieldNode> field_nodes_; @@ -310,14 +488,14 @@ class RecordBatchWriter : public ArrayVisitor { Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, - int max_recursion_depth) { + MemoryPool* pool, int max_recursion_depth) { DCHECK_GT(max_recursion_depth, 0); - RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth); + RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth); return serializer.Write(dst, metadata_length, body_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { - RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth); + RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth); RETURN_NOT_OK(serializer.GetTotalSize(size)); return Status::OK(); } @@ -372,7 +550,7 @@ class ArrayLoader : public TypeVisitor { BufferMetadata metadata = context_->metadata->buffer(buffer_index); if (metadata.length == 0) { - *out = std::make_shared<Buffer>(nullptr, 0); + *out = nullptr; return Status::OK(); } else { return file_->ReadAt(metadata.offset, metadata.length, out); @@ -412,8 +590,8 @@ class ArrayLoader : public TypeVisitor { context_->buffer_index++; data.reset(new Buffer(nullptr, 0)); } - return MakePrimitiveArray(field_.type, field_meta.length, data, field_meta.null_count, - null_bitmap, 
&result_); + return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap, + field_meta.null_count, 0, &result_); } template <typename CONTAINER> @@ -428,7 +606,7 @@ class ArrayLoader : public TypeVisitor { RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values)); result_ = std::make_shared<CONTAINER>( - field_meta.length, offsets, values, field_meta.null_count, null_bitmap); + field_meta.length, offsets, values, null_bitmap, field_meta.null_count); return Status::OK(); } @@ -496,7 +674,7 @@ class ArrayLoader : public TypeVisitor { RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array)); result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets, - values_array, field_meta.null_count, null_bitmap); + values_array, null_bitmap, field_meta.null_count); return Status::OK(); } @@ -521,7 +699,7 @@ class ArrayLoader : public TypeVisitor { RETURN_NOT_OK(LoadChildren(type.children(), &fields)); result_ = std::make_shared<StructArray>( - field_.type, field_meta.length, fields, field_meta.null_count, null_bitmap); + field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count); return Status::OK(); } @@ -542,7 +720,7 @@ class ArrayLoader : public TypeVisitor { RETURN_NOT_OK(LoadChildren(type.children(), &fields)); result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields, - type_ids, offsets, field_meta.null_count, null_bitmap); + type_ids, offsets, null_bitmap, field_meta.null_count); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index f9ef7d9..83542d0 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -30,6 +30,7 @@ namespace arrow { class Array; +class MemoryPool; class RecordBatch; class Schema; class Status; @@ -71,14 +72,15 @@ constexpr int kMaxIpcRecursionDepth = 
64; // // @param(out) body_length: the size of the contiguous buffer block plus // padding bytes -ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch, +Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth); + int64_t* body_length, MemoryPool* pool, + int max_recursion_depth = kMaxIpcRecursionDepth); // Compute the precise number of bytes needed in a contiguous memory segment to // write the record batch. This involves generating the complete serialized // Flatbuffers metadata. -ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size); +Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size); // ---------------------------------------------------------------------- // "Read" path; does not copy data if the input supports zero copy reads http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-adapter-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 17868f8..bae6578 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -32,6 +32,7 @@ #include "arrow/buffer.h" #include "arrow/memory_pool.h" +#include "arrow/pretty_print.h" #include "arrow/status.h" #include "arrow/test-util.h" #include "arrow/util/bit-util.h" @@ -56,7 +57,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>, const int64_t buffer_offset = 0; RETURN_NOT_OK(WriteRecordBatch( - batch, buffer_offset, mmap_.get(), &metadata_length, &body_length)); + batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); std::shared_ptr<RecordBatchMetadata> metadata; RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata)); @@ -92,17 +93,49 @@ 
TEST_P(TestWriteRecordBatch, RoundTrip) { } } -INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, - ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, - &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, - &MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion)); +TEST_P(TestWriteRecordBatch, SliceRoundTrip) { + std::shared_ptr<RecordBatch> batch; + ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue + std::shared_ptr<RecordBatch> batch_result; + + // Skip the zero-length case + if (batch->num_rows() < 2) { return; } + + auto sliced_batch = batch->Slice(2, 10); + + ASSERT_OK(RoundTripHelper(*sliced_batch, 1 << 16, &batch_result)); + + EXPECT_EQ(sliced_batch->num_rows(), batch_result->num_rows()); + + for (int i = 0; i < sliced_batch->num_columns(); ++i) { + const auto& left = *sliced_batch->column(i); + const auto& right = *batch_result->column(i); + if (!left.Equals(right)) { + std::stringstream pp_result; + std::stringstream pp_expected; + + ASSERT_OK(PrettyPrint(left, 0, &pp_expected)); + ASSERT_OK(PrettyPrint(right, 0, &pp_result)); + + FAIL() << "Index: " << i << " Expected: " << pp_expected.str() + << "\nGot: " << pp_result.str(); + } + } +} + +INSTANTIATE_TEST_CASE_P( + RoundTripTests, TestWriteRecordBatch, + ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch, + &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch, + &MakeDeeplyNestedList, &MakeStruct, &MakeUnion)); void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) { ipc::MockOutputStream mock; int32_t mock_metadata_length = -1; int64_t mock_body_length = -1; int64_t size = -1; - ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length)); + ASSERT_OK(WriteRecordBatch( + *batch, 0, &mock, &mock_metadata_length, &mock_body_length, default_memory_pool())); ASSERT_OK(GetRecordBatchSize(*batch, &size)); ASSERT_EQ(mock.GetExtentBytesWritten(), size); } @@ -156,10 +189,11 @@ 
class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); if (override_level) { - return WriteRecordBatch( - *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1); + return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length, pool_, + recursion_level + 1); } else { - return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length); + return WriteRecordBatch( + *batch, 0, mmap_.get(), metadata_length, body_length, pool_); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index 30f968c..3e759cc 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -80,7 +80,7 @@ template <typename T, typename ValueType> void CheckPrimitive(const std::shared_ptr<DataType>& type, const std::vector<bool>& is_valid, const std::vector<ValueType>& values) { MemoryPool* pool = default_memory_pool(); - typename TypeTraits<T>::BuilderType builder(pool, type); + typename TypeTraits<T>::BuilderType builder(pool); for (size_t i = 0; i < values.size(); ++i) { if (is_valid[i]) { @@ -146,12 +146,11 @@ TEST(TestJsonArrayWriter, NestedTypes) { std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6}; std::shared_ptr<Array> values_array; - ArrayFromVector<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array); + ArrayFromVector<Int32Type, int32_t>(values_is_valid, values, &values_array); std::vector<int16_t> i16_values = {0, 1, 2, 3, 4, 5, 6}; std::shared_ptr<Array> i16_values_array; - ArrayFromVector<Int16Type, int16_t>( - int16(), values_is_valid, i16_values, &i16_values_array); + ArrayFromVector<Int16Type, int16_t>(values_is_valid, i16_values, &i16_values_array); // List std::vector<bool> list_is_valid = {true, 
false, true, true, true}; @@ -161,7 +160,7 @@ TEST(TestJsonArrayWriter, NestedTypes) { ASSERT_OK(test::GetBitmapFromBoolVector(list_is_valid, &list_bitmap)); std::shared_ptr<Buffer> offsets_buffer = test::GetBufferFromVector(offsets); - ListArray list_array(list(value_type), 5, offsets_buffer, values_array, 1, list_bitmap); + ListArray list_array(list(value_type), 5, offsets_buffer, values_array, list_bitmap, 1); TestArrayRoundTrip(list_array); @@ -175,7 +174,7 @@ TEST(TestJsonArrayWriter, NestedTypes) { std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array}; StructArray struct_array( - struct_type, static_cast<int>(struct_is_valid.size()), fields, 2, struct_bitmap); + struct_type, static_cast<int>(struct_is_valid.size()), fields, struct_bitmap, 2); TestArrayRoundTrip(struct_array); } @@ -202,15 +201,15 @@ void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows, test::randint<int32_t>(num_rows, 0, 100, &v2_values); std::shared_ptr<Array> v1; - ArrayFromVector<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1); + ArrayFromVector<Int8Type, int8_t>(is_valid, v1_values, &v1); std::shared_ptr<Array> v2; - ArrayFromVector<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2); + ArrayFromVector<Int32Type, int32_t>(is_valid, v2_values, &v2); static const int kBufferSize = 10; static uint8_t buffer[kBufferSize]; static uint32_t seed = 0; - StringBuilder string_builder(default_memory_pool(), utf8()); + StringBuilder string_builder(default_memory_pool()); for (int i = 0; i < num_rows; ++i) { if (!is_valid[i]) { string_builder.AppendNull(); @@ -338,13 +337,13 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) { std::vector<bool> foo_valid = {true, false, true, true, true}; std::vector<int32_t> foo_values = {1, 2, 3, 4, 5}; std::shared_ptr<Array> foo; - ArrayFromVector<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo); + ArrayFromVector<Int32Type, int32_t>(foo_valid, foo_values, 
&foo); ASSERT_TRUE(batch->column(0)->Equals(foo)); std::vector<bool> bar_valid = {true, false, false, true, true}; std::vector<double> bar_values = {1, 2, 3, 4, 5}; std::shared_ptr<Array> bar; - ArrayFromVector<DoubleType, double>(float64(), bar_valid, bar_values, &bar); + ArrayFromVector<DoubleType, double>(bar_valid, bar_values, &bar); ASSERT_TRUE(batch->column(1)->Equals(bar)); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 95bc742..17ccc4a 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -144,10 +144,8 @@ static Status ValidateArrowVsJson( if (!json_schema->Equals(arrow_schema)) { std::stringstream ss; - ss << "JSON schema: \n" - << json_schema->ToString() << "\n" - << "Arrow schema: \n" - << arrow_schema->ToString(); + ss << "JSON schema: \n" << json_schema->ToString() << "\n" + << "Arrow schema: \n" << arrow_schema->ToString(); if (FLAGS_verbose) { std::cout << ss.str() << std::endl; } return Status::Invalid("Schemas did not match"); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index 43bd8a4..1a95b2c 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -199,8 +199,8 @@ class JsonSchemaWriter : public TypeVisitor { // Write type ids writer_->Key("typeIds"); writer_->StartArray(); - for (size_t i = 0; i < type.type_ids.size(); ++i) { - writer_->Uint(type.type_ids[i]); + for (size_t i = 0; i < type.type_codes.size(); ++i) { + writer_->Uint(type.type_codes[i]); } writer_->EndArray(); } @@ -464,7 +464,7 @@ class JsonArrayWriter : public 
ArrayVisitor { template <typename T> Status WriteVarBytes(const T& array) { WriteValidityField(array); - WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1); + WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1); WriteDataField(array); SetNoChildren(); return Status::OK(); @@ -532,7 +532,7 @@ class JsonArrayWriter : public ArrayVisitor { Status Visit(const ListArray& array) override { WriteValidityField(array); - WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1); + WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1); auto type = static_cast<const ListType*>(array.type().get()); return WriteChildren(type->children(), {array.values()}); } @@ -549,7 +549,7 @@ class JsonArrayWriter : public ArrayVisitor { WriteIntegerField("TYPE_ID", array.raw_type_ids(), array.length()); if (type->mode == UnionMode::DENSE) { - WriteIntegerField("OFFSET", array.raw_offsets(), array.length()); + WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length()); } return WriteChildren(type->children(), array.children()); } @@ -718,17 +718,17 @@ class JsonSchemaReader { return Status::Invalid(ss.str()); } - const auto& json_type_ids = json_type.FindMember("typeIds"); - RETURN_NOT_ARRAY("typeIds", json_type_ids, json_type); + const auto& json_type_codes = json_type.FindMember("typeIds"); + RETURN_NOT_ARRAY("typeIds", json_type_codes, json_type); - std::vector<uint8_t> type_ids; - const auto& id_array = json_type_ids->value.GetArray(); + std::vector<uint8_t> type_codes; + const auto& id_array = json_type_codes->value.GetArray(); for (const rj::Value& val : id_array) { DCHECK(val.IsUint()); - type_ids.push_back(val.GetUint()); + type_codes.push_back(val.GetUint()); } - *type = union_(children, type_ids, mode); + *type = union_(children, type_codes, mode); return Status::OK(); } @@ -844,7 +844,7 @@ class JsonArrayReader { typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type 
ReadArray( const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { - typename TypeTraits<T>::BuilderType builder(pool_, type); + typename TypeTraits<T>::BuilderType builder(pool_); const auto& json_data = json_array.FindMember("DATA"); RETURN_NOT_ARRAY("DATA", json_data, json_array); @@ -869,8 +869,9 @@ class JsonArrayReader { template <typename T> Status GetIntArray( const RjArray& json_array, const int32_t length, std::shared_ptr<Buffer>* out) { - auto buffer = std::make_shared<PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(length * sizeof(T))); + std::shared_ptr<MutableBuffer> buffer; + RETURN_NOT_OK(AllocateBuffer(pool_, length * sizeof(T), &buffer)); + T* values = reinterpret_cast<T*>(buffer->mutable_data()); for (int i = 0; i < length; ++i) { const rj::Value& val = json_array[i]; @@ -901,7 +902,7 @@ class JsonArrayReader { DCHECK_EQ(children.size(), 1); *array = std::make_shared<ListArray>( - type, length, offsets_buffer, children[0], null_count, validity_buffer); + type, length, offsets_buffer, children[0], validity_buffer, null_count); return Status::OK(); } @@ -918,7 +919,7 @@ class JsonArrayReader { RETURN_NOT_OK(GetChildren(json_array, type, &fields)); *array = - std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer); + std::make_shared<StructArray>(type, length, fields, validity_buffer, null_count); return Status::OK(); } @@ -953,7 +954,7 @@ class JsonArrayReader { RETURN_NOT_OK(GetChildren(json_array, type, &children)); *array = std::make_shared<UnionArray>(type, length, children, type_id_buffer, - offsets_buffer, null_count, validity_buffer); + offsets_buffer, validity_buffer, null_count); return Status::OK(); } @@ -962,7 +963,7 @@ class JsonArrayReader { typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray( const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid, const 
std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { - *array = std::make_shared<NullArray>(type, length); + *array = std::make_shared<NullArray>(length); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc index c9057e8..72eb134 100644 --- a/cpp/src/arrow/ipc/stream.cc +++ b/cpp/src/arrow/ipc/stream.cc @@ -28,6 +28,7 @@ #include "arrow/ipc/adapter.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" +#include "arrow/memory_pool.h" #include "arrow/schema.h" #include "arrow/status.h" #include "arrow/util/logging.h" @@ -41,7 +42,11 @@ namespace ipc { StreamWriter::~StreamWriter() {} StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) - : sink_(sink), schema_(schema), position_(-1), started_(false) {} + : sink_(sink), + schema_(schema), + pool_(default_memory_pool()), + position_(-1), + started_(false) {} Status StreamWriter::UpdatePosition() { return sink_->Tell(&position_); @@ -76,8 +81,8 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block // Frame of reference in file format is 0, see ARROW-384 const int64_t buffer_start_offset = 0; - RETURN_NOT_OK(arrow::ipc::WriteRecordBatch( - batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length)); + RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_, + &block->metadata_length, &block->body_length, pool_)); RETURN_NOT_OK(UpdatePosition()); DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes"; @@ -85,6 +90,10 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block return Status::OK(); } +void StreamWriter::set_memory_pool(MemoryPool* pool) { + pool_ = pool; +} + // ---------------------------------------------------------------------- 
// StreamWriter implementation http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h index 53f51dc..12414fa 100644 --- a/cpp/src/arrow/ipc/stream.h +++ b/cpp/src/arrow/ipc/stream.h @@ -30,6 +30,7 @@ namespace arrow { class Array; class Buffer; struct Field; +class MemoryPool; class RecordBatch; class Schema; class Status; @@ -59,6 +60,10 @@ class ARROW_EXPORT StreamWriter { /// closing the actual OutputStream virtual Status Close(); + // In some cases, writing may require memory allocation. We use the default + // memory pool, but provide the option to override + void set_memory_pool(MemoryPool* pool); + protected: StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema); @@ -81,6 +86,9 @@ class ARROW_EXPORT StreamWriter { io::OutputStream* sink_; std::shared_ptr<Schema> schema_; + + MemoryPool* pool_; + int64_t position_; bool started_; }; http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index ca790de..b4930c4 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -21,6 +21,7 @@ #include <algorithm> #include <cstdint> #include <memory> +#include <numeric> #include <string> #include <vector> @@ -28,6 +29,7 @@ #include "arrow/buffer.h" #include "arrow/builder.h" #include "arrow/memory_pool.h" +#include "arrow/status.h" #include "arrow/table.h" #include "arrow/test-util.h" #include "arrow/type.h" @@ -104,8 +106,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) { const int length = 1000; // Make the schema - auto f0 = std::make_shared<Field>("f0", int32()); - auto f1 = std::make_shared<Field>("f1", int32()); + auto f0 = field("f0", int32()); + 
auto f1 = field("f1", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1})); // Example data @@ -119,10 +121,10 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) { template <class Builder, class RawType> Status MakeRandomBinaryArray( - const TypePtr& type, int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) { + int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) { const std::vector<std::string> values = { "", "", "abc", "123", "efg", "456!@#!@#", "12312"}; - Builder builder(pool, type); + Builder builder(pool); const auto values_len = values.size(); for (int32_t i = 0; i < length; ++i) { int values_index = i % values_len; @@ -141,22 +143,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) { const int32_t length = 500; auto string_type = utf8(); auto binary_type = binary(); - auto f0 = std::make_shared<Field>("f0", string_type); - auto f1 = std::make_shared<Field>("f1", binary_type); + auto f0 = field("f0", string_type); + auto f1 = field("f1", binary_type); std::shared_ptr<Schema> schema(new Schema({f0, f1})); std::shared_ptr<Array> a0, a1; MemoryPool* pool = default_memory_pool(); + // Quirk with RETURN_NOT_OK macro and templated functions { - auto status = - MakeRandomBinaryArray<StringBuilder, char>(string_type, length, pool, &a0); - RETURN_NOT_OK(status); + auto s = MakeRandomBinaryArray<StringBuilder, char>(length, pool, &a0); + RETURN_NOT_OK(s); } + { - auto status = - MakeRandomBinaryArray<BinaryBuilder, uint8_t>(binary_type, length, pool, &a1); - RETURN_NOT_OK(status); + auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, pool, &a1); + RETURN_NOT_OK(s); } out->reset(new RecordBatch(schema, length, {a0, a1})); return Status::OK(); @@ -164,9 +166,9 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) { Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema - auto f0 = std::make_shared<Field>("f0", kListInt32); - auto f1 = 
std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", int32()); + auto f0 = field("f0", kListInt32); + auto f1 = field("f1", kListListInt32); + auto f2 = field("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data @@ -187,14 +189,13 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) { Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema - auto f0 = std::make_shared<Field>("f0", kListInt32); - auto f1 = std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", int32()); + auto f0 = field("f0", kListInt32); + auto f1 = field("f1", kListListInt32); + auto f2 = field("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data MemoryPool* pool = default_memory_pool(); - const int length = 200; const bool include_nulls = true; std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array; RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values)); @@ -202,15 +203,15 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK( MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); - out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array})); + out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array})); return Status::OK(); } Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) { // Make the schema - auto f0 = std::make_shared<Field>("f0", kListInt32); - auto f1 = std::make_shared<Field>("f1", kListListInt32); - auto f2 = std::make_shared<Field>("f2", int32()); + auto f0 = field("f0", kListInt32); + auto f1 = field("f1", kListListInt32); + auto f2 = field("f2", int32()); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); // Example data @@ -242,7 +243,7 @@ Status 
MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array)); } - auto f0 = std::make_shared<Field>("f0", type); + auto f0 = field("f0", type); std::shared_ptr<Schema> schema(new Schema({f0})); std::vector<std::shared_ptr<Array>> arrays = {array}; out->reset(new RecordBatch(schema, batch_length, arrays)); @@ -260,8 +261,8 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) { // Define schema std::shared_ptr<DataType> type(new StructType( {list_schema->field(0), list_schema->field(1), list_schema->field(2)})); - auto f0 = std::make_shared<Field>("non_null_struct", type); - auto f1 = std::make_shared<Field>("null_struct", type); + auto f0 = field("non_null_struct", type); + auto f1 = field("null_struct", type); std::shared_ptr<Schema> schema(new Schema({f0, f1})); // construct individual nullable/non-nullable struct arrays @@ -271,7 +272,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) { std::shared_ptr<Buffer> null_bitmask; RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask)); std::shared_ptr<Array> with_nulls( - new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask)); + new StructArray(type, list_batch->num_rows(), columns, null_bitmask, 1)); // construct batch std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls}; @@ -282,7 +283,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) { Status MakeUnion(std::shared_ptr<RecordBatch>* out) { // Define schema std::vector<std::shared_ptr<Field>> union_types( - {std::make_shared<Field>("u0", int32()), std::make_shared<Field>("u1", uint8())}); + {field("u0", int32()), field("u1", uint8())}); std::vector<uint8_t> type_codes = {5, 10}; auto sparse_type = @@ -291,9 +292,9 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { auto dense_type = std::make_shared<UnionType>(union_types, type_codes, UnionMode::DENSE); - auto f0 = std::make_shared<Field>("sparse_nonnull", sparse_type, 
false); - auto f1 = std::make_shared<Field>("sparse", sparse_type); - auto f2 = std::make_shared<Field>("dense", dense_type); + auto f0 = field("sparse_nonnull", sparse_type, false); + auto f1 = field("sparse", sparse_type); + auto f2 = field("dense", dense_type); std::shared_ptr<Schema> schema(new Schema({f0, f1, f2})); @@ -308,21 +309,17 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer)); std::vector<int32_t> u0_values = {0, 1, 2, 3, 4, 5, 6}; - ArrayFromVector<Int32Type, int32_t>( - sparse_type->child(0)->type, u0_values, &sparse_children[0]); + ArrayFromVector<Int32Type, int32_t>(u0_values, &sparse_children[0]); std::vector<uint8_t> u1_values = {10, 11, 12, 13, 14, 15, 16}; - ArrayFromVector<UInt8Type, uint8_t>( - sparse_type->child(1)->type, u1_values, &sparse_children[1]); + ArrayFromVector<UInt8Type, uint8_t>(u1_values, &sparse_children[1]); // dense children u0_values = {0, 2, 3, 7}; - ArrayFromVector<Int32Type, int32_t>( - dense_type->child(0)->type, u0_values, &dense_children[0]); + ArrayFromVector<Int32Type, int32_t>(u0_values, &dense_children[0]); u1_values = {11, 14, 15}; - ArrayFromVector<UInt8Type, uint8_t>( - dense_type->child(1)->type, u1_values, &dense_children[1]); + ArrayFromVector<UInt8Type, uint8_t>(u1_values, &dense_children[1]); std::shared_ptr<Buffer> offsets_buffer; std::vector<int32_t> offsets = {0, 0, 1, 2, 1, 2, 3}; @@ -337,10 +334,10 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) { auto sparse_no_nulls = std::make_shared<UnionArray>(sparse_type, length, sparse_children, type_ids_buffer); auto sparse = std::make_shared<UnionArray>( - sparse_type, length, sparse_children, type_ids_buffer, nullptr, 1, null_bitmask); + sparse_type, length, sparse_children, type_ids_buffer, nullptr, null_bitmask, 1); auto dense = std::make_shared<UnionArray>(dense_type, length, dense_children, - type_ids_buffer, offsets_buffer, 1, null_bitmask); + type_ids_buffer, 
offsets_buffer, null_bitmask, 1); // construct batch std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense}; http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc index 4725d5d..aca650f 100644 --- a/cpp/src/arrow/pretty_print-test.cc +++ b/cpp/src/arrow/pretty_print-test.cc @@ -55,7 +55,7 @@ template <typename TYPE, typename C_TYPE> void CheckPrimitive(int indent, const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values, const char* expected) { std::shared_ptr<Array> array; - ArrayFromVector<TYPE, C_TYPE>(std::make_shared<TYPE>(), is_valid, values, &array); + ArrayFromVector<TYPE, C_TYPE>(is_valid, values, &array); CheckArray(*array.get(), indent, expected); } @@ -76,12 +76,12 @@ TEST_F(TestPrettyPrint, DictionaryType) { std::shared_ptr<Array> dict; std::vector<std::string> dict_values = {"foo", "bar", "baz"}; - ArrayFromVector<StringType, std::string>(utf8(), dict_values, &dict); + ArrayFromVector<StringType, std::string>(dict_values, &dict); std::shared_ptr<DataType> dict_type = dictionary(int16(), dict); std::shared_ptr<Array> indices; std::vector<int16_t> indices_values = {1, 2, -1, 0, 2, 0}; - ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices_values, &indices); + ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices); auto arr = std::make_shared<DictionaryArray>(dict_type, indices); static const char* expected = R"expected( http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index e30f4cc..23c0580 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -164,39 +164,56 @@ class ArrayPrinter : public 
ArrayVisitor { Status WriteValidityBitmap(const Array& array) { Newline(); Write("-- is_valid: "); - BooleanArray is_valid(array.length(), array.null_bitmap()); - return PrettyPrint(is_valid, indent_ + 2, sink_); + + if (array.null_count() > 0) { + BooleanArray is_valid( + array.length(), array.null_bitmap(), nullptr, 0, array.offset()); + return PrettyPrint(is_valid, indent_ + 2, sink_); + } else { + Write("all not null"); + return Status::OK(); + } } Status Visit(const ListArray& array) override { RETURN_NOT_OK(WriteValidityBitmap(array)); Newline(); - Write("-- offsets: "); - Int32Array offsets(array.length() + 1, array.offsets()); - RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_)); + Write("-- value_offsets: "); + Int32Array value_offsets( + array.length() + 1, array.value_offsets(), nullptr, 0, array.offset()); + RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_)); Newline(); Write("-- values: "); - RETURN_NOT_OK(PrettyPrint(*array.values().get(), indent_ + 2, sink_)); + auto values = array.values(); + if (array.offset() != 0) { + values = values->Slice(array.value_offset(0), array.value_offset(array.length()) - array.value_offset(0)); // Slice takes (offset, length), so pass end - begin + } + RETURN_NOT_OK(PrettyPrint(*values, indent_ + 2, sink_)); return Status::OK(); } - Status PrintChildren(const std::vector<std::shared_ptr<Array>>& fields) { + Status PrintChildren( + const std::vector<std::shared_ptr<Array>>& fields, int32_t offset, int32_t length) { for (size_t i = 0; i < fields.size(); ++i) { Newline(); std::stringstream ss; ss << "-- child " << i << " type: " << fields[i]->type()->ToString() << " values: "; Write(ss.str()); - RETURN_NOT_OK(PrettyPrint(*fields[i].get(), indent_ + 2, sink_)); + + std::shared_ptr<Array> field = fields[i]; + if (offset != 0) { field = field->Slice(offset, length); } + + RETURN_NOT_OK(PrettyPrint(*field, indent_ + 2, sink_)); } return Status::OK(); } Status Visit(const StructArray& array) override { RETURN_NOT_OK(WriteValidityBitmap(array)); - return PrintChildren(array.fields()); +
return PrintChildren(array.fields(), array.offset(), array.length()); } Status Visit(const UnionArray& array) override { @@ -204,17 +221,19 @@ class ArrayPrinter : public ArrayVisitor { Newline(); Write("-- type_ids: "); - UInt8Array type_ids(array.length(), array.type_ids()); + UInt8Array type_ids(array.length(), array.type_ids(), nullptr, 0, array.offset()); RETURN_NOT_OK(PrettyPrint(type_ids, indent_ + 2, sink_)); if (array.mode() == UnionMode::DENSE) { Newline(); - Write("-- offsets: "); - Int32Array offsets(array.length(), array.offsets()); - RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_)); + Write("-- value_offsets: "); + Int32Array value_offsets( + array.length(), array.value_offsets(), nullptr, 0, array.offset()); + RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_)); } - return PrintChildren(array.children()); + // Print the children without any offset, because the type ids are absolute + return PrintChildren(array.children(), 0, array.length() + array.offset()); } Status Visit(const DictionaryArray& array) override { @@ -222,11 +241,11 @@ class ArrayPrinter : public ArrayVisitor { Newline(); Write("-- dictionary: "); - RETURN_NOT_OK(PrettyPrint(*array.dictionary().get(), indent_ + 2, sink_)); + RETURN_NOT_OK(PrettyPrint(*array.dictionary(), indent_ + 2, sink_)); Newline(); Write("-- indices: "); - return PrettyPrint(*array.indices().get(), indent_ + 2, sink_); + return PrettyPrint(*array.indices(), indent_ + 2, sink_); } void Write(const char* data) { (*sink_) << data; } @@ -260,7 +279,7 @@ Status PrettyPrint(const RecordBatch& batch, int indent, std::ostream* sink) { for (int i = 0; i < batch.num_columns(); ++i) { const std::string& name = batch.column_name(i); (*sink) << name << ": "; - RETURN_NOT_OK(PrettyPrint(*batch.column(i).get(), indent + 2, sink)); + RETURN_NOT_OK(PrettyPrint(*batch.column(i), indent + 2, sink)); (*sink) << "\n"; } return Status::OK(); 
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 67c9f67..e7c5d66 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -242,4 +242,30 @@ TEST_F(TestRecordBatch, Equals) { ASSERT_FALSE(b1.Equals(b4)); } +TEST_F(TestRecordBatch, Slice) { + const int length = 10; + + auto f0 = std::make_shared<Field>("f0", int32()); + auto f1 = std::make_shared<Field>("f1", uint8()); + + vector<shared_ptr<Field>> fields = {f0, f1}; + auto schema = std::make_shared<Schema>(fields); + + auto a0 = MakePrimitive<Int32Array>(length); + auto a1 = MakePrimitive<UInt8Array>(length); + + RecordBatch batch(schema, length, {a0, a1}); + + auto batch_slice = batch.Slice(2); + auto batch_slice2 = batch.Slice(1, 5); + + for (int i = 0; i < batch.num_columns(); ++i) { + ASSERT_EQ(2, batch_slice->column(i)->offset()); + ASSERT_EQ(length - 2, batch_slice->column(i)->length()); + + ASSERT_EQ(1, batch_slice2->column(i)->offset()); + ASSERT_EQ(5, batch_slice2->column(i)->length()); + } +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index b3563ea..9e31ba5 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -60,6 +60,19 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other) const { return true; } +std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset) { + return Slice(offset, this->num_rows() - offset); +} + +std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset, int32_t length) { + std::vector<std::shared_ptr<Array>> arrays; + arrays.reserve(num_columns()); + for (const auto& field : columns_) { + arrays.emplace_back(field->Slice(offset, length)); + } + return
std::make_shared<RecordBatch>(schema_, length < num_rows_ - offset ? length : num_rows_ - offset, arrays); // row count of the slice, clamped to the rows left after offset (not the parent's num_rows_) +} + // ---------------------------------------------------------------------- // Table methods @@ -93,8 +106,7 @@ Status Table::FromRecordBatches(const std::string& name, if (!batches[i]->schema()->Equals(schema)) { std::stringstream ss; ss << "Schema at index " << static_cast<int>(i) << " was different: \n" - << schema->ToString() << "\nvs\n" - << batches[i]->schema()->ToString(); + << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString(); return Status::Invalid(ss.str()); } } @@ -126,8 +138,7 @@ Status ConcatenateTables(const std::string& output_name, if (!tables[i]->schema()->Equals(schema)) { std::stringstream ss; ss << "Schema at index " << static_cast<int>(i) << " was different: \n" - << schema->ToString() << "\nvs\n" - << tables[i]->schema()->ToString(); + << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString(); return Status::Invalid(ss.str()); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 583847c..fa56824 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -64,6 +64,10 @@ class ARROW_EXPORT RecordBatch { // @returns: the number of rows (the corresponding length of each column) int32_t num_rows() const { return num_rows_; } + /// Slice each of the arrays in the record batch and construct a new RecordBatch object + std::shared_ptr<RecordBatch> Slice(int32_t offset); + std::shared_ptr<RecordBatch> Slice(int32_t offset, int32_t length); + private: std::shared_ptr<Schema> schema_; int32_t num_rows_; http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 4e52580..ffc7806 100644 --- a/cpp/src/arrow/test-util.h +++
b/cpp/src/arrow/test-util.h @@ -61,14 +61,6 @@ EXPECT_TRUE(s.ok()); \ } while (0) -// Alias MSVC popcount to GCC name -#ifdef _MSC_VER -#include <intrin.h> -#define __builtin_popcount __popcnt -#include <nmmintrin.h> -#define __builtin_popcountll _mm_popcnt_u64 -#endif - namespace arrow { namespace test { @@ -175,29 +167,6 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) { } } -static inline int bitmap_popcount(const uint8_t* data, int length) { - // book keeping - constexpr int pop_len = sizeof(uint64_t); - const uint64_t* i64_data = reinterpret_cast<const uint64_t*>(data); - const int fast_counts = length / pop_len; - const uint64_t* end = i64_data + fast_counts; - - int count = 0; - // popcount as much as possible with the widest possible count - for (auto iter = i64_data; iter < end; ++iter) { - count += __builtin_popcountll(*iter); - } - - // Account for left over bytes (in theory we could fall back to smaller - // versions of popcount but the code complexity is likely not worth it) - const int loop_tail_index = fast_counts * pop_len; - for (int i = loop_tail_index; i < length; ++i) { - if (BitUtil::GetBit(data, i)) { ++count; } - } - - return count; -} - static inline int null_count(const std::vector<uint8_t>& valid_bytes) { int result = 0; for (size_t i = 0; i < valid_bytes.size(); ++i) { @@ -254,7 +223,7 @@ class TestBase : public ::testing::Test { auto null_bitmap = std::make_shared<PoolBuffer>(pool_); EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length))); - return std::make_shared<ArrayType>(length, data, null_count, null_bitmap); + return std::make_shared<ArrayType>(length, data, null_bitmap, null_count); } protected: @@ -263,11 +232,10 @@ class TestBase : public ::testing::Test { }; template <typename TYPE, typename C_TYPE> -void ArrayFromVector(const std::shared_ptr<DataType>& type, - const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values, +void ArrayFromVector(const std::vector<bool>& is_valid, 
const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) { MemoryPool* pool = default_memory_pool(); - typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>()); + typename TypeTraits<TYPE>::BuilderType builder(pool); for (size_t i = 0; i < values.size(); ++i) { if (is_valid[i]) { ASSERT_OK(builder.Append(values[i])); @@ -279,10 +247,9 @@ void ArrayFromVector(const std::shared_ptr<DataType>& type, } template <typename TYPE, typename C_TYPE> -void ArrayFromVector(const std::shared_ptr<DataType>& type, - const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) { +void ArrayFromVector(const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) { MemoryPool* pool = default_memory_pool(); - typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>()); + typename TypeTraits<TYPE>::BuilderType builder(pool); for (size_t i = 0; i < values.size(); ++i) { ASSERT_OK(builder.Append(values[i])); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index ba77584..a1c2b79 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -115,7 +115,7 @@ std::string UnionType::ToString() const { for (size_t i = 0; i < children_.size(); ++i) { if (i) { s << ", "; } - s << children_[i]->ToString() << "=" << static_cast<int>(type_ids[i]); + s << children_[i]->ToString() << "=" << static_cast<int>(type_codes[i]); } s << ">"; return s.str(); @@ -224,8 +224,8 @@ std::shared_ptr<DataType> struct_(const std::vector<std::shared_ptr<Field>>& fie } std::shared_ptr<DataType> union_(const std::vector<std::shared_ptr<Field>>& child_fields, - const std::vector<uint8_t>& type_ids, UnionMode mode) { - return std::make_shared<UnionType>(child_fields, type_ids, mode); + const std::vector<uint8_t>& type_codes, UnionMode mode) { + return std::make_shared<UnionType>(child_fields, 
type_codes, mode); } std::shared_ptr<DataType> dictionary(const std::shared_ptr<DataType>& index_type, http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 8638a3f..927b8a4 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -413,8 +413,8 @@ struct ARROW_EXPORT UnionType : public DataType { static constexpr Type::type type_id = Type::UNION; UnionType(const std::vector<std::shared_ptr<Field>>& fields, - const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE) - : DataType(Type::UNION), mode(mode), type_ids(type_ids) { + const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE) + : DataType(Type::UNION), mode(mode), type_codes(type_codes) { children_ = fields; } @@ -429,7 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType { // The type id used in the data to indicate each data type in the union. For // example, the first type in the union might be denoted by the id 5 (instead // of 0). 
- std::vector<uint8_t> type_ids; + std::vector<uint8_t> type_codes; }; // ---------------------------------------------------------------------- @@ -551,7 +551,7 @@ std::shared_ptr<DataType> ARROW_EXPORT struct_( std::shared_ptr<DataType> ARROW_EXPORT union_( const std::vector<std::shared_ptr<Field>>& child_fields, - const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE); + const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE); std::shared_ptr<DataType> ARROW_EXPORT dictionary( const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& values); http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type_traits.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 5cd5f45..c4898b1 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -126,6 +126,15 @@ struct TypeTraits<TimestampType> { }; template <> +struct TypeTraits<TimeType> { + using ArrayType = TimeArray; + // using BuilderType = TimestampBuilder; + + static inline int bytes_required(int elements) { return elements * sizeof(int64_t); } + constexpr static bool is_parameter_free = false; +}; + +template <> struct TypeTraits<HalfFloatType> { using ArrayType = HalfFloatArray; using BuilderType = HalfFloatBuilder; http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc index cfdee04..cb2fd1a 100644 --- a/cpp/src/arrow/util/bit-util-test.cc +++ b/cpp/src/arrow/util/bit-util-test.cc @@ -17,11 +17,17 @@ #include "arrow/util/bit-util.h" +#include <cstdint> +#include <vector> + #include "gtest/gtest.h" +#include "arrow/buffer.h" +#include "arrow/test-util.h" + namespace arrow { -TEST(UtilTests, TestIsMultipleOf64) { +TEST(BitUtilTests, 
TestIsMultipleOf64) { using BitUtil::IsMultipleOf64; EXPECT_TRUE(IsMultipleOf64(64)); EXPECT_TRUE(IsMultipleOf64(0)); @@ -31,7 +37,7 @@ TEST(UtilTests, TestIsMultipleOf64) { EXPECT_FALSE(IsMultipleOf64(32)); } -TEST(UtilTests, TestNextPower2) { +TEST(BitUtilTests, TestNextPower2) { using BitUtil::NextPower2; ASSERT_EQ(8, NextPower2(6)); @@ -51,4 +57,56 @@ TEST(UtilTests, TestNextPower2) { ASSERT_EQ(1LL << 62, NextPower2((1LL << 62) - 1)); } +static inline int64_t SlowCountBits( + const uint8_t* data, int64_t bit_offset, int64_t length) { + int64_t count = 0; + for (int64_t i = bit_offset; i < bit_offset + length; ++i) { + if (BitUtil::GetBit(data, i)) { ++count; } + } + return count; +} + +TEST(BitUtilTests, TestCountSetBits) { + const int kBufferSize = 1000; + uint8_t buffer[kBufferSize] = {0}; + + test::random_bytes(kBufferSize, 0, buffer); + + const int num_bits = kBufferSize * 8; + + std::vector<int64_t> offsets = { + 0, 12, 16, 32, 37, 63, 64, 128, num_bits - 30, num_bits - 64}; + for (int64_t offset : offsets) { + int64_t result = CountSetBits(buffer, offset, num_bits - offset); + int64_t expected = SlowCountBits(buffer, offset, num_bits - offset); + + ASSERT_EQ(expected, result); + } +} + +TEST(BitUtilTests, TestCopyBitmap) { + const int kBufferSize = 1000; + + std::shared_ptr<MutableBuffer> buffer; + ASSERT_OK(AllocateBuffer(default_memory_pool(), kBufferSize, &buffer)); + memset(buffer->mutable_data(), 0, kBufferSize); + test::random_bytes(kBufferSize, 0, buffer->mutable_data()); + + const int num_bits = kBufferSize * 8; + + const uint8_t* src = buffer->data(); + + std::vector<int64_t> offsets = {0, 12, 16, 32, 37, 63, 64, 128}; + for (int64_t offset : offsets) { + const int64_t copy_length = num_bits - offset; + + std::shared_ptr<Buffer> copy; + ASSERT_OK(CopyBitmap(default_memory_pool(), src, offset, copy_length, &copy)); + + for (int64_t i = 0; i < copy_length; ++i) { + ASSERT_EQ(BitUtil::GetBit(src, i + offset), BitUtil::GetBit(copy->data(), i)); + } + } 
+} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc index 9c82407..f3fbb41 100644 --- a/cpp/src/arrow/util/bit-util.cc +++ b/cpp/src/arrow/util/bit-util.cc @@ -15,10 +15,20 @@ // specific language governing permissions and limitations // under the License. +// Alias MSVC popcount to GCC name +#ifdef _MSC_VER +#include <intrin.h> +#define __builtin_popcount __popcnt +#include <nmmintrin.h> +#define __builtin_popcountll _mm_popcnt_u64 +#endif + +#include <algorithm> #include <cstring> #include <vector> #include "arrow/buffer.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/util/bit-util.h" @@ -34,8 +44,9 @@ Status BitUtil::BytesToBits( const std::vector<uint8_t>& bytes, std::shared_ptr<Buffer>* out) { int bit_length = BitUtil::BytesForBits(bytes.size()); - auto buffer = std::make_shared<PoolBuffer>(); - RETURN_NOT_OK(buffer->Resize(bit_length)); + std::shared_ptr<MutableBuffer> buffer; + RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), bit_length, &buffer)); + memset(buffer->mutable_data(), 0, bit_length); BytesToBits(bytes, buffer->mutable_data()); @@ -43,4 +54,72 @@ Status BitUtil::BytesToBits( return Status::OK(); } +int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) { + constexpr int64_t pop_len = sizeof(uint64_t) * 8; + + int64_t count = 0; + + // The first bit offset where we can use a 64-bit wide hardware popcount + const int64_t fast_count_start = BitUtil::RoundUp(bit_offset, pop_len); + + // The number of bits until fast_count_start + const int64_t initial_bits = std::min(length, fast_count_start - bit_offset); + for (int64_t i = bit_offset; i < bit_offset + initial_bits; ++i) { + if (BitUtil::GetBit(data, i)) { ++count; } + } + + const int64_t fast_counts = (length - initial_bits) / pop_len; 
+ + // Advance until the first aligned 8-byte word after the initial bits + const uint64_t* u64_data = + reinterpret_cast<const uint64_t*>(data) + fast_count_start / pop_len; + + const uint64_t* end = u64_data + fast_counts; + + // popcount as much as possible with the widest possible count + for (auto iter = u64_data; iter < end; ++iter) { + count += __builtin_popcountll(*iter); + } + + // Account for left over bit (in theory we could fall back to smaller + // versions of popcount but the code complexity is likely not worth it) + const int64_t tail_index = bit_offset + initial_bits + fast_counts * pop_len; + for (int64_t i = tail_index; i < bit_offset + length; ++i) { + if (BitUtil::GetBit(data, i)) { ++count; } + } + + return count; +} + +Status GetEmptyBitmap( + MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result) { + RETURN_NOT_OK(AllocateBuffer(pool, BitUtil::BytesForBits(length), result)); + memset((*result)->mutable_data(), 0, (*result)->size()); + return Status::OK(); +} + +Status CopyBitmap(MemoryPool* pool, const uint8_t* data, int32_t offset, int32_t length, + std::shared_ptr<Buffer>* out) { + std::shared_ptr<MutableBuffer> buffer; + RETURN_NOT_OK(GetEmptyBitmap(pool, length, &buffer)); + uint8_t* dest = buffer->mutable_data(); + for (int64_t i = 0; i < length; ++i) { + BitUtil::SetBitTo(dest, i, BitUtil::GetBit(data, i + offset)); + } + *out = buffer; + return Status::OK(); +} + +bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right, + int64_t right_offset, int64_t bit_length) { + // TODO(wesm): Make this faster using word-wise comparisons + for (int64_t i = 0; i < bit_length; ++i) { + if (BitUtil::GetBit(left, left_offset + i) != + BitUtil::GetBit(right, right_offset + i)) { + return false; + } + } + return true; +} + } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.h ---------------------------------------------------------------------- diff 
--git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index 5c8055f..a0fbdd2 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -28,6 +28,8 @@ namespace arrow { class Buffer; +class MemoryPool; +class MutableBuffer; class Status; namespace BitUtil { @@ -62,6 +64,12 @@ static inline void SetBit(uint8_t* bits, int i) { bits[i / 8] |= kBitmask[i % 8]; } +static inline void SetBitTo(uint8_t* bits, int i, bool bit_is_set) { + // See https://graphics.stanford.edu/~seander/bithacks.html + // "Conditionally set or clear bits without branching" + bits[i / 8] ^= (-bit_is_set ^ bits[i / 8]) & kBitmask[i % 8]; +} + static inline int64_t NextPower2(int64_t n) { n--; n |= n >> 1; @@ -82,6 +90,11 @@ static inline bool IsMultipleOf8(int64_t n) { return (n & 7) == 0; } +/// Returns 'value' rounded up to the nearest multiple of 'factor' +inline int64_t RoundUp(int64_t value, int64_t factor) { + return (value + (factor - 1)) / factor * factor; +} + inline int64_t RoundUpToMultipleOf64(int64_t num) { // TODO(wesm): is this definitely needed? 
// DCHECK_GE(num, 0); @@ -98,6 +111,38 @@ void BytesToBits(const std::vector<uint8_t>& bytes, uint8_t* bits); ARROW_EXPORT Status BytesToBits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*); } // namespace BitUtil + +// ---------------------------------------------------------------------- +// Bitmap utilities + +Status ARROW_EXPORT GetEmptyBitmap( + MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result); + +/// Copy a bit range of an existing bitmap +/// +/// \param[in] pool memory pool to allocate memory from +/// \param[in] bitmap source data +/// \param[in] offset bit offset into the source data +/// \param[in] length number of bits to copy +/// \param[out] out the resulting copy +/// +/// \return Status message +Status ARROW_EXPORT CopyBitmap(MemoryPool* pool, const uint8_t* bitmap, int32_t offset, + int32_t length, std::shared_ptr<Buffer>* out); + +/// Compute the number of 1's in the given data array +/// +/// \param[in] data a packed LSB-ordered bitmap as a byte array +/// \param[in] bit_offset a bitwise offset into the bitmap +/// \param[in] length the number of bits to inspect in the bitmap relative to the offset +/// +/// \return The number of set (1) bits in the range +int64_t ARROW_EXPORT CountSetBits( + const uint8_t* data, int64_t bit_offset, int64_t length); + +bool ARROW_EXPORT BitmapEquals(const uint8_t* left, int64_t left_offset, + const uint8_t* right, int64_t right_offset, int64_t bit_length); + } // namespace arrow #endif // ARROW_UTIL_BIT_UTIL_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/logging.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index b22f07d..06ee841 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -118,9 +118,9 @@ class CerrLog { class FatalLog : public CerrLog { public: explicit FatalLog(int /* severity */) // NOLINT - : 
CerrLog(ARROW_FATAL){} // NOLINT + : CerrLog(ARROW_FATAL) {} // NOLINT - [[noreturn]] ~FatalLog() { + [[noreturn]] ~FatalLog() { if (has_logged_) { std::cerr << std::endl; } std::exit(1); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/macros.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h index c4a62a4..81a9b0c 100644 --- a/cpp/src/arrow/util/macros.h +++ b/cpp/src/arrow/util/macros.h @@ -25,6 +25,6 @@ TypeName& operator=(const TypeName&) = delete #endif -#define UNUSED(x) (void)x +#define UNUSED(x) (void) x #endif // ARROW_UTIL_MACROS_H http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 842a219..ba26692 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -95,7 +95,7 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Qunused-arguments") # Cython warnings in clang - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality -Wno-constant-logical-operand") endif() set(PYARROW_LINK "a") http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 38883e8..ebfdc41 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -179,8 +179,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: double Value(int i) cdef cppclass CListArray" arrow::ListArray"(CArray): - const int32_t* offsets() - int32_t offset(int i) + const int32_t* raw_value_offsets() + int32_t value_offset(int i) int32_t value_length(int i) 
shared_ptr[CArray] values() shared_ptr[CDataType] value_type() http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/scalar.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx index 30b9040..9d2b2b1 100644 --- a/python/pyarrow/scalar.pyx +++ b/python/pyarrow/scalar.pyx @@ -202,7 +202,7 @@ cdef class ListValue(ArrayValue): self.value_type = box_data_type(self.ap.value_type()) cdef getitem(self, int i): - cdef int j = self.ap.offset(self.index) + i + cdef int j = self.ap.value_offset(self.index) + i return box_arrow_scalar(self.value_type, self.ap.values(), j) def as_py(self): http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/src/pyarrow/adapters/builtin.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc index 1abfb40..5fd8eef 100644 --- a/python/src/pyarrow/adapters/builtin.cc +++ b/python/src/pyarrow/adapters/builtin.cc @@ -505,7 +505,7 @@ Status ConvertPySequence( // Handle NA / NullType case if (type->type == Type::NA) { - out->reset(new arrow::NullArray(type, size)); + out->reset(new arrow::NullArray(size)); return Status::OK(); }
