Repository: parquet-cpp
Updated Branches:
  refs/heads/master 52d36960e -> fb9c1166c
PARQUET-827: Account for arrow::MemoryPool API change and fix bug in reading Int96 timestamps

Prior to ARROW-427, the "length()" field was not being properly checked in the implementation of `PrimitiveArray::Equals`.

Author: Wes McKinney <[email protected]>

Closes #215 from wesm/PARQUET-825 and squashes the following commits:

afa41f9 [Wes McKinney] Do not build arrow_jemalloc in arrow external project
ff9f22e [Wes McKinney] cpplint, clang-format
802b325 [Wes McKinney] Update Arrow version
5f155b6 [Wes McKinney] Fix bug exposed by accidental bug fix in ARROW-427
a3c75bb [Wes McKinney] Implement TrackingAllocator based on Arrow default allocator

Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/fb9c1166
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/fb9c1166
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/fb9c1166

Branch: refs/heads/master
Commit: fb9c1166c5d7e77d54d7871ff55dfcf2990332bf
Parents: 52d3696
Author: Wes McKinney <[email protected]>
Authored: Sat Jan 7 15:05:12 2017 -0500
Committer: Wes McKinney <[email protected]>
Committed: Sat Jan 7 15:05:12 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake | 3 +-
 src/parquet/arrow/reader.cc | 1 +
 src/parquet/schema/descriptor.cc | 12 +++----
 src/parquet/schema/schema-descriptor-test.cc | 16 ++++------
 src/parquet/util/memory.cc | 39 +++++++++++++----------
 src/parquet/util/memory.h | 16 +++++-----
 6 files changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 48fee19..54033ec 100644
--- 
a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1") # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd") -set(ARROW_VERSION "e15c6a0b3c05b5b42c204f34369d127182450ca0") +set(ARROW_VERSION "74685f386307171a90a9f97316e25b7f39cdd0a1") # find boost headers and libs set(Boost_DEBUG TRUE) @@ -325,6 +325,7 @@ if (NOT ARROW_FOUND) set(ARROW_IO_STATIC_LIB "${ARROW_PREFIX}/lib/libarrow_io.a") set(ARROW_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX} + -DARROW_JEMALLOC=OFF -DARROW_BUILD_TESTS=OFF) if (CMAKE_VERSION VERSION_GREATER "3.2") http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index d1eec05..db281d9 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -252,6 +252,7 @@ void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96T for (int64_t i = 0; i < values_read; i++) { out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]); } + valid_bits_idx_ += values_read; } template <> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/schema/descriptor.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/descriptor.cc b/src/parquet/schema/descriptor.cc index c5250d1..0b0d006 100644 --- a/src/parquet/schema/descriptor.cc +++ b/src/parquet/schema/descriptor.cc @@ -48,14 +48,10 @@ void SchemaDescriptor::Init(const NodePtr& schema) { } bool SchemaDescriptor::Equals(const SchemaDescriptor& other) const { - if (this->num_columns() != other.num_columns()) { - return false; - } + if (this->num_columns() != other.num_columns()) { return false; } for (int i = 0; i < 
this->num_columns(); ++i) { - if (!this->Column(i)->Equals(*other.Column(i))) { - return false; - } + if (!this->Column(i)->Equals(*other.Column(i))) { return false; } } return true; @@ -98,8 +94,8 @@ ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node, bool ColumnDescriptor::Equals(const ColumnDescriptor& other) const { return primitive_node_->Equals(other.primitive_node_) && - max_repetition_level() == other.max_repetition_level() && - max_definition_level() == other.max_definition_level(); + max_repetition_level() == other.max_repetition_level() && + max_definition_level() == other.max_definition_level(); } const ColumnDescriptor* SchemaDescriptor::Column(int i) const { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/schema/schema-descriptor-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index 467d63c..4b7f67c 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -89,30 +89,26 @@ TEST_F(TestSchemaDescriptor, Equals) { NodePtr bag2(GroupNode::Make("bag", Repetition::REQUIRED, {list})); SchemaDescriptor descr1; - descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, - {inta, intb, intc, bag})); + descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag})); ASSERT_TRUE(descr1.Equals(descr1)); SchemaDescriptor descr2; - descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, - {inta, intb, intc, bag2})); + descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag2})); ASSERT_FALSE(descr1.Equals(descr2)); SchemaDescriptor descr3; - descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, - {inta, intb2, intc, bag})); + descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb2, intc, bag})); ASSERT_FALSE(descr1.Equals(descr3)); // Robust to name of parent node 
SchemaDescriptor descr4; - descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, - {inta, intb, intc, bag})); + descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, {inta, intb, intc, bag})); ASSERT_TRUE(descr1.Equals(descr4)); SchemaDescriptor descr5; - descr5.Init(GroupNode::Make("schema", Repetition::REPEATED, - {inta, intb, intc, bag, intb2})); + descr5.Init( + GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag, intb2})); ASSERT_FALSE(descr1.Equals(descr5)); // Different max repetition / definition levels http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc index b490c2e..72ed3ac 100644 --- a/src/parquet/util/memory.cc +++ b/src/parquet/util/memory.cc @@ -22,6 +22,8 @@ #include <cstdio> #include <string> +#include "arrow/status.h" + #include "parquet/exception.h" #include "parquet/types.h" #include "parquet/util/bit-util.h" @@ -30,31 +32,34 @@ namespace parquet { ::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) { - if (0 == size) { + if (size == 0) { *out = nullptr; return ::arrow::Status::OK(); } + ARROW_RETURN_NOT_OK(allocator_->Allocate(size, out)); + const int64_t total_memory = allocator_->bytes_allocated(); + if (total_memory > max_memory_) { max_memory_ = total_memory; } + return ::arrow::Status::OK(); +} - uint8_t* p = static_cast<uint8_t*>(std::malloc(size)); - if (!p) { return ::arrow::Status::OutOfMemory("memory allocation failed"); } - { - std::lock_guard<std::mutex> lock(stats_mutex_); - total_memory_ += size; - if (total_memory_ > max_memory_) { max_memory_ = total_memory_; } - } - *out = p; +::arrow::Status TrackingAllocator::Reallocate( + int64_t old_size, int64_t new_size, uint8_t** out) { + ARROW_RETURN_NOT_OK(allocator_->Reallocate(old_size, new_size, out)); + const int64_t total_memory = 
allocator_->bytes_allocated(); + if (total_memory > max_memory_) { max_memory_ = total_memory; } return ::arrow::Status::OK(); } void TrackingAllocator::Free(uint8_t* p, int64_t size) { - if (nullptr != p && size > 0) { - { - std::lock_guard<std::mutex> lock(stats_mutex_); - DCHECK_GE(total_memory_, size) << "Attempting to free too much memory"; - total_memory_ -= size; - } - std::free(p); - } + allocator_->Free(p, size); +} + +int64_t TrackingAllocator::max_memory() const { + return max_memory_.load(); +} + +int64_t TrackingAllocator::bytes_allocated() const { + return allocator_->bytes_allocated(); } MemoryAllocator* default_allocator() { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index b915450..2a71c37 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -18,11 +18,11 @@ #ifndef PARQUET_UTIL_MEMORY_H #define PARQUET_UTIL_MEMORY_H +#include <atomic> #include <cstdint> #include <cstdlib> #include <cstring> #include <memory> -#include <mutex> #include <string> #include <vector> @@ -72,19 +72,19 @@ PARQUET_EXPORT MemoryAllocator* default_allocator(); class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator { public: - TrackingAllocator() : total_memory_(0), max_memory_(0) {} + explicit TrackingAllocator(MemoryAllocator* allocator = ::arrow::default_memory_pool()) + : allocator_(allocator), max_memory_(0) {} ::arrow::Status Allocate(int64_t size, uint8_t** out) override; + ::arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; void Free(uint8_t* p, int64_t size) override; - int64_t bytes_allocated() const override { return total_memory_; } - - int64_t max_memory() { return max_memory_; } + int64_t max_memory() const; + int64_t bytes_allocated() const override; private: - std::mutex stats_mutex_; - int64_t total_memory_; - 
int64_t max_memory_; + MemoryAllocator* allocator_; + std::atomic<int64_t> max_memory_; }; template <class T>
