IMPALA-6946: handle negative counts in RLE decoder

This improves the handling of out-of-range values to avoid hitting various DCHECKs, including the one in the JIRA. repeat_count_ and literal_count_ are int32_t values. Avoid setting them to a negative value directly or by integer overflow.
Switch to using uint32_t for "VLQ" encoding, which should be ULEB-128 encoding according to the Parquet standard. This fixes an infinite loop in PutVlqInt() for negative values - the bug was that shifting right sign-extended the signed value. Testing: Added backend test to exercise handling of large literal and repeat counts that don't fit in an int32_t. Before these fixes it could trigger several DCHECKs. Change-Id: If75ef3fb12494209918c100f26407cd93b17addb Reviewed-on: http://gerrit.cloudera.org:8080/10233 Reviewed-by: Lars Volker <l...@cloudera.com> Reviewed-by: Dan Hecht <dhe...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/10187bff Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/10187bff Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/10187bff Branch: refs/heads/master Commit: 10187bfff4b926945d648f42290be60a1367e43e Parents: 14f6c24 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Fri Apr 27 10:21:15 2018 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Committed: Mon May 7 21:50:56 2018 +0000 ---------------------------------------------------------------------- be/src/util/bit-stream-utils.h | 18 ++++++------ be/src/util/bit-stream-utils.inline.h | 4 +-- be/src/util/dict-encoding.h | 23 ++++++++------- be/src/util/rle-encoding.h | 45 ++++++++++++++++++------------ be/src/util/rle-test.cc | 39 ++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/bit-stream-utils.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h index e527dc8..67f1a00 100644 --- a/be/src/util/bit-stream-utils.h +++ 
b/be/src/util/bit-stream-utils.h @@ -61,11 +61,10 @@ class BitWriter { template<typename T> bool PutAligned(T v, int num_bytes); - /// Write a Vlq encoded int to the buffer. Returns false if there was not enough - /// room. The value is written byte aligned. - /// For more details on vlq: - /// en.wikipedia.org/wiki/Variable-length_quantity - bool PutVlqInt(int32_t v); + /// Write an unsigned ULEB-128 encoded int to the buffer. Return false if there was not + /// enough room. The value is written byte aligned. For more details on ULEB-128: + /// https://en.wikipedia.org/wiki/LEB128 + bool PutUleb128Int(uint32_t v); /// Get a pointer to the next aligned byte and advance the underlying buffer /// by num_bytes. @@ -148,10 +147,11 @@ class BatchedBitReader { template<typename T> bool GetBytes(int num_bytes, T* v); - /// Reads a vlq encoded int from the stream. The encoded int must start at the - /// beginning of a byte. Return false if there were not enough bytes in the buffer or - /// the int is invalid. - bool GetVlqInt(int32_t* v); + /// Read an unsigned ULEB-128 encoded int from the stream. The encoded int must start + /// at the beginning of a byte. Return false if there were not enough bytes in the + /// buffer or the int is invalid. For more details on ULEB-128: + /// https://en.wikipedia.org/wiki/LEB128 + bool GetUleb128Int(uint32_t* v); /// Returns the number of bytes left in the stream. 
int bytes_left() { return buffer_end_ - buffer_pos_; } http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/bit-stream-utils.inline.h ---------------------------------------------------------------------- diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h index 3974492..b85b252 100644 --- a/be/src/util/bit-stream-utils.inline.h +++ b/be/src/util/bit-stream-utils.inline.h @@ -76,7 +76,7 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) { return true; } -inline bool BitWriter::PutVlqInt(int32_t v) { +inline bool BitWriter::PutUleb128Int(uint32_t v) { bool result = true; while ((v & 0xFFFFFF80) != 0L) { result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 1); @@ -135,7 +135,7 @@ inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) { return true; } -inline bool BatchedBitReader::GetVlqInt(int32_t* v) { +inline bool BatchedBitReader::GetUleb128Int(uint32_t* v) { *v = 0; int shift = 0; uint8_t byte = 0; http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/dict-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h index d916bab..a76d9b5 100644 --- a/be/src/util/dict-encoding.h +++ b/be/src/util/dict-encoding.h @@ -230,6 +230,13 @@ class DictEncoder : public DictEncoderBase { int AddToTable(const T& value, NodeIndex* bucket); }; +/// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow +/// efficient reading in batches from data_decoder_. Increasing the batch size up to +/// 128 seems to improve performance, but increasing further did not make a noticeable +/// difference. Defined outside DictDecoderBase to get static linkage because there is +/// no dict-encoding.cc file. +static constexpr int32_t DICT_DECODER_BUFFER_SIZE = 128; + /// Decoder class for dictionary encoded data. This class does not allocate any /// buffers. 
The input buffers (dictionary buffer and RLE buffer) must be maintained /// by the caller and valid as long as this object is. @@ -274,12 +281,6 @@ class DictDecoderBase { } protected: - /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow - /// efficient reading in batches from data_decoder_. Increasing the batch size up to - /// 128 seems to improve performance, but increasing further did not make a noticeable - /// difference. - static const int DECODED_BUFFER_SIZE = 128; - RleBatchDecoder<uint32_t> data_decoder_; /// Greater than zero if we've started decoding a repeated run. @@ -362,7 +363,7 @@ class DictDecoder : public DictDecoderBase { /// a repeated run, the first element is the current dict value. If in a literal run, /// this contains 'num_literal_values_' values, with the next value to be returned at /// 'next_literal_idx_'. - T decoded_values_[DECODED_BUFFER_SIZE]; + T decoded_values_[DICT_DECODER_BUFFER_SIZE]; /// Slow path for GetNextValue() where we need to decode new values. Should not be /// inlined everywhere. @@ -450,7 +451,8 @@ template <typename T> bool DictDecoder<T>::DecodeNextValue(T* value) { // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16 // byte aligned for Decimal16Values. 
- uint32_t num_repeats = data_decoder_.NextNumRepeats(); + int32_t num_repeats = data_decoder_.NextNumRepeats(); + DCHECK_GE(num_repeats, 0); if (num_repeats > 0) { uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats); if (UNLIKELY(idx >= dict_.size())) return false; @@ -459,10 +461,11 @@ bool DictDecoder<T>::DecodeNextValue(T* value) { num_repeats_ = num_repeats - 1; return true; } else { - uint32_t num_literals = data_decoder_.NextNumLiterals(); + int32_t num_literals = data_decoder_.NextNumLiterals(); if (UNLIKELY(num_literals == 0)) return false; - uint32_t num_to_decode = std::min<uint32_t>(num_literals, DECODED_BUFFER_SIZE); + DCHECK_GT(num_literals, 0); + int32_t num_to_decode = std::min(num_literals, DICT_DECODER_BUFFER_SIZE); if (UNLIKELY(!data_decoder_.DecodeLiteralValues( num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) { return false; http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/rle-encoding.h ---------------------------------------------------------------------- diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h index 1491066..babb0fd 100644 --- a/be/src/util/rle-encoding.h +++ b/be/src/util/rle-encoding.h @@ -323,7 +323,8 @@ inline bool RleEncoder::Put(uint64_t value) { DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); if (UNLIKELY(buffer_full_)) return false; - if (LIKELY(current_value_ == value)) { + if (LIKELY(current_value_ == value + && repeat_count_ <= std::numeric_limits<int32_t>::max())) { ++repeat_count_; if (repeat_count_ > 8) { // This is just a continuation of the current run, no need to buffer the @@ -333,8 +334,9 @@ inline bool RleEncoder::Put(uint64_t value) { } } else { if (repeat_count_ >= 8) { - // We had a run that was long enough but it has ended. Flush the - // current repeated run. + // We had a run that was long enough but it ended, either because of a different + // value or because it exceeded the maximum run length. Flush the current repeated + // run. 
DCHECK_EQ(literal_count_, 0); FlushRepeatedRun(); } @@ -384,8 +386,8 @@ inline void RleEncoder::FlushRepeatedRun() { DCHECK_GT(repeat_count_, 0); bool result = true; // The lsb of 0 indicates this is a repeated run - int32_t indicator_value = repeat_count_ << 1 | 0; - result &= bit_writer_.PutVlqInt(indicator_value); + uint32_t indicator_value = static_cast<uint32_t>(repeat_count_) << 1; + result &= bit_writer_.PutUleb128Int(indicator_value); result &= bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); DCHECK(result); num_buffered_values_ = 0; @@ -603,21 +605,28 @@ inline void RleBatchDecoder<T>::NextCounts() { DCHECK_EQ(0, literal_count_); DCHECK_EQ(0, repeat_count_); // Read the next run's indicator int, it could be a literal or repeated run. - // The int is encoded as a vlq-encoded value. - int32_t indicator_value = 0; - if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return; + // The int is encoded as a ULEB128-encoded value. + uint32_t indicator_value = 0; + if (UNLIKELY(!bit_reader_.GetUleb128Int(&indicator_value))) return; // lsb indicates if it is a literal run or repeated run bool is_literal = indicator_value & 1; + // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. + // The Parquet standard does not allow longer runs - see PARQUET-1290. + uint32_t run_len = indicator_value >> 1; + DCHECK_LE(run_len, std::numeric_limits<int32_t>::max()) + << "Right-shifted uint32_t should fit in int32_t"; if (is_literal) { - literal_count_ = (indicator_value >> 1) * 8; + // Use int64_t to avoid overflowing multiplication. 
+ int64_t literal_count = static_cast<int64_t>(run_len) * 8; + if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; + literal_count_ = literal_count; } else { - int32_t repeat_count = indicator_value >> 1; - if (UNLIKELY(repeat_count == 0)) return; + if (UNLIKELY(run_len == 0)) return; bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); if (UNLIKELY(!result)) return; - repeat_count_ = repeat_count; + repeat_count_ = run_len; } } @@ -668,10 +677,10 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t num_values_to_consume, T* v int32_t num_consumed = 0; while (num_consumed < num_values_to_consume) { // Add RLE encoded values by repeating the current value this number of times. - uint32_t num_repeats = NextNumRepeats(); + int32_t num_repeats = NextNumRepeats(); if (num_repeats > 0) { - uint32_t num_repeats_to_set = - std::min<uint32_t>(num_repeats, num_values_to_consume - num_consumed); + int32_t num_repeats_to_set = + std::min(num_repeats, num_values_to_consume - num_consumed); T repeated_value = GetRepeatedValue(num_repeats_to_set); for (int i = 0; i < num_repeats_to_set; ++i) { values[num_consumed + i] = repeated_value; @@ -681,10 +690,10 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t num_values_to_consume, T* v } // Add remaining literal values, if any. 
- uint32_t num_literals = NextNumLiterals(); + int32_t num_literals = NextNumLiterals(); if (num_literals == 0) break; - uint32_t num_literals_to_set = - std::min<uint32_t>(num_literals, num_values_to_consume - num_consumed); + int32_t num_literals_to_set = + std::min(num_literals, num_values_to_consume - num_consumed); if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { return 0; } http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/rle-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index 217d43e..2fdd355 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -484,6 +484,45 @@ TEST(Rle, ZeroLiteralOrRepeatCount) { } } +// Regression test for handing of repeat counts >= 2^31: IMPALA-6946. +TEST(Rle, RepeatCountOverflow) { + const int BUFFER_LEN = 1024; + uint8_t buffer[BUFFER_LEN]; + + for (bool literal_run : {true, false}) { + memset(buffer, 0, BUFFER_LEN); + LOG(INFO) << "Testing negative " << (literal_run ? "literal" : "repeated"); + BitWriter writer(buffer, BUFFER_LEN); + // Literal runs have lowest bit 1. Repeated runs have lowest bit 0. All other bits + // are 1. + const uint32_t REPEATED_RUN_HEADER = 0xfffffffe; + const uint32_t LITERAL_RUN_HEADER = 0xffffffff; + writer.PutUleb128Int(literal_run ? LITERAL_RUN_HEADER : REPEATED_RUN_HEADER); + writer.Flush(); + + RleBatchDecoder<uint64_t> decoder(buffer, BUFFER_LEN, 1); + // Repeated run length fits in an int32_t. + if (literal_run) { + EXPECT_EQ(0, decoder.NextNumRepeats()) << "Not a repeated run"; + // Literal run length would overflow int32_t - should gracefully fail decoding. + EXPECT_EQ(0, decoder.NextNumLiterals()); + } else { + EXPECT_EQ(0x7fffffff, decoder.NextNumRepeats()); + EXPECT_EQ(0, decoder.NextNumLiterals()) << "Not a literal run"; + } + + // IMPALA-6946: reading back run lengths that don't fit in int32_t hit various + // DCHECKs. 
+ uint64_t val; + if (literal_run) { + EXPECT_EQ(0, decoder.GetValues(1, &val)) << "Decoding failed above."; + } else { + EXPECT_EQ(1, decoder.GetValues(1, &val)); + EXPECT_EQ(0, val) << "Buffer was initialized with all zeroes"; + } + } +} + } IMPALA_TEST_MAIN();