http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/builder.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index ee363b9..d3a299e 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -69,7 +69,9 @@ Status ArrayBuilder::Init(int64_t capacity) { } Status ArrayBuilder::Resize(int64_t new_bits) { - if (!null_bitmap_) { return Init(new_bits); } + if (!null_bitmap_) { + return Init(new_bits); + } int64_t new_bytes = BitUtil::CeilByte(new_bits) / 8; int64_t old_bytes = null_bitmap_->size(); RETURN_NOT_OK(null_bitmap_->Resize(new_bytes)); @@ -78,8 +80,8 @@ Status ArrayBuilder::Resize(int64_t new_bits) { const int64_t byte_capacity = null_bitmap_->capacity(); capacity_ = new_bits; if (old_bytes < new_bytes) { - memset( - null_bitmap_data_ + old_bytes, 0, static_cast<size_t>(byte_capacity - old_bytes)); + memset(null_bitmap_data_ + old_bytes, 0, + static_cast<size_t>(byte_capacity - old_bytes)); } return Status::OK(); } @@ -140,7 +142,9 @@ void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int64_t leng bit_offset++; } - if (bit_offset != 0) { null_bitmap_data_[byte_offset] = bitset; } + if (bit_offset != 0) { + null_bitmap_data_[byte_offset] = bitset; + } length_ += length; } @@ -149,7 +153,9 @@ void ArrayBuilder::UnsafeSetNotNull(int64_t length) { // Fill up the bytes until we have a byte alignment int64_t pad_to_byte = std::min<int64_t>(8 - (length_ % 8), length); - if (pad_to_byte == 8) { pad_to_byte = 0; } + if (pad_to_byte == 8) { + pad_to_byte = 0; + } for (int64_t i = length_; i < length_ + pad_to_byte; ++i) { BitUtil::SetBit(null_bitmap_data_, i); } @@ -157,7 +163,7 @@ void ArrayBuilder::UnsafeSetNotNull(int64_t length) { // Fast bitsetting int64_t fast_length = (length - pad_to_byte) / 8; memset(null_bitmap_data_ + ((length_ + pad_to_byte) / 8), 0xFF, - static_cast<size_t>(fast_length)); + static_cast<size_t>(fast_length)); // Trailing bytes for (int64_t i = length_ + pad_to_byte + (fast_length * 8); i < new_length; ++i) { @@ -184,7 +190,9 @@ Status PrimitiveBuilder<T>::Init(int64_t capacity) { template <typename T> Status PrimitiveBuilder<T>::Resize(int64_t capacity) { // XXX: Set floor size for now - if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + if (capacity < kMinBuilderCapacity) { + capacity = kMinBuilderCapacity; + } if (capacity_ == 0) { RETURN_NOT_OK(Init(capacity)); @@ -195,20 +203,20 @@ Status PrimitiveBuilder<T>::Resize(int64_t capacity) { RETURN_NOT_OK(data_->Resize(new_bytes)); raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data()); // TODO(emkornfield) valgrind complains without this - memset( - data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes)); + memset(data_->mutable_data() + old_bytes, 0, + static_cast<size_t>(new_bytes - old_bytes)); } return Status::OK(); } template <typename T> -Status PrimitiveBuilder<T>::Append( - const value_type* values, int64_t length, const uint8_t* valid_bytes) { +Status PrimitiveBuilder<T>::Append(const value_type* values, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); if (length > 0) { std::memcpy(raw_data_ + length_, values, - static_cast<std::size_t>(TypeTraits<T>::bytes_required(length))); + static_cast<std::size_t>(TypeTraits<T>::bytes_required(length))); } // length_ is update by these @@ -224,8 +232,8 @@ Status PrimitiveBuilder<T>::Finish(std::shared_ptr<Array>* out) { // Trim buffers RETURN_NOT_OK(data_->Resize(bytes_required)); } - *out = std::make_shared<typename TypeTraits<T>::ArrayType>( - type_, length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<typename TypeTraits<T>::ArrayType>(type_, length_, data_, + null_bitmap_, null_count_); data_ = null_bitmap_ = nullptr; capacity_ = length_ = null_count_ = 0; @@ -267,7 +275,9 @@ Status AdaptiveIntBuilderBase::Init(int64_t capacity) { Status AdaptiveIntBuilderBase::Resize(int64_t capacity) { // XXX: Set floor size for now - if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + if (capacity < kMinBuilderCapacity) { + capacity = kMinBuilderCapacity; + } if (capacity_ == 0) { RETURN_NOT_OK(Init(capacity)); @@ -278,8 +288,8 @@ Status AdaptiveIntBuilderBase::Resize(int64_t capacity) { RETURN_NOT_OK(data_->Resize(new_bytes)); raw_data_ = data_->mutable_data(); // TODO(emkornfield) valgrind complains without this - memset( - data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes)); + memset(data_->mutable_data() + old_bytes, 0, + static_cast<size_t>(new_bytes - old_bytes)); } return Status::OK(); } @@ -298,16 +308,16 @@ Status AdaptiveIntBuilder::Finish(std::shared_ptr<Array>* out) { std::make_shared<Int8Array>(int8(), length_, data_, null_bitmap_, null_count_); break; case 2: - *out = std::make_shared<Int16Array>( - int16(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<Int16Array>(int16(), length_, data_, null_bitmap_, + null_count_); break; case 4: - *out = std::make_shared<Int32Array>( - int32(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<Int32Array>(int32(), length_, data_, null_bitmap_, + null_count_); break; case 8: - *out = std::make_shared<Int64Array>( - int64(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<Int64Array>(int64(), length_, data_, null_bitmap_, + null_count_); break; default: DCHECK(false); @@ -319,8 +329,8 @@ Status AdaptiveIntBuilder::Finish(std::shared_ptr<Array>* out) { return Status::OK(); } -Status AdaptiveIntBuilder::Append( - const int64_t* values, int64_t length, const uint8_t* valid_bytes) { +Status AdaptiveIntBuilder::Append(const int64_t* values, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); if (length > 0) { @@ -331,13 +341,15 @@ Status AdaptiveIntBuilder::Append( new_int_size = expanded_int_size(values[i], new_int_size); } } - if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + if (new_int_size != int_size_) { + RETURN_NOT_OK(ExpandIntSize(new_int_size)); + } } } if (int_size_ == 8) { std::memcpy(reinterpret_cast<int64_t*>(raw_data_) + length_, values, - sizeof(int64_t) * length); + sizeof(int64_t) * length); } else { #ifdef _MSC_VER #pragma warning(push) @@ -348,17 +360,17 @@ Status AdaptiveIntBuilder::Append( case 1: { int8_t* data_ptr = reinterpret_cast<int8_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](int64_t x) { return static_cast<int8_t>(x); }); + [](int64_t x) { return static_cast<int8_t>(x); }); } break; case 2: { int16_t* data_ptr = reinterpret_cast<int16_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](int64_t x) { return static_cast<int16_t>(x); }); + [](int64_t x) { return static_cast<int16_t>(x); }); } break; case 4: { int32_t* data_ptr = reinterpret_cast<int32_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](int64_t x) { return static_cast<int32_t>(x); }); + [](int64_t x) { return static_cast<int32_t>(x); }); } break; default: DCHECK(false); @@ -449,20 +461,20 @@ Status AdaptiveUIntBuilder::Finish(std::shared_ptr<Array>* out) { } switch (int_size_) { case 1: - *out = std::make_shared<UInt8Array>( - uint8(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<UInt8Array>(uint8(), length_, data_, null_bitmap_, + null_count_); break; case 2: - *out = std::make_shared<UInt16Array>( - uint16(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<UInt16Array>(uint16(), length_, data_, null_bitmap_, + null_count_); break; case 4: - *out = std::make_shared<UInt32Array>( - uint32(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<UInt32Array>(uint32(), length_, data_, null_bitmap_, + null_count_); break; case 8: - *out = std::make_shared<UInt64Array>( - uint64(), length_, data_, null_bitmap_, null_count_); + *out = std::make_shared<UInt64Array>(uint64(), length_, data_, null_bitmap_, + null_count_); break; default: DCHECK(false); @@ -474,8 +486,8 @@ Status AdaptiveUIntBuilder::Finish(std::shared_ptr<Array>* out) { return Status::OK(); } -Status AdaptiveUIntBuilder::Append( - const uint64_t* values, int64_t length, const uint8_t* valid_bytes) { +Status AdaptiveUIntBuilder::Append(const uint64_t* values, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); if (length > 0) { @@ -486,13 +498,15 @@ Status AdaptiveUIntBuilder::Append( new_int_size = expanded_uint_size(values[i], new_int_size); } } - if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + if (new_int_size != int_size_) { + RETURN_NOT_OK(ExpandIntSize(new_int_size)); + } } } if (int_size_ == 8) { std::memcpy(reinterpret_cast<uint64_t*>(raw_data_) + length_, values, - sizeof(uint64_t) * length); + sizeof(uint64_t) * length); } else { #ifdef _MSC_VER #pragma warning(push) @@ -503,17 +517,17 @@ Status AdaptiveUIntBuilder::Append( case 1: { uint8_t* data_ptr = reinterpret_cast<uint8_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](uint64_t x) { return static_cast<uint8_t>(x); }); + [](uint64_t x) { return static_cast<uint8_t>(x); }); } break; case 2: { uint16_t* data_ptr = reinterpret_cast<uint16_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](uint64_t x) { return static_cast<uint16_t>(x); }); + [](uint64_t x) { return static_cast<uint16_t>(x); }); } break; case 4: { uint32_t* data_ptr = reinterpret_cast<uint32_t*>(raw_data_) + length_; std::transform(values, values + length, data_ptr, - [](uint64_t x) { return static_cast<uint32_t>(x); }); + [](uint64_t x) { return static_cast<uint32_t>(x); }); } break; default: DCHECK(false); @@ -616,7 +630,9 @@ Status BooleanBuilder::Init(int64_t capacity) { Status BooleanBuilder::Resize(int64_t capacity) { // XXX: Set floor size for now - if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + if (capacity < kMinBuilderCapacity) { + capacity = kMinBuilderCapacity; + } if (capacity_ == 0) { RETURN_NOT_OK(Init(capacity)); @@ -627,8 +643,8 @@ Status BooleanBuilder::Resize(int64_t capacity) { RETURN_NOT_OK(data_->Resize(new_bytes)); raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); - memset( - data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes)); + memset(data_->mutable_data() + old_bytes, 0, + static_cast<size_t>(new_bytes - old_bytes)); } return Status::OK(); } @@ -647,8 +663,8 @@ Status BooleanBuilder::Finish(std::shared_ptr<Array>* out) { return Status::OK(); } -Status BooleanBuilder::Append( - const uint8_t* values, int64_t length, const uint8_t* valid_bytes) { +Status BooleanBuilder::Append(const uint8_t* values, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); for (int64_t i = 0; i < length; ++i) { @@ -673,14 +689,16 @@ Status BooleanBuilder::Append( // DictionaryBuilder template <typename T> -DictionaryBuilder<T>::DictionaryBuilder( - MemoryPool* pool, const std::shared_ptr<DataType>& type) +DictionaryBuilder<T>::DictionaryBuilder(MemoryPool* pool, + const std::shared_ptr<DataType>& type) : ArrayBuilder(pool, type), hash_table_(new PoolBuffer(pool)), hash_slots_(nullptr), dict_builder_(pool, type), values_builder_(pool) { - if (!::arrow::CpuInfo::initialized()) { ::arrow::CpuInfo::Init(); } + if (!::arrow::CpuInfo::initialized()) { + ::arrow::CpuInfo::Init(); + } } template <typename T> @@ -699,7 +717,9 @@ Status DictionaryBuilder<T>::Init(int64_t elements) { template <typename T> Status DictionaryBuilder<T>::Resize(int64_t capacity) { - if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + if (capacity < kMinBuilderCapacity) { + capacity = kMinBuilderCapacity; + } if (capacity_ == 0) { return Init(capacity); @@ -732,7 +752,9 @@ Status DictionaryBuilder<T>::Append(const Scalar& value) { while (kHashSlotEmpty != index && SlotDifferent(index, value)) { // Linear probing ++j; - if (j == hash_table_size_) { j = 0; } + if (j == hash_table_size_) { + j = 0; + } index = hash_slots_[j]; } @@ -784,7 +806,9 @@ Status DictionaryBuilder<T>::DoubleTableSize() { for (int i = 0; i < hash_table_size_; ++i) { hash_slot_t index = hash_slots_[i]; - if (index == kHashSlotEmpty) { continue; } + if (index == kHashSlotEmpty) { + continue; + } // Compute the hash value mod the new table size to start looking for an // empty slot @@ -796,7 +820,9 @@ Status DictionaryBuilder<T>::DoubleTableSize() { while (kHashSlotEmpty != slot && SlotDifferent(slot, value)) { ++j; - if (j == new_size) { j = 0; } + if (j == new_size) { + j = 0; + } slot = new_hash_slots[j]; } @@ -870,8 +896,8 @@ Status DictionaryBuilder<T>::AppendDictionary(const Scalar& value) { } \ \ template <> \ - bool DictionaryBuilder<Type>::SlotDifferent( \ - hash_slot_t index, const internal::WrappedBinary& value) { \ + bool DictionaryBuilder<Type>::SlotDifferent(hash_slot_t index, \ + const internal::WrappedBinary& value) { \ int32_t other_length; \ const uint8_t* other_value = \ dict_builder_.GetValue(static_cast<int64_t>(index), &other_length); \ @@ -951,7 +977,9 @@ Status DecimalBuilder::Init(int64_t capacity) { Status DecimalBuilder::Resize(int64_t capacity) { int64_t old_bytes = null_bitmap_ != nullptr ? null_bitmap_->size() : 0; - if (sign_bitmap_ == nullptr) { return Init(capacity); } + if (sign_bitmap_ == nullptr) { + return Init(capacity); + } RETURN_NOT_OK(FixedSizeBinaryBuilder::Resize(capacity)); if (byte_width_ == 16) { @@ -962,7 +990,7 @@ Status DecimalBuilder::Resize(int64_t capacity) { // The buffer might be overpadded to deal with padding according to the spec if (old_bytes < new_bytes) { memset(sign_bitmap_data_ + old_bytes, 0, - static_cast<size_t>(sign_bitmap_->capacity() - old_bytes)); + static_cast<size_t>(sign_bitmap_->capacity() - old_bytes)); } } return Status::OK(); @@ -973,8 +1001,8 @@ Status DecimalBuilder::Finish(std::shared_ptr<Array>* out) { RETURN_NOT_OK(byte_builder_.Finish(&data)); /// TODO(phillipc): not sure where to get the offset argument here - *out = std::make_shared<DecimalArray>( - type_, length_, data, null_bitmap_, null_count_, 0, sign_bitmap_); + *out = std::make_shared<DecimalArray>(type_, length_, data, null_bitmap_, null_count_, + 0, sign_bitmap_); return Status::OK(); } @@ -982,15 +1010,15 @@ Status DecimalBuilder::Finish(std::shared_ptr<Array>* out) { // ListBuilder ListBuilder::ListBuilder(MemoryPool* pool, std::unique_ptr<ArrayBuilder> value_builder, - const std::shared_ptr<DataType>& type) - : ArrayBuilder( - pool, type ? type : std::static_pointer_cast<DataType>( - std::make_shared<ListType>(value_builder->type()))), + const std::shared_ptr<DataType>& type) + : ArrayBuilder(pool, + type ? type : std::static_pointer_cast<DataType>( + std::make_shared<ListType>(value_builder->type()))), offsets_builder_(pool), value_builder_(std::move(value_builder)) {} -Status ListBuilder::Append( - const int32_t* offsets, int64_t length, const uint8_t* valid_bytes) { +Status ListBuilder::Append(const int32_t* offsets, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); UnsafeAppendToBitmap(valid_bytes, length); offsets_builder_.UnsafeAppend(offsets, length); @@ -1035,10 +1063,12 @@ Status ListBuilder::Finish(std::shared_ptr<Array>* out) { RETURN_NOT_OK(offsets_builder_.Finish(&offsets)); std::shared_ptr<Array> items = values_; - if (!items) { RETURN_NOT_OK(value_builder_->Finish(&items)); } + if (!items) { + RETURN_NOT_OK(value_builder_->Finish(&items)); + } - *out = std::make_shared<ListArray>( - type_, length_, offsets, items, null_bitmap_, null_count_); + *out = std::make_shared<ListArray>(type_, length_, offsets, items, null_bitmap_, + null_count_); Reset(); return Status::OK(); @@ -1111,8 +1141,8 @@ Status BinaryBuilder::FinishInternal(std::shared_ptr<internal::ArrayData>* out) RETURN_NOT_OK(value_data_builder_.Finish(&value_data)); BufferVector buffers = {null_bitmap_, offsets, value_data}; - *out = std::make_shared<internal::ArrayData>( - type_, length_, std::move(buffers), null_count_, 0); + *out = std::make_shared<internal::ArrayData>(type_, length_, std::move(buffers), + null_count_, 0); return Status::OK(); } @@ -1154,8 +1184,8 @@ Status StringBuilder::Finish(std::shared_ptr<Array>* out) { // ---------------------------------------------------------------------- // Fixed width binary -FixedSizeBinaryBuilder::FixedSizeBinaryBuilder( - MemoryPool* pool, const std::shared_ptr<DataType>& type) +FixedSizeBinaryBuilder::FixedSizeBinaryBuilder(MemoryPool* pool, + const std::shared_ptr<DataType>& type) : ArrayBuilder(pool, type), byte_width_(static_cast<const FixedSizeBinaryType&>(*type).byte_width()), byte_builder_(pool) {} @@ -1166,8 +1196,8 @@ Status FixedSizeBinaryBuilder::Append(const uint8_t* value) { return byte_builder_.Append(value, byte_width_); } -Status FixedSizeBinaryBuilder::Append( - const uint8_t* data, int64_t length, const uint8_t* valid_bytes) { +Status FixedSizeBinaryBuilder::Append(const uint8_t* data, int64_t length, + const uint8_t* valid_bytes) { RETURN_NOT_OK(Reserve(length)); UnsafeAppendToBitmap(valid_bytes, length); return byte_builder_.Append(data, length * byte_width_); @@ -1196,8 +1226,8 @@ Status FixedSizeBinaryBuilder::Resize(int64_t capacity) { Status FixedSizeBinaryBuilder::Finish(std::shared_ptr<Array>* out) { std::shared_ptr<Buffer> data; RETURN_NOT_OK(byte_builder_.Finish(&data)); - *out = std::make_shared<FixedSizeBinaryArray>( - type_, length_, data, null_bitmap_, null_count_); + *out = std::make_shared<FixedSizeBinaryArray>(type_, length_, data, null_bitmap_, + null_count_); return Status::OK(); } @@ -1205,7 +1235,7 @@ Status FixedSizeBinaryBuilder::Finish(std::shared_ptr<Array>* out) { // Struct StructBuilder::StructBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, - std::vector<std::unique_ptr<ArrayBuilder>>&& field_builders) + std::vector<std::unique_ptr<ArrayBuilder>>&& field_builders) : ArrayBuilder(pool, type) { field_builders_ = std::move(field_builders); } @@ -1237,7 +1267,7 @@ Status StructBuilder::Finish(std::shared_ptr<Array>* out) { // // TODO(wesm): come up with a less monolithic strategy Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, - std::unique_ptr<ArrayBuilder>* out) { + std::unique_ptr<ArrayBuilder>* out) { switch (type->id()) { BUILDER_CASE(UINT8, UInt8Builder); BUILDER_CASE(INT8, Int8Builder); @@ -1292,7 +1322,7 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, return Status::OK(); Status MakeDictionaryBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, - std::shared_ptr<ArrayBuilder>* out) { + std::shared_ptr<ArrayBuilder>* out) { switch (type->id()) { DICTIONARY_BUILDER_CASE(UINT8, DictionaryBuilder<UInt8Type>); DICTIONARY_BUILDER_CASE(INT8, DictionaryBuilder<Int8Type>);
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/builder.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 065e115..080a329 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -186,8 +186,8 @@ class ARROW_EXPORT PrimitiveBuilder : public ArrayBuilder { /// /// If passed, valid_bytes is of equal length to values, and any zero byte /// will be considered as a null for that slot - Status Append( - const value_type* values, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const value_type* values, int64_t length, + const uint8_t* valid_bytes = nullptr); Status Finish(std::shared_ptr<Array>* out) override; Status Init(int64_t capacity) override; @@ -298,15 +298,15 @@ class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder { inline uint8_t expanded_uint_size(uint64_t val, uint8_t current_int_size) { if (current_int_size == 8 || (current_int_size < 8 && - (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) { + (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) { return 8; } else if (current_int_size == 4 || (current_int_size < 4 && - (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) { + (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) { return 4; } else if (current_int_size == 2 || (current_int_size == 1 && - (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) { + (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) { return 2; } else { return 1; @@ -325,7 +325,9 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase { BitUtil::SetBit(null_bitmap_data_, length_); uint8_t new_int_size = expanded_uint_size(val, int_size_); - if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + if (new_int_size != int_size_) { + RETURN_NOT_OK(ExpandIntSize(new_int_size)); + } switch (int_size_) { case 1: @@ -350,8 +352,8 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase { /// /// If passed, valid_bytes is of equal length to values, and any zero byte /// will be considered as a null for that slot - Status Append( - const uint64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const uint64_t* values, int64_t length, + const uint8_t* valid_bytes = nullptr); Status ExpandIntSize(uint8_t new_int_size); Status Finish(std::shared_ptr<Array>* out) override; @@ -374,18 +376,18 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase { inline uint8_t expanded_int_size(int64_t val, uint8_t current_int_size) { if (current_int_size == 8 || (current_int_size < 8 && - (val > static_cast<int64_t>(std::numeric_limits<int32_t>::max()) || - val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) { + (val > static_cast<int64_t>(std::numeric_limits<int32_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) { return 8; } else if (current_int_size == 4 || (current_int_size < 4 && - (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) || - val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) { + (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) { return 4; } else if (current_int_size == 2 || (current_int_size == 1 && - (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) || - val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) { + (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) || + val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) { return 2; } else { return 1; @@ -404,7 +406,9 @@ class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase { BitUtil::SetBit(null_bitmap_data_, length_); uint8_t new_int_size = expanded_int_size(val, int_size_); - if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + if (new_int_size != int_size_) { + RETURN_NOT_OK(ExpandIntSize(new_int_size)); + } switch (int_size_) { case 1: @@ -429,8 +433,8 @@ class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase { /// /// If passed, valid_bytes is of equal length to values, and any zero byte /// will be considered as a null for that slot - Status Append( - const int64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const int64_t* values, int64_t length, + const uint8_t* valid_bytes = nullptr); Status ExpandIntSize(uint8_t new_int_size); Status Finish(std::shared_ptr<Array>* out) override; @@ -490,8 +494,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { /// /// If passed, valid_bytes is of equal length to values, and any zero byte /// will be considered as a null for that slot - Status Append( - const uint8_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const uint8_t* values, int64_t length, + const uint8_t* valid_bytes = nullptr); Status Finish(std::shared_ptr<Array>* out) override; Status Init(int64_t capacity) override; @@ -526,7 +530,7 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder { /// Use this constructor to incrementally build the value array along with offsets and /// null bitmap. ListBuilder(MemoryPool* pool, std::unique_ptr<ArrayBuilder> value_builder, - const std::shared_ptr<DataType>& type = nullptr); + const std::shared_ptr<DataType>& type = nullptr); Status Init(int64_t elements) override; Status Resize(int64_t capacity) override; @@ -536,8 +540,8 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder { /// /// If passed, valid_bytes is of equal length to values, and any zero byte /// will be considered as a null for that slot - Status Append( - const int32_t* offsets, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const int32_t* offsets, int64_t length, + const uint8_t* valid_bytes = nullptr); /// \brief Start a new variable-length list slot /// @@ -626,8 +630,8 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder { FixedSizeBinaryBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type); Status Append(const uint8_t* value); - Status Append( - const uint8_t* data, int64_t length, const uint8_t* valid_bytes = nullptr); + Status Append(const uint8_t* data, int64_t length, + const uint8_t* valid_bytes = nullptr); Status Append(const std::string& value); Status AppendNull(); @@ -672,7 +676,7 @@ class ARROW_EXPORT DecimalBuilder : public FixedSizeBinaryBuilder { class ARROW_EXPORT StructBuilder : public ArrayBuilder { public: StructBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, - std::vector<std::unique_ptr<ArrayBuilder>>&& field_builders); + std::vector<std::unique_ptr<ArrayBuilder>>&& field_builders); Status Finish(std::shared_ptr<Array>* out) override; @@ -808,7 +812,7 @@ class ARROW_EXPORT BinaryDictionaryBuilder : public DictionaryBuilder<BinaryType Status Append(const std::string& value) { return Append(internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value.c_str()), - static_cast<int32_t>(value.size()))); + static_cast<int32_t>(value.size()))); } }; @@ -829,7 +833,7 @@ class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType Status Append(const std::string& value) { return Append(internal::WrappedBinary(reinterpret_cast<const uint8_t*>(value.c_str()), - static_cast<int32_t>(value.size()))); + static_cast<int32_t>(value.size()))); } }; @@ -837,10 +841,11 @@ class ARROW_EXPORT StringDictionaryBuilder : public DictionaryBuilder<StringType // Helper functions Status ARROW_EXPORT MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type, - std::unique_ptr<ArrayBuilder>* out); + std::unique_ptr<ArrayBuilder>* out); Status ARROW_EXPORT MakeDictionaryBuilder(MemoryPool* pool, - const std::shared_ptr<DataType>& type, std::shared_ptr<ArrayBuilder>* out); + const std::shared_ptr<DataType>& type, + std::shared_ptr<ArrayBuilder>* out); } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/compare.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index 1465e0b..da10c2a 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -41,7 +41,7 @@ namespace arrow { class RangeEqualsVisitor { public: RangeEqualsVisitor(const Array& right, int64_t left_start_idx, int64_t left_end_idx, - int64_t right_start_idx) + int64_t right_start_idx) : right_(right), left_start_idx_(left_start_idx), left_end_idx_(left_end_idx), @@ -71,7 +71,9 @@ class RangeEqualsVisitor { for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { const bool is_null = left.IsNull(i); - if (is_null != right.IsNull(o_i)) { return false; } + if (is_null != right.IsNull(o_i)) { + return false; + } if (is_null) continue; const int32_t begin_offset = left.value_offset(i); const int32_t end_offset = left.value_offset(i + 1); @@ -84,8 +86,8 @@ class RangeEqualsVisitor { if (end_offset - begin_offset > 0 && std::memcmp(left.value_data()->data() + begin_offset, - right.value_data()->data() + right_begin_offset, - static_cast<size_t>(end_offset - begin_offset))) { + right.value_data()->data() + right_begin_offset, + static_cast<size_t>(end_offset - begin_offset))) { return false; } } @@ -101,7 +103,9 @@ class RangeEqualsVisitor { for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { const bool is_null = left.IsNull(i); - if (is_null != right.IsNull(o_i)) { return false; } + if (is_null != right.IsNull(o_i)) { + return false; + } if (is_null) continue; const int32_t begin_offset = left.value_offset(i); const int32_t end_offset = left.value_offset(i + 1); @@ -111,8 +115,8 @@ class RangeEqualsVisitor { if (end_offset - begin_offset != right_end_offset - right_begin_offset) { return false; } - if (!left_values->RangeEquals( - begin_offset, end_offset, right_begin_offset, right_values)) { + if (!left_values->RangeEquals(begin_offset, end_offset, right_begin_offset, + right_values)) { return false; } } @@ -124,7 +128,9 @@ class RangeEqualsVisitor { bool equal_fields = true; for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { - if (left.IsNull(i) != right.IsNull(o_i)) { return false; } + if (left.IsNull(i) != right.IsNull(o_i)) { + return false; + } if (left.IsNull(i)) continue; for (int j = 0; j < left.num_fields(); ++j) { // TODO: really we should be comparing stretches of non-null data rather @@ -132,9 +138,11 @@ class RangeEqualsVisitor { const int64_t left_abs_index = i + left.offset(); const int64_t right_abs_index = o_i + right.offset(); - equal_fields = left.field(j)->RangeEquals( - left_abs_index, left_abs_index + 1, right_abs_index, right.field(j)); - if (!equal_fields) { return false; } + equal_fields = left.field(j)->RangeEquals(left_abs_index, left_abs_index + 1, + right_abs_index, right.field(j)); + if (!equal_fields) { + return false; + } } } return true; @@ -144,7 +152,9 @@ class RangeEqualsVisitor { const auto& right = static_cast<const UnionArray&>(right_); const UnionMode union_mode = left.mode(); - if (union_mode != right.mode()) { return false; } + if (union_mode != right.mode()) { + return false; + } const auto& left_type = static_cast<const UnionType&>(*left.type()); @@ -154,7 +164,9 @@ class RangeEqualsVisitor { const std::vector<uint8_t>& type_codes = left_type.type_codes(); for (size_t i = 0; i < type_codes.size(); ++i) { const uint8_t code = type_codes[i]; - if (code > max_code) { max_code = code; } + if (code > max_code) { + max_code = code; + } } // Store mapping in a vector for constant time lookups @@ -169,9 +181,13 @@ class RangeEqualsVisitor { uint8_t id, child_num; for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { - if (left.IsNull(i) != right.IsNull(o_i)) { return false; } + if (left.IsNull(i) != right.IsNull(o_i)) { + return false; + } if (left.IsNull(i)) continue; - if (left_ids[i] != right_ids[o_i]) { return false; } + if (left_ids[i] != right_ids[o_i]) { + return false; + } id = left_ids[i]; child_num = type_id_to_child_num[id]; @@ -183,14 +199,15 @@ class RangeEqualsVisitor { // rather than looking at one value at a time. if (union_mode == UnionMode::SPARSE) { if (!left.child(child_num)->RangeEquals(left_abs_index, left_abs_index + 1, - right_abs_index, right.child(child_num))) { + right_abs_index, + right.child(child_num))) { return false; } } else { const int32_t offset = left.raw_value_offsets()[i]; const int32_t o_offset = right.raw_value_offsets()[o_i]; - if (!left.child(child_num)->RangeEquals( - offset, offset + 1, o_offset, right.child(child_num))) { + if (!left.child(child_num)->RangeEquals(offset, offset + 1, o_offset, + right.child(child_num))) { return false; } } @@ -211,9 +228,13 @@ class RangeEqualsVisitor { const uint8_t* left_data = nullptr; const uint8_t* right_data = nullptr; - if (left.values()) { left_data = left.raw_values() + left.offset() * width; } + if (left.values()) { + left_data = left.raw_values() + left.offset() * width; + } - if (right.values()) { right_data = right.raw_values() + right.offset() * width; } + if (right.values()) { + right_data = right.raw_values() + right.offset() * width; + } for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { @@ -241,9 +262,13 @@ class RangeEqualsVisitor { const uint8_t* left_data = nullptr; const uint8_t* right_data = nullptr; - if (left.values()) { left_data = left.raw_values() + left.offset() * width; } + if (left.values()) { + left_data = left.raw_values() + left.offset() * width; + } - if (right.values()) { right_data = right.raw_values() + right.offset() * width; } + if (right.values()) { + right_data = right.raw_values() + right.offset() * width; + } for (int64_t i = left_start_idx_, o_i = right_start_idx_; i < left_end_idx_; ++i, ++o_i) { @@ -301,8 +326,8 @@ class RangeEqualsVisitor { result_ = false; return Status::OK(); } - result_ = left.indices()->RangeEquals( - left_start_idx_, left_end_idx_, right_start_idx_, right.indices()); + result_ = left.indices()->RangeEquals(left_start_idx_, left_end_idx_, + right_start_idx_, right.indices()); return Status::OK(); } @@ -324,7 +349,9 @@ static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& r const uint8_t* left_data = nullptr; const uint8_t* right_data = nullptr; - if (left.values()) { left_data = left.values()->data() + left.offset() * byte_width; } + if (left.values()) { + left_data = left.values()->data() + left.offset() * byte_width; + } if (right.values()) { right_data = right.values()->data() + right.offset() * byte_width; } @@ -341,13 +368,13 @@ static bool IsEqualPrimitive(const PrimitiveArray& left, const PrimitiveArray& r return true; } else { return memcmp(left_data, right_data, - static_cast<size_t>(byte_width * left.length())) == 0; + static_cast<size_t>(byte_width * left.length())) == 0; } } template <typename T> -static inline bool CompareBuiltIn( - const Array& left, const Array& right, const T* ldata, const T* rdata) { +static inline bool CompareBuiltIn(const Array& left, const Array& right, const T* ldata, + const T* rdata) { if (left.null_count() > 0) { for (int64_t i = 0; i < left.length(); ++i) { if (left.IsNull(i) != right.IsNull(i)) { @@ -369,17 +396,21 @@ static bool IsEqualDecimal(const DecimalArray& left, const DecimalArray& right) const uint8_t* left_data = nullptr; const uint8_t* right_data = nullptr; - if (left.values()) { left_data = left.values()->data(); } - if (right.values()) { right_data = right.values()->data(); } + if (left.values()) { + left_data = left.values()->data(); + } + if (right.values()) { + right_data = right.values()->data(); + } const int32_t byte_width = left.byte_width(); if (byte_width == 4) { - return CompareBuiltIn<int32_t>(left, right, - reinterpret_cast<const int32_t*>(left_data) + loffset, + return CompareBuiltIn<int32_t>( + left, right, reinterpret_cast<const int32_t*>(left_data) + loffset, reinterpret_cast<const int32_t*>(right_data) + roffset); } else if (byte_width == 8) { - return CompareBuiltIn<int64_t>(left, right, - reinterpret_cast<const int64_t*>(left_data) + loffset, + return CompareBuiltIn<int64_t>( + left, right, reinterpret_cast<const int64_t*>(left_data) + loffset, reinterpret_cast<const int64_t*>(right_data) + roffset); } else { // 128-bit @@ -387,8 +418,12 @@ static bool IsEqualDecimal(const DecimalArray& left, const DecimalArray& right) // Must also compare sign bitmap const uint8_t* left_sign = nullptr; const uint8_t* right_sign = nullptr; - if (left.sign_bitmap()) { left_sign = left.sign_bitmap()->data(); } - if (right.sign_bitmap()) { right_sign = right.sign_bitmap()->data(); } + if (left.sign_bitmap()) { + left_sign = left.sign_bitmap()->data(); + } + if (right.sign_bitmap()) { + right_sign = right.sign_bitmap()->data(); + } for (int64_t i = 0; i < left.length(); ++i) { bool left_null = left.IsNull(i); @@ -434,7 +469,7 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { result_ = true; } else { result_ = BitmapEquals(left.values()->data(), left.offset(), right.values()->data(), - right.offset(), left.length()); + right.offset(), left.length()); } return Status::OK(); } @@ -442,7 +477,7 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { template <typename T> typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value && !std::is_base_of<BooleanArray, T>::value, - Status>::type + Status>::type Visit(const T& left) { result_ = IsEqualPrimitive(left, static_cast<const PrimitiveArray&>(right_)); return Status::OK(); @@ -458,8 +493,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { const auto& right = static_cast<const ArrayType&>(right_); if (left.offset() == 0 && right.offset() == 0) { - return left.value_offsets()->Equals( - *right.value_offsets(), (left.length() + 1) * sizeof(int32_t)); + return left.value_offsets()->Equals(*right.value_offsets(), + (left.length() + 1) * sizeof(int32_t)); } else { // One of the arrays is sliced; logic is more complicated because the // value offsets are not both 0-based @@ -482,10 +517,16 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { const auto& right = static_cast<const BinaryArray&>(right_); bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left); - if (!equal_offsets) { return false; } + if (!equal_offsets) { + return false; + } - if (!left.value_data() && !(right.value_data())) { return true; } - if (left.value_offset(left.length()) == 0) { return true; } + if (!left.value_data() && !(right.value_data())) { + return true; + } + if (left.value_offset(left.length()) == 0) { + return true; + } const uint8_t* left_data = left.value_data()->data(); const uint8_t* right_data = right.value_data()->data(); @@ -493,23 +534,25 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { if (left.null_count() == 0) { // Fast path for null count 0, single memcmp if (left.offset() == 0 && right.offset() == 0) { - return std::memcmp( - left_data, right_data, left.raw_value_offsets()[left.length()]) == 0; + return std::memcmp(left_data, right_data, + left.raw_value_offsets()[left.length()]) == 0; } else { const int64_t total_bytes = left.value_offset(left.length()) - left.value_offset(0); return std::memcmp(left_data + left.value_offset(0), - right_data + right.value_offset(0), - static_cast<size_t>(total_bytes)) == 0; + right_data + right.value_offset(0), + static_cast<size_t>(total_bytes)) == 0; } } else { // ARROW-537: Only compare data in non-null slots const int32_t* left_offsets = left.raw_value_offsets(); const int32_t* right_offsets = right.raw_value_offsets(); for (int64_t i = 0; i < left.length(); ++i) { - if (left.IsNull(i)) { continue; } + if (left.IsNull(i)) { + continue; + } if (std::memcmp(left_data + left_offsets[i], right_data + right_offsets[i], - left.value_length(i))) { + left.value_length(i))) { return false; } } @@ -530,8 +573,9 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { return Status::OK(); } - result_ = left.values()->RangeEquals(left.value_offset(0), - left.value_offset(left.length()), right.value_offset(0), right.values()); + result_ = + left.values()->RangeEquals(left.value_offset(0), left.value_offset(left.length()), + right.value_offset(0), right.values()); return Status::OK(); } @@ -547,15 +591,15 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor { template <typename T> typename std::enable_if<std::is_base_of<NestedType, typename T::TypeClass>::value, - Status>::type + Status>::type Visit(const T& left) { return RangeEqualsVisitor::Visit(left); } }; template <typename TYPE> -inline bool FloatingApproxEquals( - const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) { +inline bool FloatingApproxEquals(const NumericArray<TYPE>& left, + const NumericArray<TYPE>& right) { using T = typename TYPE::c_type; const T* left_data = left.raw_values(); @@ -566,11 +610,15 @@ inline bool FloatingApproxEquals( if (left.null_count() > 0) { for (int64_t i = 0; i < left.length(); ++i) { if (left.IsNull(i)) continue; - if (fabs(left_data[i] - right_data[i]) > EPSILON) { return false; } + if (fabs(left_data[i] - right_data[i]) > EPSILON) { + return false; + } } } else { for (int64_t i = 0; i < left.length(); ++i) { - if (fabs(left_data[i] - right_data[i]) > EPSILON) { return false; } + if (fabs(left_data[i] - right_data[i]) > EPSILON) { + return false; + } } } return true; @@ -601,7 +649,7 @@ static bool BaseDataEquals(const Array& left, const Array& right) { } if (left.null_count() > 0 && left.null_count() < left.length()) { return BitmapEquals(left.null_bitmap()->data(), left.offset(), - right.null_bitmap()->data(), right.offset(), left.length()); + right.null_bitmap()->data(), right.offset(), left.length()); } return true; } @@ -634,7 +682,7 @@ Status ArrayApproxEquals(const Array& left, const Array& right, bool* are_equal) } Status ArrayRangeEquals(const Array& left, const Array& right, int64_t left_start_idx, - int64_t left_end_idx, int64_t right_start_idx, bool* are_equal) { + int64_t left_end_idx, int64_t right_start_idx, bool* are_equal) { if (&left == &right) { *are_equal = true; } else if (left.type_id() != right.type_id()) { @@ -705,7 +753,7 @@ class TypeEqualsVisitor { template <typename T> typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<PrimitiveCType, T>::value, - Status>::type + Status>::type Visit(const T& type) { result_ = true; return Status::OK(); @@ -714,7 +762,7 @@ class TypeEqualsVisitor { template <typename T> typename std::enable_if<std::is_base_of<TimeType, T>::value || std::is_base_of<DateType, T>::value, - Status>::type + Status>::type Visit(const T& left) { const auto& right = static_cast<const T&>(right_); result_ = left.unit() == right.unit(); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/compare.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compare.h b/cpp/src/arrow/compare.h index 96a6435..a36b553 100644 --- a/cpp/src/arrow/compare.h +++ b/cpp/src/arrow/compare.h @@ -34,21 +34,22 @@ class Tensor; /// Returns true if the arrays are exactly equal Status ARROW_EXPORT ArrayEquals(const Array& left, const Array& right, bool* are_equal); -Status ARROW_EXPORT TensorEquals( - const Tensor& left, const Tensor& right, bool* are_equal); +Status ARROW_EXPORT TensorEquals(const Tensor& left, const Tensor& right, + bool* are_equal); /// Returns true if the arrays are approximately equal. For non-floating point /// types, this is equivalent to ArrayEquals(left, right) -Status ARROW_EXPORT ArrayApproxEquals( - const Array& left, const Array& right, bool* are_equal); +Status ARROW_EXPORT ArrayApproxEquals(const Array& left, const Array& right, + bool* are_equal); /// Returns true if indicated equal-length segment of arrays is exactly equal Status ARROW_EXPORT ArrayRangeEquals(const Array& left, const Array& right, - int64_t start_idx, int64_t end_idx, int64_t other_start_idx, bool* are_equal); + int64_t start_idx, int64_t end_idx, + int64_t other_start_idx, bool* are_equal); /// Returns true if the type metadata are exactly equal -Status ARROW_EXPORT TypeEquals( - const DataType& left, const DataType& right, bool* are_equal); +Status ARROW_EXPORT TypeEquals(const DataType& left, const DataType& right, + bool* are_equal); } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 936655f..82e3ba8 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -123,8 +123,8 @@ constexpr const char* kRangeExceptionError = "Range exception during wide-char string conversion"; #endif -static inline Status CheckOpenResult( - int ret, int errno_actual, const char* filename, size_t filename_length) { +static inline Status CheckOpenResult(int ret, int errno_actual, const char* filename, + size_t filename_length) { if (ret == -1) { // TODO: errno codes to strings std::stringstream ss; @@ -134,12 +134,14 @@ static inline Status CheckOpenResult( // this requires c++11 std::wstring_convert<std::codecvt_utf8<wchar_t>, wchar_t> converter; - std::wstring wide_string( - reinterpret_cast<const wchar_t*>(filename), filename_length / sizeof(wchar_t)); + std::wstring wide_string(reinterpret_cast<const wchar_t*>(filename), + filename_length / sizeof(wchar_t)); try { std::string byte_string = converter.to_bytes(wide_string); ss << byte_string; - } catch (const std::range_error&) { ss << kRangeExceptionError; } + } catch (const std::range_error&) { + ss << kRangeExceptionError; + } #else ss << filename; #endif @@ -161,7 +163,9 @@ static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) { #if defined(_MSC_VER) static inline Status ConvertToUtf16(const std::string& input, std::wstring* result) { - if (result == nullptr) { return Status::Invalid("Pointer to result is not valid"); } + if (result == nullptr) { + return Status::Invalid("Pointer to result is not valid"); + } if (input.empty()) { *result = std::wstring(); @@ -171,7 +175,9 @@ static inline Status ConvertToUtf16(const std::string& input, std::wstring* resu std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> utf16_converter; try { *result = utf16_converter.from_bytes(input); - } catch (const std::range_error&) { return Status::Invalid(kRangeExceptionError); } + } catch (const std::range_error&) { + return Status::Invalid(kRangeExceptionError); + } return Status::OK(); } #endif @@ -194,8 +200,8 @@ static inline Status FileOpenReadable(const std::string& filename, int* fd) { return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size()); } -static inline Status FileOpenWriteable( - const std::string& filename, bool write_only, bool truncate, int* fd) { +static inline Status FileOpenWriteable(const std::string& filename, bool write_only, + bool truncate, int* fd) { int ret; errno_t errno_actual = 0; @@ -205,9 +211,13 @@ static inline Status FileOpenWriteable( int oflag = _O_CREAT | _O_BINARY; int pmode = _S_IWRITE; - if (!write_only) { pmode |= _S_IREAD; } + if (!write_only) { + pmode |= _S_IREAD; + } - if (truncate) { oflag |= _O_TRUNC; } + if (truncate) { + oflag |= _O_TRUNC; + } if (write_only) { oflag |= _O_WRONLY; @@ -221,7 +231,9 @@ static inline Status FileOpenWriteable( #else int oflag = O_CREAT | O_BINARY; - if (truncate) { oflag |= O_TRUNC; } + if (truncate) { + oflag |= O_TRUNC; + } if (write_only) { oflag |= O_WRONLY; @@ -239,7 +251,9 @@ static inline Status FileTell(int fd, int64_t* pos) { #if defined(_MSC_VER) current_pos = _telli64(fd); - if (current_pos == -1) { return Status::IOError("_telli64 failed"); } + if (current_pos == -1) { + return Status::IOError("_telli64 failed"); + } #else current_pos = lseek64_compat(fd, 0, SEEK_CUR); CHECK_LSEEK(current_pos); @@ -255,10 +269,12 @@ static inline Status FileSeek(int fd, int64_t pos) { return Status::OK(); } -static inline Status FileRead( - int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) { +static inline Status FileRead(int fd, uint8_t* buffer, int64_t nbytes, + int64_t* bytes_read) { #if defined(_MSC_VER) - if (nbytes > INT32_MAX) { return Status::IOError("Unable to read > 2GB blocks yet"); } + if (nbytes > INT32_MAX) { + return Status::IOError("Unable to read > 2GB blocks yet"); + } *bytes_read = static_cast<int64_t>(_read(fd, buffer, static_cast<uint32_t>(nbytes))); #else *bytes_read = static_cast<int64_t>(read(fd, buffer, static_cast<size_t>(nbytes))); @@ -323,7 +339,9 @@ static inline Status FileClose(int fd) { ret = static_cast<int>(close(fd)); #endif - if (ret == -1) { return Status::IOError("error closing file"); } + if (ret == -1) { + return Status::IOError("error closing file"); + } return Status::OK(); } @@ -371,7 +389,9 @@ class OSFile { } Status Seek(int64_t pos) { - if (pos < 0) { return Status::Invalid("Invalid position"); } + if (pos < 0) { + return Status::Invalid("Invalid position"); + } return FileSeek(fd_, pos); } @@ -379,7 +399,9 @@ class OSFile { Status Write(const uint8_t* data, int64_t length) { std::lock_guard<std::mutex> guard(lock_); - if (length < 0) { return Status::IOError("Length must be non-negative"); } + if (length < 0) { + return Status::IOError("Length must be non-negative"); + } return FileWrite(fd_, data, length); } @@ -421,7 +443,9 @@ class ReadableFile::ReadableFileImpl : public OSFile { int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } *out = buffer; return Status::OK(); } @@ -430,13 +454,9 @@ class ReadableFile::ReadableFileImpl : public OSFile { MemoryPool* pool_; }; -ReadableFile::ReadableFile(MemoryPool* pool) { - impl_.reset(new ReadableFileImpl(pool)); -} +ReadableFile::ReadableFile(MemoryPool* pool) { impl_.reset(new ReadableFileImpl(pool)); } -ReadableFile::~ReadableFile() { - DCHECK(impl_->Close().ok()); -} +ReadableFile::~ReadableFile() { DCHECK(impl_->Close().ok()); } Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile>* file) { *file = std::shared_ptr<ReadableFile>(new ReadableFile(default_memory_pool())); @@ -444,18 +464,14 @@ Status ReadableFile::Open(const std::string& path, std::shared_ptr<ReadableFile> } Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool, - std::shared_ptr<ReadableFile>* file) { + std::shared_ptr<ReadableFile>* file) { *file = std::shared_ptr<ReadableFile>(new ReadableFile(memory_pool)); return (*file)->impl_->Open(path); } -Status ReadableFile::Close() { - return impl_->Close(); -} +Status ReadableFile::Close() { return impl_->Close(); } -Status ReadableFile::Tell(int64_t* pos) { - return impl_->Tell(pos); -} +Status ReadableFile::Tell(int64_t* pos) { return impl_->Tell(pos); } Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { return impl_->Read(nbytes, bytes_read, out); @@ -470,17 +486,11 @@ Status ReadableFile::GetSize(int64_t* size) { return Status::OK(); } -Status ReadableFile::Seek(int64_t pos) { - return impl_->Seek(pos); -} +Status ReadableFile::Seek(int64_t pos) { return impl_->Seek(pos); } -bool ReadableFile::supports_zero_copy() const { - return false; -} +bool ReadableFile::supports_zero_copy() const { return false; } -int ReadableFile::file_descriptor() const { - return impl_->fd(); -} +int ReadableFile::file_descriptor() const { return impl_->fd(); } // ---------------------------------------------------------------------- // FileOutputStream @@ -492,42 +502,34 @@ class FileOutputStream::FileOutputStreamImpl : public OSFile { } }; -FileOutputStream::FileOutputStream() { - impl_.reset(new FileOutputStreamImpl()); -} +FileOutputStream::FileOutputStream() { impl_.reset(new FileOutputStreamImpl()); } FileOutputStream::~FileOutputStream() { // This can fail; better to explicitly call close DCHECK(impl_->Close().ok()); } -Status FileOutputStream::Open( - const std::string& path, std::shared_ptr<FileOutputStream>* file) { +Status FileOutputStream::Open(const std::string& path, + std::shared_ptr<FileOutputStream>* file) { return Open(path, false, file); } -Status FileOutputStream::Open( - const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file) { +Status FileOutputStream::Open(const std::string& path, bool append, + std::shared_ptr<FileOutputStream>* file) { // private ctor *file = std::shared_ptr<FileOutputStream>(new FileOutputStream()); return (*file)->impl_->Open(path, append); } -Status FileOutputStream::Close() { - return impl_->Close(); -} +Status FileOutputStream::Close() { return impl_->Close(); } -Status FileOutputStream::Tell(int64_t* pos) { - return impl_->Tell(pos); -} +Status FileOutputStream::Tell(int64_t* pos) { return impl_->Tell(pos); } Status FileOutputStream::Write(const uint8_t* data, int64_t length) { return impl_->Write(data, length); } -int FileOutputStream::file_descriptor() const { - return impl_->fd(); -} +int FileOutputStream::file_descriptor() const { return impl_->fd(); } // ---------------------------------------------------------------------- // Implement MemoryMappedFile @@ -567,7 +569,7 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { } void* result = mmap(nullptr, static_cast<size_t>(file_->size()), prot_flags, map_mode, - file_->fd(), 0); + file_->fd(), 0); if (result == MAP_FAILED) { std::stringstream ss; ss << "Memory mapping file failed, errno: " << errno; @@ -585,7 +587,9 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { int64_t size() const { return size_; } Status Seek(int64_t position) { - if (position < 0) { return Status::Invalid("position is out of bounds"); } + if (position < 0) { + return Status::Invalid("position is out of bounds"); + } position_ = position; return Status::OK(); } @@ -610,8 +614,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { MemoryMappedFile::MemoryMappedFile() {} MemoryMappedFile::~MemoryMappedFile() {} -Status MemoryMappedFile::Create( - const std::string& path, int64_t size, std::shared_ptr<MemoryMappedFile>* out) { +Status MemoryMappedFile::Create(const std::string& path, int64_t size, + std::shared_ptr<MemoryMappedFile>* out) { std::shared_ptr<FileOutputStream> file; RETURN_NOT_OK(FileOutputStream::Open(path, &file)); #ifdef _MSC_VER @@ -624,7 +628,7 @@ Status MemoryMappedFile::Create( } Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, - std::shared_ptr<MemoryMappedFile>* out) { + std::shared_ptr<MemoryMappedFile>* out) { std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile()); result->memory_map_.reset(new MemoryMap()); @@ -644,9 +648,7 @@ Status MemoryMappedFile::Tell(int64_t* position) { return Status::OK(); } -Status MemoryMappedFile::Seek(int64_t position) { - return memory_map_->Seek(position); -} +Status MemoryMappedFile::Seek(int64_t position) { return memory_map_->Seek(position); } Status MemoryMappedFile::Close() { // munmap handled in pimpl dtor @@ -656,7 +658,9 @@ Status MemoryMappedFile::Close() { Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { nbytes = std::max<int64_t>( 0, std::min(nbytes, memory_map_->size() - memory_map_->position())); - if (nbytes > 0) { std::memcpy(out, memory_map_->head(), static_cast<size_t>(nbytes)); } + if (nbytes > 0) { + std::memcpy(out, memory_map_->head(), static_cast<size_t>(nbytes)); + } *bytes_read = nbytes; memory_map_->advance(nbytes); return Status::OK(); @@ -675,9 +679,7 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { return Status::OK(); } -bool MemoryMappedFile::supports_zero_copy() const { - return true; -} +bool MemoryMappedFile::supports_zero_copy() const { return true; } Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { std::lock_guard<std::mutex> guard(lock_); @@ -708,9 +710,7 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) { return Status::OK(); } -int MemoryMappedFile::file_descriptor() const { - return memory_map_->fd(); -} +int MemoryMappedFile::file_descriptor() const { return memory_map_->fd(); } } // namespace io } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index f0be3cf..ba740f1 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -44,8 +44,8 @@ class ARROW_EXPORT FileOutputStream : public OutputStream { // truncated to 0 bytes, deleting any existing memory static Status Open(const std::string& path, std::shared_ptr<FileOutputStream>* file); - static Status Open( - const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file); + static Status Open(const std::string& path, bool append, + std::shared_ptr<FileOutputStream>* file); // OutputStream interface Status Close() override; @@ -73,7 +73,7 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile { // Open file with one's own memory pool for memory allocations static Status Open(const std::string& path, MemoryPool* memory_pool, - std::shared_ptr<ReadableFile>* file); + std::shared_ptr<ReadableFile>* file); Status Close() override; Status Tell(int64_t* position) override; @@ -107,11 +107,11 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { ~MemoryMappedFile(); /// Create new file with indicated size, return in read/write mode - static Status Create( - const std::string& path, int64_t size, std::shared_ptr<MemoryMappedFile>* out); + static Status Create(const std::string& path, int64_t size, + std::shared_ptr<MemoryMappedFile>* out); static Status Open(const std::string& path, FileMode::type mode, - std::shared_ptr<MemoryMappedFile>* out); + std::shared_ptr<MemoryMappedFile>* out); Status Close() override; http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/hdfs-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs-internal.cc b/cpp/src/arrow/io/hdfs-internal.cc index 8b4a92b..8f42b1c 100644 --- a/cpp/src/arrow/io/hdfs-internal.cc +++ b/cpp/src/arrow/io/hdfs-internal.cc @@ -59,9 +59,9 @@ static std::vector<fs::path> get_potential_libhdfs_paths(); static std::vector<fs::path> get_potential_libhdfs3_paths(); static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name, #ifndef _WIN32 - void*& out_handle); + void*& out_handle); #else - HINSTANCE& out_handle); + HINSTANCE& out_handle); #endif static std::vector<fs::path> get_potential_libhdfs_paths() { @@ -88,7 +88,9 @@ static std::vector<fs::path> get_potential_libhdfs_paths() { } const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR"); - if (libhdfs_dir != nullptr) { search_paths.push_back(fs::path(libhdfs_dir)); } + if (libhdfs_dir != nullptr) { + search_paths.push_back(fs::path(libhdfs_dir)); + } // All paths with file name for (auto& path : search_paths) { @@ -115,7 +117,9 @@ static std::vector<fs::path> get_potential_libhdfs3_paths() { std::vector<fs::path> search_paths = {fs::path(""), fs::path(".")}; const char* libhdfs3_dir = std::getenv("ARROW_LIBHDFS3_DIR"); - if (libhdfs3_dir != nullptr) { search_paths.push_back(fs::path(libhdfs3_dir)); } + if (libhdfs3_dir != nullptr) { + search_paths.push_back(fs::path(libhdfs3_dir)); + } // All paths with file name for (auto& path : search_paths) { @@ -188,8 +192,8 @@ static std::vector<fs::path> get_potential_libjvm_paths() { } #ifndef _WIN32 -static arrow::Status try_dlopen( - std::vector<fs::path> potential_paths, const char* name, void*& out_handle) { +static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name, + void*& out_handle) { std::vector<std::string> error_messages; for (auto& i : potential_paths) { @@ -219,8 +223,8 @@ static arrow::Status try_dlopen( } #else -static arrow::Status try_dlopen( - std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) { +static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name, + HINSTANCE& out_handle) { std::vector<std::string> error_messages; for (auto& i : potential_paths) { @@ -282,9 +286,7 @@ namespace io { static LibHdfsShim libhdfs_shim; static LibHdfsShim libhdfs3_shim; -hdfsBuilder* LibHdfsShim::NewBuilder(void) { - return this->hdfsNewBuilder(); -} +hdfsBuilder* LibHdfsShim::NewBuilder(void) { return this->hdfsNewBuilder(); } void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { this->hdfsBuilderSetNameNode(bld, nn); @@ -298,8 +300,8 @@ void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) { this->hdfsBuilderSetUserName(bld, userName); } -void LibHdfsShim::BuilderSetKerbTicketCachePath( - hdfsBuilder* bld, const char* kerbTicketCachePath) { +void LibHdfsShim::BuilderSetKerbTicketCachePath(hdfsBuilder* bld, + const char* kerbTicketCachePath) { this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); } @@ -307,12 +309,10 @@ hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { return this->hdfsBuilderConnect(bld); } -int LibHdfsShim::Disconnect(hdfsFS fs) { - return this->hdfsDisconnect(fs); -} +int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); } hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, - short replication, tSize blocksize) { // NOLINT + short replication, tSize blocksize) { // NOLINT return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize); } @@ -328,9 +328,7 @@ int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { return this->hdfsSeek(fs, file, desiredPos); } -tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { - return this->hdfsTell(fs, file); -} +tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { return this->hdfsTell(fs, file); } tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { return this->hdfsRead(fs, file, buffer, length); @@ -341,8 +339,8 @@ bool LibHdfsShim::HasPread() { return this->hdfsPread != nullptr; } -tSize LibHdfsShim::Pread( - hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) { +tSize LibHdfsShim::Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, + tSize length) { GET_SYMBOL(this, hdfsPread); return this->hdfsPread(fs, file, position, buffer, length); } @@ -351,9 +349,7 @@ tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize len return this->hdfsWrite(fs, file, buffer, length); } -int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { - return this->hdfsFlush(fs, file); -} +int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, file); } int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { GET_SYMBOL(this, hdfsAvailable); @@ -434,8 +430,8 @@ void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); } -char*** LibHdfsShim::GetHosts( - hdfsFS fs, const char* path, tOffset start, tOffset length) { +char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start, + tOffset length) { GET_SYMBOL(this, hdfsGetHosts); if (this->hdfsGetHosts) { return this->hdfsGetHosts(fs, path, start, length); @@ -446,7 +442,9 @@ char*** LibHdfsShim::GetHosts( void LibHdfsShim::FreeHosts(char*** blockHosts) { GET_SYMBOL(this, hdfsFreeHosts); - if (this->hdfsFreeHosts) { this->hdfsFreeHosts(blockHosts); } + if (this->hdfsFreeHosts) { + this->hdfsFreeHosts(blockHosts); + } } tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { @@ -458,16 +456,12 @@ tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { } } -tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { - return this->hdfsGetCapacity(fs); -} +tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { return this->hdfsGetCapacity(fs); } -tOffset LibHdfsShim::GetUsed(hdfsFS fs) { - return this->hdfsGetUsed(fs); -} +tOffset LibHdfsShim::GetUsed(hdfsFS fs) { return this->hdfsGetUsed(fs); } -int LibHdfsShim::Chown( - hdfsFS fs, const char* path, const char* owner, const char* group) { +int LibHdfsShim::Chown(hdfsFS fs, const char* path, const char* owner, + const char* group) { GET_SYMBOL(this, hdfsChown); if (this->hdfsChown) { return this->hdfsChown(fs, path, owner, group); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/hdfs-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h index c5ea397..db6a21c 100644 --- a/cpp/src/arrow/io/hdfs-internal.h +++ b/cpp/src/arrow/io/hdfs-internal.h @@ -45,22 +45,22 @@ struct LibHdfsShim { void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port); void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName); - void (*hdfsBuilderSetKerbTicketCachePath)( - hdfsBuilder* bld, const char* kerbTicketCachePath); + void (*hdfsBuilderSetKerbTicketCachePath)(hdfsBuilder* bld, + const char* kerbTicketCachePath); hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); int (*hdfsDisconnect)(hdfsFS fs); hdfsFile (*hdfsOpenFile)(hdfsFS fs, const char* path, int flags, int bufferSize, - short replication, tSize blocksize); // NOLINT + short replication, tSize blocksize); // NOLINT int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file); int (*hdfsExists)(hdfsFS fs, const char* path); int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos); tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file); tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length); - tSize (*hdfsPread)( - hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + tSize (*hdfsPread)(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, + tSize length); tSize (*hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); int (*hdfsFlush)(hdfsFS fs, hdfsFile file); int (*hdfsAvailable)(hdfsFS fs, hdfsFile file); @@ -139,7 +139,7 @@ struct LibHdfsShim { int Disconnect(hdfsFS fs); hdfsFile OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, - short replication, tSize blocksize); // NOLINT + short replication, tSize blocksize); // NOLINT int CloseFile(hdfsFS fs, hdfsFile file); http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 9ded9bc..500f42c 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -61,8 +61,8 @@ static constexpr int kDefaultHdfsBufferSize = 1 << 16; class HdfsAnyFileImpl { public: - void set_members( - const std::string& path, LibHdfsShim* driver, hdfsFS fs, hdfsFile handle) { + void set_members(const std::string& path, LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle) { path_ = path; driver_ = driver; fs_ = fs; @@ -118,7 +118,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { tSize ret; if (driver_->HasPread()) { ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position), - reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes)); + reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes)); } else { std::lock_guard<std::mutex> guard(lock_); RETURN_NOT_OK(Seek(position)); @@ -136,7 +136,9 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } *out = buffer; return Status::OK(); @@ -145,11 +147,14 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { int64_t total_bytes = 0; while (total_bytes < nbytes) { - tSize ret = driver_->Read(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes), + tSize ret = driver_->Read( + fs_, file_, reinterpret_cast<void*>(buffer + total_bytes), static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes))); RETURN_NOT_OK(CheckReadResult(ret)); total_bytes += ret; - if (ret == 0) { break; } + if (ret == 0) { + break; + } } *bytes_read = total_bytes; @@ -162,7 +167,9 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } *out = buffer; return Status::OK(); @@ -170,7 +177,9 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { Status GetSize(int64_t* size) { hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); - if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } + if (entry == nullptr) { + return Status::IOError("HDFS: GetPathInfo failed"); + } *size = entry->mSize; driver_->FreeFileInfo(entry, 1); @@ -187,31 +196,27 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { }; HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { - if (pool == nullptr) { pool = default_memory_pool(); } + if (pool == nullptr) { + pool = default_memory_pool(); + } impl_.reset(new HdfsReadableFileImpl(pool)); } -HdfsReadableFile::~HdfsReadableFile() { - DCHECK(impl_->Close().ok()); -} +HdfsReadableFile::~HdfsReadableFile() { DCHECK(impl_->Close().ok()); } -Status HdfsReadableFile::Close() { - return impl_->Close(); -} +Status HdfsReadableFile::Close() { return impl_->Close(); } -Status HdfsReadableFile::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { +Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* buffer) { return impl_->ReadAt(position, nbytes, bytes_read, buffer); } -Status HdfsReadableFile::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) { +Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr<Buffer>* out) { return impl_->ReadAt(position, nbytes, out); } -bool HdfsReadableFile::supports_zero_copy() const { - return false; -} +bool HdfsReadableFile::supports_zero_copy() const { return false; } Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { return impl_->Read(nbytes, bytes_read, buffer); @@ -221,17 +226,11 @@ Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* buffer) { return impl_->Read(nbytes, buffer); } -Status HdfsReadableFile::GetSize(int64_t* size) { - return impl_->GetSize(size); -} +Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size); } -Status HdfsReadableFile::Seek(int64_t position) { - return impl_->Seek(position); -} +Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); } -Status HdfsReadableFile::Tell(int64_t* position) { - return impl_->Tell(position); -} +Status HdfsReadableFile::Tell(int64_t* position) { return impl_->Tell(position); } // ---------------------------------------------------------------------- // File writing @@ -259,28 +258,22 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) { std::lock_guard<std::mutex> guard(lock_); - tSize ret = driver_->Write( - fs_, file_, reinterpret_cast<const void*>(buffer), static_cast<tSize>(nbytes)); + tSize ret = driver_->Write(fs_, file_, reinterpret_cast<const void*>(buffer), + static_cast<tSize>(nbytes)); CHECK_FAILURE(ret, "Write"); *bytes_written = ret; return Status::OK(); } }; -HdfsOutputStream::HdfsOutputStream() { - impl_.reset(new HdfsOutputStreamImpl()); -} +HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); } -HdfsOutputStream::~HdfsOutputStream() { - DCHECK(impl_->Close().ok()); -} +HdfsOutputStream::~HdfsOutputStream() { DCHECK(impl_->Close().ok()); } -Status HdfsOutputStream::Close() { - return impl_->Close(); -} +Status HdfsOutputStream::Close() { return impl_->Close(); } -Status HdfsOutputStream::Write( - const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) { +Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes, + int64_t* bytes_read) { return impl_->Write(buffer, nbytes, bytes_read); } @@ -289,13 +282,9 @@ Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) { return Write(buffer, nbytes, &bytes_written_dummy); } -Status HdfsOutputStream::Flush() { - return impl_->Flush(); -} +Status HdfsOutputStream::Flush() { return impl_->Flush(); } -Status HdfsOutputStream::Tell(int64_t* position) { - return impl_->Tell(position); -} +Status HdfsOutputStream::Tell(int64_t* position) { return impl_->Tell(position); } // ---------------------------------------------------------------------- // HDFS client @@ -344,7 +333,9 @@ class HdfsClient::HdfsClientImpl { } fs_ = driver_->BuilderConnect(builder); - if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); } + if (fs_ == nullptr) { + return Status::IOError("HDFS connection failed"); + } namenode_host_ = config->host; port_ = config->port; user_ = config->user; @@ -395,7 +386,9 @@ class HdfsClient::HdfsClientImpl { Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); - if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } + if (entry == nullptr) { + return Status::IOError("HDFS: GetPathInfo failed"); + } SetPathInfo(entry, info); driver_->FreeFileInfo(entry, 1); @@ -435,7 +428,7 @@ class HdfsClient::HdfsClientImpl { } Status OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr<HdfsReadableFile>* file) { + std::shared_ptr<HdfsReadableFile>* file) { hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); if (handle == nullptr) { @@ -454,13 +447,14 @@ class HdfsClient::HdfsClientImpl { } Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, - int16_t replication, int64_t default_block_size, - std::shared_ptr<HdfsOutputStream>* file) { + int16_t replication, int64_t default_block_size, + std::shared_ptr<HdfsOutputStream>* file) { int flags = O_WRONLY; if (append) flags |= O_APPEND; - hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, - replication, static_cast<tSize>(default_block_size)); + hdfsFile handle = + driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication, + static_cast<tSize>(default_block_size)); if (handle == nullptr) { // TODO(wesm): determine cause of failure @@ -496,14 +490,12 @@ class HdfsClient::HdfsClientImpl { // ---------------------------------------------------------------------- // Public API for HDFSClient -HdfsClient::HdfsClient() { - impl_.reset(new HdfsClientImpl()); -} +HdfsClient::HdfsClient() { impl_.reset(new HdfsClientImpl()); } HdfsClient::~HdfsClient() {} -Status HdfsClient::Connect( - const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs) { +Status HdfsClient::Connect(const HdfsConnectionConfig* config, + std::shared_ptr<HdfsClient>* fs) { // ctor is private, make_shared will not work *fs = std::shared_ptr<HdfsClient>(new HdfsClient()); @@ -519,50 +511,43 @@ Status HdfsClient::Delete(const std::string& path, bool recursive) { return impl_->Delete(path, recursive); } -Status HdfsClient::Disconnect() { - return impl_->Disconnect(); -} +Status HdfsClient::Disconnect() { return impl_->Disconnect(); } -bool HdfsClient::Exists(const std::string& path) { - return impl_->Exists(path); -} +bool HdfsClient::Exists(const std::string& path) { return impl_->Exists(path); } Status HdfsClient::GetPathInfo(const std::string& path, HdfsPathInfo* info) { return impl_->GetPathInfo(path, info); } -Status HdfsClient::GetCapacity(int64_t* nbytes) { - return impl_->GetCapacity(nbytes); -} +Status HdfsClient::GetCapacity(int64_t* nbytes) { return impl_->GetCapacity(nbytes); } -Status HdfsClient::GetUsed(int64_t* nbytes) { - return impl_->GetUsed(nbytes); -} +Status HdfsClient::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } -Status HdfsClient::ListDirectory( - const std::string& path, std::vector<HdfsPathInfo>* listing) { +Status HdfsClient::ListDirectory(const std::string& path, + std::vector<HdfsPathInfo>* listing) { return impl_->ListDirectory(path, listing); } Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr<HdfsReadableFile>* file) { + std::shared_ptr<HdfsReadableFile>* file) { return impl_->OpenReadable(path, buffer_size, file); } -Status HdfsClient::OpenReadable( - const std::string& path, std::shared_ptr<HdfsReadableFile>* file) { +Status HdfsClient::OpenReadable(const std::string& path, + std::shared_ptr<HdfsReadableFile>* file) { return OpenReadable(path, kDefaultHdfsBufferSize, file); } Status HdfsClient::OpenWriteable(const std::string& path, bool append, - int32_t buffer_size, int16_t replication, int64_t default_block_size, - std::shared_ptr<HdfsOutputStream>* file) { - return impl_->OpenWriteable( - path, append, buffer_size, replication, default_block_size, file); + int32_t buffer_size, int16_t replication, + int64_t default_block_size, + std::shared_ptr<HdfsOutputStream>* file) { + return impl_->OpenWriteable(path, append, buffer_size, replication, default_block_size, + file); } -Status HdfsClient::OpenWriteable( - const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file) { +Status HdfsClient::OpenWriteable(const std::string& path, bool append, + std::shared_ptr<HdfsOutputStream>* file) { return OpenWriteable(path, append, 0, 0, 0, file); } http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/io/hdfs.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index f3de4a2..63c3ae0 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -75,8 +75,8 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @param config (in): configuration for connecting // @param fs (out): the created client // @returns Status - static Status Connect( - const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs); + static Status Connect(const HdfsConnectionConfig* config, + std::shared_ptr<HdfsClient>* fs); // Create directory and all parents // @@ -132,7 +132,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // // @param path complete file path Status OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr<HdfsReadableFile>* file); + std::shared_ptr<HdfsReadableFile>* file); Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file); @@ -142,11 +142,11 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // @param replication, 0 for default // @param default_block_size, 0 for default Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, - int16_t replication, int64_t default_block_size, - std::shared_ptr<HdfsOutputStream>* file); + int16_t replication, int64_t default_block_size, + std::shared_ptr<HdfsOutputStream>* file); - Status OpenWriteable( - const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file); + Status OpenWriteable(const std::string& path, bool append, + std::shared_ptr<HdfsOutputStream>* file); private: friend class HdfsReadableFile; @@ -173,8 +173,8 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override; - Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* buffer) override; Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
