PARQUET-671: performance improvements for rle/bit-packed decoding Testing on my own data shows an order-of-magnitude improvement.
I separated the commits for clarity; each one gives an incremental improvement. The motivation for the last commit (allowing NULL for def_levels/rep_levels) is a workaround for Spark which doesn't seem to be able to generate columns without def_level, even when a column is specified as "not nullable". Author: Eric Daniel <[email protected]> Closes #140 from edani/decode-perf and squashes the following commits: eec0855 [Eric Daniel] Ran "make format" 0568de6 [Eric Daniel] Only check num. of repetition levels when def_levels is set 5f54e1c [Eric Daniel] Added benchmarks for dictionary decoding 087945b [Eric Daniel] Style fixes from code review 906be73 [Eric Daniel] Allow the reader to skip rep/def decoding 04b7391 [Eric Daniel] Fast bit unpacking bda5d84 [Eric Daniel] The bit reader can decode in batches 3f10378 [Eric Daniel] Improve decoding of repeated values in the dict encoding Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/38f0ffd5 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/38f0ffd5 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/38f0ffd5 Branch: refs/heads/master Commit: 38f0ffd5adc37db948991a2eb6409e2727721463 Parents: 616305c Author: Eric Daniel <[email protected]> Authored: Tue Aug 2 16:38:24 2016 -0700 Committer: Wes McKinney <[email protected]> Committed: Tue Aug 2 16:38:24 2016 -0700 ---------------------------------------------------------------------- src/parquet/column/levels.cc | 5 +- src/parquet/column/reader.h | 10 +- src/parquet/encodings/dictionary-encoding.h | 20 +- src/parquet/encodings/encoding-benchmark.cc | 72 +- src/parquet/encodings/plain-encoding.h | 6 +- src/parquet/util/bit-stream-utils.h | 4 + src/parquet/util/bit-stream-utils.inline.h | 102 +- src/parquet/util/bpacking.h | 3323 ++++++++++++++++++++++ src/parquet/util/buffer.h | 2 +- src/parquet/util/rle-encoding.h | 72 +- 10 files changed, 3549 insertions(+), 67 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/levels.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/levels.cc b/src/parquet/column/levels.cc index 9b2d901..3e7b9df 100644 --- a/src/parquet/column/levels.cc +++ b/src/parquet/column/levels.cc @@ -133,10 +133,7 @@ int LevelDecoder::Decode(int batch_size, int16_t* levels) { if (encoding_ == Encoding::RLE) { num_decoded = rle_decoder_->GetBatch(levels, num_values); } else { - for (int i = 0; i < num_values; ++i) { - if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { break; } - ++num_decoded; - } + num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); } num_values_remaining_ -= num_decoded; return num_decoded; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 5153698..25df2b4 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -115,6 +115,10 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader { // may be less than the number of repetition and definition levels. With // nested data this is almost certainly true. // + // Set def_levels or rep_levels to nullptr if you want to skip reading them. + // This is only safe if you know through some other source that there are no + // undefined values. + // // To fully exhaust a row group, you must read batches until the number of // values read reaches the number of stored values according to the metadata. 
// @@ -171,7 +175,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_ int64_t values_to_read = 0; // If the field is required and non-repeated, there are no definition levels - if (descr_->max_definition_level() > 0) { + if (descr_->max_definition_level() > 0 && def_levels) { num_def_levels = ReadDefinitionLevels(batch_size, def_levels); // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. @@ -184,9 +188,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_ } // Not present for non-repeated fields - if (descr_->max_repetition_level() > 0) { + if (descr_->max_repetition_level() > 0 && rep_levels) { num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); - if (num_def_levels != num_rep_levels) { + if (def_levels && num_def_levels != num_rep_levels) { throw ParquetException("Number of decoded rep / def levels did not match"); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index 7d6785e..8e121ee 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -64,22 +64,15 @@ class DictionaryDecoder : public Decoder<Type> { virtual int Decode(T* buffer, int max_values) { max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = dictionary_[index()]; - } + int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values); + if (decoded_values != max_values) { ParquetException::EofException(); } + num_values_ -= max_values; return max_values; } private: using Decoder<Type>::num_values_; - int index() { - int idx = 0; - if (!idx_decoder_.Get(&idx)) ParquetException::EofException(); 
- --num_values_; - return idx; - } - // Only one is set. Vector<T> dictionary_; @@ -177,7 +170,12 @@ class DictEncoderBase { /// Returns a conservative estimate of the number of bytes needed to encode the buffered /// indices. Used to size the buffer passed to WriteIndices(). int EstimatedDataEncodedSize() { - return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()); + // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to + // reserve + // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used + // but not reserving them would cause the encoder to fail. + return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()) + + RleEncoder::MinBufferSize(bit_width()); } /// The minimum bit width required to encode the currently buffered indices. http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/encoding-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc index f9265bd..43348d8 100644 --- a/src/parquet/encodings/encoding-benchmark.cc +++ b/src/parquet/encodings/encoding-benchmark.cc @@ -17,12 +17,23 @@ #include "benchmark/benchmark.h" -#include "parquet/encodings/plain-encoding.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/file/reader-internal.h" +#include "parquet/util/mem-pool.h" namespace parquet { +using format::ColumnChunk; +using schema::PrimitiveNode; + namespace benchmark { +std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) { + auto node = PrimitiveNode::Make("int64", repetition, Type::INT64); + return std::make_shared<ColumnDescriptor>( + node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED); +} + static void BM_PlainEncodingBoolean(::benchmark::State& state) { std::vector<bool> values(state.range_x(), 64); PlainEncoder<BooleanType> 
encoder(nullptr); @@ -86,6 +97,65 @@ static void BM_PlainDecodingInt64(::benchmark::State& state) { BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536); +template <typename Type> +static void DecodeDict( + std::vector<typename Type::c_type>& values, ::benchmark::State& state) { + typedef typename Type::c_type T; + int num_values = values.size(); + + MemPool pool; + MemoryAllocator* allocator = default_allocator(); + std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED); + std::shared_ptr<OwnedMutableBuffer> dict_buffer = + std::make_shared<OwnedMutableBuffer>(); + auto indices = std::make_shared<OwnedMutableBuffer>(); + + DictEncoder<T> encoder(&pool, allocator, descr->type_length()); + for (int i = 0; i < num_values; ++i) { + encoder.Put(values[i]); + } + + dict_buffer->Resize(encoder.dict_encoded_size()); + encoder.WriteDict(dict_buffer->mutable_data()); + indices->Resize(encoder.EstimatedDataEncodedSize()); + int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size()); + indices->Resize(actual_bytes); + + while (state.KeepRunning()) { + PlainDecoder<Type> dict_decoder(descr.get()); + dict_decoder.SetData(encoder.num_entries(), dict_buffer->data(), dict_buffer->size()); + DictionaryDecoder<Type> decoder(descr.get()); + decoder.SetDict(&dict_decoder); + decoder.SetData(num_values, indices->data(), indices->size()); + decoder.Decode(values.data(), num_values); + } + + state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(T)); +} + +static void BM_DictDecodingInt64_repeats(::benchmark::State& state) { + typedef Int64Type Type; + typedef typename Type::c_type T; + + std::vector<T> values(state.range_x(), 64); + DecodeDict<Type>(values, state); +} + +BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536); + +static void BM_DictDecodingInt64_literals(::benchmark::State& state) { + typedef Int64Type Type; + typedef typename Type::c_type T; + + std::vector<T> values(state.range_x()); + for (size_t i = 0; i 
< values.size(); ++i) { + values[i] = i; + } + DecodeDict<Type>(values, state); +} + +BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536); + } // namespace benchmark } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index 71ae740..c169688 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -142,10 +142,8 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> { virtual int Decode(bool* buffer, int max_values) { max_values = std::min(max_values, num_values_); - bool val; - for (int i = 0; i < max_values; ++i) { - if (!bit_reader_.GetValue(1, &val)) { ParquetException::EofException(); } - buffer[i] = val; + if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) { + ParquetException::EofException(); } num_values_ -= max_values; return max_values; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h index dd0c9e2..f7d09a9 100644 --- a/src/parquet/util/bit-stream-utils.h +++ b/src/parquet/util/bit-stream-utils.h @@ -122,6 +122,10 @@ class BitReader { template <typename T> bool GetValue(int num_bits, T* v); + /// Get a number of values from the buffer. Return the number of values actually read. + template <typename T> + int GetBatch(int num_bits, T* v, int batch_size); + /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T /// needs to be a little-endian native type and big enough to store /// 'num_bytes'. 
The value is assumed to be byte-aligned so the stream will http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/38f0ffd5/src/parquet/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-stream-utils.inline.h b/src/parquet/util/bit-stream-utils.inline.h index 02c0e25..5cdf76b 100644 --- a/src/parquet/util/bit-stream-utils.inline.h +++ b/src/parquet/util/bit-stream-utils.inline.h @@ -20,7 +20,10 @@ #ifndef PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H #define PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H +#include <algorithm> + #include "parquet/util/bit-stream-utils.h" +#include "parquet/util/bpacking.h" namespace parquet { @@ -86,34 +89,97 @@ inline bool BitWriter::PutVlqInt(uint32_t v) { } template <typename T> +inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer, + int* bit_offset, int* byte_offset, uint64_t* buffered_values) { + *v = BitUtil::TrailingBits(*buffered_values, *bit_offset + num_bits) >> *bit_offset; + + *bit_offset += num_bits; + if (*bit_offset >= 64) { + *byte_offset += 8; + *bit_offset -= 64; + + int bytes_remaining = max_bytes - *byte_offset; + if (LIKELY(bytes_remaining >= 8)) { + memcpy(buffered_values, buffer + *byte_offset, 8); + } else { + memcpy(buffered_values, buffer + *byte_offset, bytes_remaining); + } + + // Read bits of v that crossed into new buffered_values_ + *v |= BitUtil::TrailingBits(*buffered_values, *bit_offset) + << (num_bits - *bit_offset); + DCHECK_LE(*bit_offset, 64); + } +} + +template <typename T> inline bool BitReader::GetValue(int num_bits, T* v) { + return GetBatch(num_bits, v, 1) == 1; +} + +template <typename T> +inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) { DCHECK(buffer_ != NULL); // TODO: revisit this limit if necessary DCHECK_LE(num_bits, 32); DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8)); - if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false; - - *v 
= BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_; - - bit_offset_ += num_bits; - if (bit_offset_ >= 64) { - byte_offset_ += 8; - bit_offset_ -= 64; + int bit_offset = bit_offset_; + int byte_offset = byte_offset_; + uint64_t buffered_values = buffered_values_; + int max_bytes = max_bytes_; + const uint8_t* buffer = buffer_; + + uint64_t needed_bits = num_bits * batch_size; + uint64_t remaining_bits = (max_bytes - byte_offset) * 8 - bit_offset; + if (remaining_bits < needed_bits) { batch_size = remaining_bits / num_bits; } + + int i = 0; + if (UNLIKELY(bit_offset != 0)) { + for (; i < batch_size && bit_offset != 0; ++i) { + GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, + &buffered_values); + } + } - int bytes_remaining = max_bytes_ - byte_offset_; - if (LIKELY(bytes_remaining >= 8)) { - memcpy(&buffered_values_, buffer_ + byte_offset_, 8); - } else { - memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining); + if (sizeof(T) == 4) { + int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset), + reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits); + i += num_unpacked; + byte_offset += num_unpacked * num_bits / 8; + } else { + const int buffer_size = 1024; + static uint32_t unpack_buffer[buffer_size]; + while (i < batch_size) { + int unpack_size = std::min(buffer_size, batch_size - i); + int num_unpacked = unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset), + unpack_buffer, unpack_size, num_bits); + if (num_unpacked == 0) { break; } + for (int k = 0; k < num_unpacked; ++k) { + v[i + k] = unpack_buffer[k]; + } + i += num_unpacked; + byte_offset += num_unpacked * num_bits / 8; } + } - // Read bits of v that crossed into new buffered_values_ - *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_) - << (num_bits - bit_offset_); + int bytes_remaining = max_bytes - byte_offset; + if (bytes_remaining >= 8) { + memcpy(&buffered_values, buffer + 
byte_offset, 8); + } else { + memcpy(&buffered_values, buffer + byte_offset, bytes_remaining); } - DCHECK_LE(bit_offset_, 64); - return true; + + for (; i < batch_size; ++i) { + GetValue_( + num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset, &buffered_values); + } + + bit_offset_ = bit_offset; + byte_offset_ = byte_offset; + buffered_values_ = buffered_values; + + return batch_size; } template <typename T>
