IMPALA-6946: handle negative counts in RLE decoder

This improves the handling of out-of-range values to avoid hitting various
DCHECKs, including the one in the JIRA. repeat_count_ and literal_count_
are int32_ts. Avoid setting them to a negative value directly or by
integer overflow.

Switch to using uint32_t for "VLQ" encoding, which should be ULEB-128
encoding according to the Parquet standard. This fixes an infinite loop
in PutVlqInt() for negative values - the bug was that shifting right
sign-extended the signed value.

Testing:
Added backend test to exercise handling of large literal and repeat
counts that don't fit in an int32_t. Before these fixes it could trigger
several DCHECKs.

Change-Id: If75ef3fb12494209918c100f26407cd93b17addb
Reviewed-on: http://gerrit.cloudera.org:8080/10233
Reviewed-by: Lars Volker <l...@cloudera.com>
Reviewed-by: Dan Hecht <dhe...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/10187bff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/10187bff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/10187bff

Branch: refs/heads/master
Commit: 10187bfff4b926945d648f42290be60a1367e43e
Parents: 14f6c24
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Fri Apr 27 10:21:15 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Mon May 7 21:50:56 2018 +0000

----------------------------------------------------------------------
 be/src/util/bit-stream-utils.h        | 18 ++++++------
 be/src/util/bit-stream-utils.inline.h |  4 +--
 be/src/util/dict-encoding.h           | 23 ++++++++-------
 be/src/util/rle-encoding.h            | 45 ++++++++++++++++++------------
 be/src/util/rle-test.cc               | 39 ++++++++++++++++++++++++++
 5 files changed, 90 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index e527dc8..67f1a00 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -61,11 +61,10 @@ class BitWriter {
   template<typename T>
   bool PutAligned(T v, int num_bytes);
 
-  /// Write a Vlq encoded int to the buffer.  Returns false if there was not 
enough
-  /// room.  The value is written byte aligned.
-  /// For more details on vlq:
-  /// en.wikipedia.org/wiki/Variable-length_quantity
-  bool PutVlqInt(int32_t v);
+  /// Write an unsigned ULEB-128 encoded int to the buffer. Return false if 
there was not
+  /// enough room. The value is written byte aligned. For more details on 
ULEB-128:
+  /// https://en.wikipedia.org/wiki/LEB128
+  bool PutUleb128Int(uint32_t v);
 
   /// Get a pointer to the next aligned byte and advance the underlying buffer
   /// by num_bytes.
@@ -148,10 +147,11 @@ class BatchedBitReader {
   template<typename T>
   bool GetBytes(int num_bytes, T* v);
 
-  /// Reads a vlq encoded int from the stream.  The encoded int must start at 
the
-  /// beginning of a byte. Return false if there were not enough bytes in the 
buffer or
-  /// the int is invalid.
-  bool GetVlqInt(int32_t* v);
+  /// Read an unsigned ULEB-128 encoded int from the stream. The encoded int 
must start
+  /// at the beginning of a byte. Return false if there were not enough bytes 
in the
+  /// buffer or the int is invalid. For more details on ULEB-128:
+  /// https://en.wikipedia.org/wiki/LEB128
+  bool GetUleb128Int(uint32_t* v);
 
   /// Returns the number of bytes left in the stream.
   int bytes_left() { return buffer_end_ - buffer_pos_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h 
b/be/src/util/bit-stream-utils.inline.h
index 3974492..b85b252 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -76,7 +76,7 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) {
   return true;
 }
 
-inline bool BitWriter::PutVlqInt(int32_t v) {
+inline bool BitWriter::PutUleb128Int(uint32_t v) {
   bool result = true;
   while ((v & 0xFFFFFF80) != 0L) {
     result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 1);
@@ -135,7 +135,7 @@ inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) 
{
   return true;
 }
 
-inline bool BatchedBitReader::GetVlqInt(int32_t* v) {
+inline bool BatchedBitReader::GetUleb128Int(uint32_t* v) {
   *v = 0;
   int shift = 0;
   uint8_t byte = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index d916bab..a76d9b5 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -230,6 +230,13 @@ class DictEncoder : public DictEncoderBase {
   int AddToTable(const T& value, NodeIndex* bucket);
 };
 
+/// Number of decoded values to buffer at a time. A multiple of 32 is chosen 
to allow
+/// efficient reading in batches from data_decoder_. Increasing the batch size 
up to
+/// 128 seems to improve performance, but increasing further did not make a 
noticeable
+/// difference. Defined outside DictDecoderBase to get static linkage because 
there is
+/// no dict-encoding.cc file.
+static constexpr int32_t DICT_DECODER_BUFFER_SIZE = 128;
+
 /// Decoder class for dictionary encoded data. This class does not allocate any
 /// buffers. The input buffers (dictionary buffer and RLE buffer) must be 
maintained
 /// by the caller and valid as long as this object is.
@@ -274,12 +281,6 @@ class DictDecoderBase {
   }
 
  protected:
-  /// Number of decoded values to buffer at a time. A multiple of 32 is chosen 
to allow
-  /// efficient reading in batches from data_decoder_. Increasing the batch 
size up to
-  /// 128 seems to improve performance, but increasing further did not make a 
noticeable
-  /// difference.
-  static const int DECODED_BUFFER_SIZE = 128;
-
   RleBatchDecoder<uint32_t> data_decoder_;
 
   /// Greater than zero if we've started decoding a repeated run.
@@ -362,7 +363,7 @@ class DictDecoder : public DictDecoderBase {
   /// a repeated run, the first element is the current dict value. If in a 
literal run,
   /// this contains 'num_literal_values_' values, with the next value to be 
returned at
   /// 'next_literal_idx_'.
-  T decoded_values_[DECODED_BUFFER_SIZE];
+  T decoded_values_[DICT_DECODER_BUFFER_SIZE];
 
   /// Slow path for GetNextValue() where we need to decode new values. Should 
not be
   /// inlined everywhere.
@@ -450,7 +451,8 @@ template <typename T>
 bool DictDecoder<T>::DecodeNextValue(T* value) {
   // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not 
always 16
   // byte aligned for Decimal16Values.
-  uint32_t num_repeats = data_decoder_.NextNumRepeats();
+  int32_t num_repeats = data_decoder_.NextNumRepeats();
+  DCHECK_GE(num_repeats, 0);
   if (num_repeats > 0) {
     uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats);
     if (UNLIKELY(idx >= dict_.size())) return false;
@@ -459,10 +461,11 @@ bool DictDecoder<T>::DecodeNextValue(T* value) {
     num_repeats_ = num_repeats - 1;
     return true;
   } else {
-    uint32_t num_literals = data_decoder_.NextNumLiterals();
+    int32_t num_literals = data_decoder_.NextNumLiterals();
     if (UNLIKELY(num_literals == 0)) return false;
 
-    uint32_t num_to_decode = std::min<uint32_t>(num_literals, 
DECODED_BUFFER_SIZE);
+    DCHECK_GT(num_literals, 0);
+    int32_t num_to_decode = std::min(num_literals, DICT_DECODER_BUFFER_SIZE);
     if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
             num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) {
       return false;

http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 1491066..babb0fd 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -323,7 +323,8 @@ inline bool RleEncoder::Put(uint64_t value) {
   DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
   if (UNLIKELY(buffer_full_)) return false;
 
-  if (LIKELY(current_value_ == value)) {
+  if (LIKELY(current_value_ == value
+      && repeat_count_ <= std::numeric_limits<int32_t>::max())) {
     ++repeat_count_;
     if (repeat_count_ > 8) {
       // This is just a continuation of the current run, no need to buffer the
@@ -333,8 +334,9 @@ inline bool RleEncoder::Put(uint64_t value) {
     }
   } else {
     if (repeat_count_ >= 8) {
-      // We had a run that was long enough but it has ended.  Flush the
-      // current repeated run.
+      // We had a run that was long enough but it ended, either because of a 
different
+      // value or because it exceeded the maximum run length. Flush the 
current repeated
+      // run.
       DCHECK_EQ(literal_count_, 0);
       FlushRepeatedRun();
     }
@@ -384,8 +386,8 @@ inline void RleEncoder::FlushRepeatedRun() {
   DCHECK_GT(repeat_count_, 0);
   bool result = true;
   // The lsb of 0 indicates this is a repeated run
-  int32_t indicator_value = repeat_count_ << 1 | 0;
-  result &= bit_writer_.PutVlqInt(indicator_value);
+  uint32_t indicator_value = static_cast<uint32_t>(repeat_count_) << 1;
+  result &= bit_writer_.PutUleb128Int(indicator_value);
   result &= bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 
8));
   DCHECK(result);
   num_buffered_values_ = 0;
@@ -603,21 +605,28 @@ inline void RleBatchDecoder<T>::NextCounts() {
   DCHECK_EQ(0, literal_count_);
   DCHECK_EQ(0, repeat_count_);
   // Read the next run's indicator int, it could be a literal or repeated run.
-  // The int is encoded as a vlq-encoded value.
-  int32_t indicator_value = 0;
-  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return;
+  // The int is encoded as a ULEB128-encoded value.
+  uint32_t indicator_value = 0;
+  if (UNLIKELY(!bit_reader_.GetUleb128Int(&indicator_value))) return;
 
   // lsb indicates if it is a literal run or repeated run
   bool is_literal = indicator_value & 1;
+  // Don't try to handle run lengths that don't fit in an int32_t - just fail 
gracefully.
+  // The Parquet standard does not allow longer runs - see PARQUET-1290.
+  uint32_t run_len = indicator_value >> 1;
+  DCHECK_LE(run_len, std::numeric_limits<int32_t>::max())
+      << "Right-shifted uint32_t should fit in int32_t";
   if (is_literal) {
-    literal_count_ = (indicator_value >> 1) * 8;
+    // Use int64_t to avoid overflowing multiplication.
+    int64_t literal_count = static_cast<int64_t>(run_len) * 8;
+    if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return;
+    literal_count_ = literal_count;
   } else {
-    int32_t repeat_count = indicator_value >> 1;
-    if (UNLIKELY(repeat_count == 0)) return;
+    if (UNLIKELY(run_len == 0)) return;
     bool result =
         bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), 
&repeated_value_);
     if (UNLIKELY(!result)) return;
-    repeat_count_ = repeat_count;
+    repeat_count_ = run_len;
   }
 }
 
@@ -668,10 +677,10 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t 
num_values_to_consume, T* v
   int32_t num_consumed = 0;
   while (num_consumed < num_values_to_consume) {
     // Add RLE encoded values by repeating the current value this number of 
times.
-    uint32_t num_repeats = NextNumRepeats();
+    int32_t num_repeats = NextNumRepeats();
     if (num_repeats > 0) {
-       uint32_t num_repeats_to_set =
-           std::min<uint32_t>(num_repeats, num_values_to_consume - 
num_consumed);
+       int32_t num_repeats_to_set =
+           std::min(num_repeats, num_values_to_consume - num_consumed);
        T repeated_value = GetRepeatedValue(num_repeats_to_set);
        for (int i = 0; i < num_repeats_to_set; ++i) {
          values[num_consumed + i] = repeated_value;
@@ -681,10 +690,10 @@ inline int32_t RleBatchDecoder<T>::GetValues(int32_t 
num_values_to_consume, T* v
     }
 
     // Add remaining literal values, if any.
-    uint32_t num_literals = NextNumLiterals();
+    int32_t num_literals = NextNumLiterals();
     if (num_literals == 0) break;
-    uint32_t num_literals_to_set =
-        std::min<uint32_t>(num_literals, num_values_to_consume - num_consumed);
+    int32_t num_literals_to_set =
+        std::min(num_literals, num_values_to_consume - num_consumed);
     if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/10187bff/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index 217d43e..2fdd355 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -484,6 +484,45 @@ TEST(Rle, ZeroLiteralOrRepeatCount) {
   }
 }
 
+// Regression test for handing of repeat counts >= 2^31: IMPALA-6946.
+TEST(Rle, RepeatCountOverflow) {
+  const int BUFFER_LEN = 1024;
+  uint8_t buffer[BUFFER_LEN];
+
+  for (bool literal_run : {true, false}) {
+    memset(buffer, 0, BUFFER_LEN);
+    LOG(INFO) << "Testing negative " << (literal_run ? "literal" : "repeated");
+    BitWriter writer(buffer, BUFFER_LEN);
+    // Literal runs have lowest bit 1. Repeated runs have lowest bit 0. All 
other bits
+    // are 1.
+    const uint32_t REPEATED_RUN_HEADER = 0xfffffffe;
+    const uint32_t LITERAL_RUN_HEADER = 0xffffffff;
+    writer.PutUleb128Int(literal_run ? LITERAL_RUN_HEADER : 
REPEATED_RUN_HEADER);
+    writer.Flush();
+
+    RleBatchDecoder<uint64_t> decoder(buffer, BUFFER_LEN, 1);
+    // Repeated run length fits in an int32_t.
+    if (literal_run) {
+      EXPECT_EQ(0, decoder.NextNumRepeats()) << "Not a repeated run";
+      // Literal run length would overflow int32_t - should gracefully fail 
decoding.
+      EXPECT_EQ(0, decoder.NextNumLiterals());
+    } else {
+      EXPECT_EQ(0x7fffffff, decoder.NextNumRepeats());
+      EXPECT_EQ(0, decoder.NextNumLiterals()) << "Not a literal run";
+    }
+
+    // IMPALA-6946: reading back run lengths that don't fit in int32_t hit 
various
+    // DCHECKs.
+    uint64_t val;
+    if (literal_run) {
+      EXPECT_EQ(0, decoder.GetValues(1, &val)) << "Decoding failed above.";
+    } else {
+      EXPECT_EQ(1, decoder.GetValues(1, &val));
+      EXPECT_EQ(0, val) << "Buffer was initialized with all zeroes";
+    }
+  }
+}
+
 }
 
 IMPALA_TEST_MAIN();

Reply via email to