http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index d5b0d01..3c83c23 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -49,6 +49,7 @@ using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
 using namespace apache::thrift;
 using namespace parquet;
+using impala::RleBatchDecoder;
 using std::min;
 
 using impala::PARQUET_VERSION_NUMBER;
@@ -119,15 +120,6 @@ string GetSchema(const FileMetaData& md) {
   return ss.str();
 }
 
-// Inherit from RleDecoder to get access to repeat_count_, which is protected.
-class ParquetLevelReader : public impala::RleDecoder {
- public:
-  ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width)
-    : RleDecoder(buffer, buffer_len, bit_width) {}
-
-  uint32_t repeat_count() const { return repeat_count_; }
-};
-
 // Performs sanity checking on the contents of data pages, to ensure that:
 //   - Compressed pages can be uncompressed successfully.
 //   - The number of def levels matches num_values in the page header when 
using RLE.
@@ -163,18 +155,19 @@ int CheckDataPage(const ColumnChunk& col, const 
PageHeader& header, const uint8_
     // Parquet data pages always start with the encoded definition level data, 
and
     // RLE sections in Parquet always start with a 4 byte length followed by 
the data.
     int num_def_level_bytes = *reinterpret_cast<const int32_t*>(data);
-    ParquetLevelReader def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
+    RleBatchDecoder<uint8_t> def_levels(const_cast<uint8_t*>(data) + 
sizeof(int32_t),
         num_def_level_bytes, sizeof(uint8_t));
     uint8_t level;
     for (int i = 0; i < header.data_page_header.num_values; ++i) {
-      if (!def_levels.Get(&level)) {
+      if (!def_levels.GetSingleValue(&level)) {
         cerr << "Error: Decoding of def levels failed.\n";
         exit(1);
       }
 
-      if (i + def_levels.repeat_count() + 1 > 
header.data_page_header.num_values) {
-        cerr << "Error: More def levels encoded (" << (i + 
def_levels.repeat_count() + 1)
-             << ") than num_values (" << header.data_page_header.num_values << 
").\n";
+      if (i + def_levels.NextNumRepeats() + 1 > 
header.data_page_header.num_values) {
+        cerr << "Error: More def levels encoded ("
+              << (i + def_levels.NextNumRepeats() + 1) << ") than num_values ("
+              << header.data_page_header.num_values << ").\n";
         exit(1);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 2e2dd7f..861bf8e 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -76,49 +76,116 @@ namespace impala {
 /// (total 26 bytes, 1 byte overhead)
 //
 
-/// Decoder class for RLE encoded data.
-class RleDecoder {
+/// RLE decoder with a batch-oriented interface that enables fast decoding.
+/// Users of this class must first initialize the class to point to a buffer of
+/// RLE-encoded data, passed into the constructor or Reset(). Then they can
+/// decode data by checking NextNumRepeats()/NextNumLiterals() to see if the
+/// next run is a repeated or literal run, then calling GetRepeatedValue()
+/// or GetLiteralValues() respectively to read the values.
+///
+/// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
+/// Other decoding errors are signalled by functions returning false. If an
+/// error is encountered then it is not valid to read any more data until
+/// Reset() is called.
+template <typename T>
+class RleBatchDecoder {
  public:
-  /// Create a decoder object. buffer/buffer_len is the decoded data.
-  /// bit_width is the width of each value (before encoding).
-  RleDecoder(uint8_t* buffer, int buffer_len, int bit_width)
-    : bit_reader_(buffer, buffer_len),
-      bit_width_(bit_width),
-      current_value_(0),
-      repeat_count_(0),
-      literal_count_(0) {
-    DCHECK_GE(bit_width_, 0);
-    DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH);
+  RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
+    Reset(buffer, buffer_len, bit_width);
   }
+  RleBatchDecoder() : bit_width_(-1) {}
+
+  /// Reset the decoder to read from a new buffer.
+  void Reset(uint8_t* buffer, int buffer_len, int bit_width);
+
+  /// Return the size of the current repeated run. Returns zero if the current 
run is
+  /// a literal run or if no more runs can be read from the input.
+  int32_t NextNumRepeats();
+
+  /// Get the value of the current repeated run and consume the given number 
of repeats.
+  /// Only valid to call when NextNumRepeats() > 0. The given number of 
repeats cannot
+  /// be greater than the remaining number of repeats in the run.
+  T GetRepeatedValue(int32_t num_repeats_to_consume);
+
+  /// Return the size of the current literal run. Returns zero if the current 
run is
+  /// a repeated run or if no more runs can be read from the input.
+  int32_t NextNumLiterals();
+
+  /// Consume 'num_literals_to_consume' literals from the current literal run,
+  /// copying the values to 'values'. 'num_literals_to_consume' must be <=
+  /// NextNumLiterals(). Returns true if the requested number of literals were
+  /// successfully read or false if an error was encountered, e.g. the input 
was
+  /// truncated.
+  bool GetLiteralValues(int32_t num_literals_to_consume, T* values) 
WARN_UNUSED_RESULT;
+
+  /// Consume 'num_literals_to_consume' literals from the current literal run,
+  /// decoding them using 'dict' and outputting them to 'values'.
+  /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if
+  /// the requested number of literals were successfully read or false if an 
error
+  /// was encountered, e.g. the input was truncated or the value was not 
present
+  /// in the dictionary. Errors can only be recovered from by calling Reset()
+  /// to read from a new buffer.
+  template <typename OutType>
+  bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
+      int64_t dict_len, OutType* values) WARN_UNUSED_RESULT;
+
+  /// Convenience method to get the next value. Not efficient. Returns true on 
success
+  /// or false if no more values can be read from the input or an error was 
encountered
+  /// decoding the values.
+  bool GetSingleValue(T* val) WARN_UNUSED_RESULT;
 
-  RleDecoder() : bit_width_(-1) {}
+ private:
+  BatchedBitReader bit_reader_;
 
-  void Reset(uint8_t* buffer, int buffer_len, int bit_width) {
-    DCHECK_GE(bit_width, 0);
-    DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH);
-    bit_reader_.Reset(buffer, buffer_len);
-    bit_width_ = bit_width;
-    current_value_ = 0;
-    repeat_count_ = 0;
-    literal_count_ = 0;
-  }
+  /// Number of bits needed to encode the value. Must be between 0 and 64.
+  int bit_width_;
 
-  /// Gets the next value.  Returns false if there are no more.
-  template<typename T>
-  bool Get(T* val);
+  /// If a repeated run, the number of repeats remaining in the current run to 
be read.
+  /// If the current run is a literal run, this is 0.
+  int32_t repeat_count_;
 
- protected:
-  /// Fills literal_count_ and repeat_count_ with next values. Returns false 
if there
-  /// are no more.
-  template<typename T>
-  bool NextCounts();
+  /// If a literal run, the number of literals remaining in the current run to 
be read.
+  /// If the current run is a repeated run, this is 0.
+  int32_t literal_count_;
 
-  BitReader bit_reader_;
-  /// Number of bits needed to encode the value. Must be between 0 and 64.
-  int bit_width_;
-  uint64_t current_value_;
-  uint32_t repeat_count_;
-  uint32_t literal_count_;
+  /// If a repeated run, the current repeated value.
+  T repeated_value_;
+
+  /// Size of buffer for literal values. Large enough to decode a full batch 
of 32
+  /// literals. The buffer is needed to allow clients to read in batches that 
are not
+  /// multiples of 32.
+  static constexpr int LITERAL_BUFFER_LEN = 32;
+
+  /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' 
is the
+  /// position of the next literal to be read from the buffer.
+  T literal_buffer_[LITERAL_BUFFER_LEN];
+  int num_buffered_literals_;
+  int literal_buffer_pos_;
+
+  /// Called when both 'literal_count_' and 'repeat_count_' have been 
exhausted.
+  /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next 
literal
+  /// or repeated run, or leaves both at 0 if no more values can be read 
(either because
+  /// the end of the input was reached or an error was encountered decoding).
+  void NextCounts();
+
+  /// Fill the literal buffer. Invalid to call if there are already buffered 
literals.
+  /// Return false if the input was truncated. This does not advance 
'literal_count_'.
+  bool FillLiteralBuffer() WARN_UNUSED_RESULT;
+
+  bool HaveBufferedLiterals() const {
+    return literal_buffer_pos_ < num_buffered_literals_;
+  }
+
+  /// Output buffered literals, advancing 'literal_buffer_pos_' and 
decrementing
+  /// 'literal_count_'. Returns the number of literals outputted.
+  int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
+
+  /// Output buffered literals, advancing 'literal_buffer_pos_' and 
decrementing
+  /// 'literal_count_'. Returns the number of literals outputted or 0 if a
+  /// decoding error is encountered.
+  template <typename OutType>
+  int32_t DecodeBufferedLiterals(
+      int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values);
 };
 
 /// Class to incrementally build the rle data.   This class does not allocate 
any memory.
@@ -153,7 +220,8 @@ class RleEncoder {
     int max_literal_run_size = 1 +
         BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8);
     /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
-    int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + 
BitUtil::Ceil(bit_width, 8);
+    int max_repeated_run_size =
+        BatchedBitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
     return std::max(max_literal_run_size, max_repeated_run_size);
   }
 
@@ -167,7 +235,7 @@ class RleEncoder {
 
   /// Encode value.  Returns true if the value fits in buffer, false otherwise.
   /// This value must be representable with bit_width_ bits.
-  bool Put(uint64_t value);
+  bool Put(uint64_t value) WARN_UNUSED_RESULT;
 
   /// Flushes any pending values to the underlying buffer.
   /// Returns the total number of bytes written
@@ -245,53 +313,6 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
-// Force inlining - this is used in perf-critical loops in Parquet and GCC 
often
-// doesn't inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool RleDecoder::Get(T* val) {
-  DCHECK_GE(bit_width_, 0);
-  // Profiling has shown that the quality and performance of the generated 
code is very
-  // sensitive to the exact shape of this check. For example, the version 
below performs
-  // significantly better than UNLIKELY(literal_count_ == 0 && repeat_count_ 
== 0)
-  if (repeat_count_ == 0) {
-    if (literal_count_ == 0) {
-      if (!NextCounts<T>()) return false;
-    }
-  }
-
-  if (LIKELY(repeat_count_ > 0)) {
-    *val = current_value_;
-    --repeat_count_;
-  } else {
-    DCHECK_GT(literal_count_, 0);
-    if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false;
-    --literal_count_;
-  }
-
-  return true;
-}
-
-template<typename T>
-bool RleDecoder::NextCounts() {
-  // Read the next run's indicator int, it could be a literal or repeated run.
-  // The int is encoded as a vlq-encoded value.
-  int32_t indicator_value = 0;
-  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false;
-
-  // lsb indicates if it is a literal run or repeated run
-  bool is_literal = indicator_value & 1;
-  if (is_literal) {
-    literal_count_ = (indicator_value >> 1) * 8;
-    if (UNLIKELY(literal_count_ == 0)) return false;
-  } else {
-    repeat_count_ = indicator_value >> 1;
-    bool result = bit_reader_.GetAligned<T>(
-        BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
-    if (UNLIKELY(!result || repeat_count_ == 0)) return false;
-  }
-  return true;
-}
-
 /// This function buffers input values 8 at a time.  After seeing all 8 values,
 /// it decides whether they should be encoded as a literal or repeated run.
 inline bool RleEncoder::Put(uint64_t value) {
@@ -444,5 +465,197 @@ inline void RleEncoder::Clear() {
   bit_writer_.Clear();
 }
 
+template <typename T>
+inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int 
bit_width) {
+  DCHECK_GE(bit_width, 0);
+  DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
+  bit_reader_.Reset(buffer, buffer_len);  // Re-point the bit reader at the new input.
+  bit_width_ = bit_width;
+  repeat_count_ = 0;  // No run is in progress until NextCounts() reads one.
+  literal_count_ = 0;
+  num_buffered_literals_ = 0;  // With pos == num buffered, the literal buffer is empty.
+  literal_buffer_pos_ = 0;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
+  if (repeat_count_ > 0) return repeat_count_;  // Still inside a repeated run.
+  if (literal_count_ == 0) NextCounts();  // Both counts exhausted: read the next run.
+  return repeat_count_;  // 0 if the run is literal or no run could be read.
+}
+
+template <typename T>
+inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
+  DCHECK_GT(num_repeats_to_consume, 0);
+  DCHECK_GE(repeat_count_, num_repeats_to_consume);  // Cannot consume past the run.
+  repeat_count_ -= num_repeats_to_consume;
+  return repeated_value_;  // The same value stands for every repeat consumed.
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
+  if (literal_count_ > 0) return literal_count_;  // Still inside a literal run.
+  if (repeat_count_ == 0) NextCounts();  // Both counts exhausted: read the next run.
+  return literal_count_;  // 0 if the run is repeated or no run could be read.
 }
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetLiteralValues(
+    int32_t num_literals_to_consume, T* values) {
+  DCHECK_GE(num_literals_to_consume, 0);  // A zero-length request is allowed.
+  DCHECK_GE(literal_count_, num_literals_to_consume);
+  int32_t num_consumed = 0;
+  // Copy any buffered literals left over from previous calls.
+  if (HaveBufferedLiterals()) {
+    num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
+  }
+
+  int32_t num_remaining = num_literals_to_consume - num_consumed;
+  // Copy literals directly to the output, bypassing 'literal_buffer_' when 
possible.
+  // Need to round to a batch of 32 if the caller is consuming only part of 
the current
+  // run to avoid ending on a non-byte boundary.
+  int32_t num_to_bypass = std::min<int32_t>(literal_count_,
+      BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+  if (num_to_bypass > 0) {
+    int num_read =
+        bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + 
num_consumed);
+    // If we couldn't read the expected number, that means the input was 
truncated.
+    if (num_read < num_to_bypass) return false;
+    literal_count_ -= num_to_bypass;  // Bypassed values never touch the buffer.
+    num_consumed += num_to_bypass;
+    num_remaining = num_literals_to_consume - num_consumed;
+  }
+
+  if (num_remaining > 0) {
+    // We weren't able to copy all the literals requested directly from the 
input.
+    // Buffer literals and copy over the requested number.
+    if (UNLIKELY(!FillLiteralBuffer())) return false;
+    int32_t num_copied = OutputBufferedLiterals(num_remaining, values + 
num_consumed);
+    DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough 
literals";
+  }
+  return true;
+}
+
+template <typename T>
+template <typename OutType>
+inline bool RleBatchDecoder<T>::DecodeLiteralValues(
+    int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) {
+  DCHECK_GE(num_literals_to_consume, 0);  // A zero-length request is allowed.
+  DCHECK_GE(literal_count_, num_literals_to_consume);
+  int32_t num_consumed = 0;
+  // Decode leftover buffered literals; skip empty requests since 0 signals an error.
+  if (num_literals_to_consume > 0 && HaveBufferedLiterals()) {
+    num_consumed =
+        DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values);
+    if (UNLIKELY(num_consumed == 0)) return false;  // Index outside the dictionary.
+  }
+
+  int32_t num_remaining = num_literals_to_consume - num_consumed;
+  // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
+  // Need to round to a batch of 32 if the caller is consuming only part of the current
+  // run to avoid ending on a non-byte boundary.
+  int32_t num_to_bypass =
+      std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+  if (num_to_bypass > 0) {
+    int num_read = bit_reader_.UnpackAndDecodeBatch(
+        bit_width_, dict, dict_len, num_to_bypass, values + num_consumed);
+    // If we couldn't read the expected number, that means the input was truncated.
+    if (num_read < num_to_bypass) return false;
+    literal_count_ -= num_to_bypass;  // Bypassed values never touch the buffer.
+    num_consumed += num_to_bypass;
+    num_remaining = num_literals_to_consume - num_consumed;
+  }
+
+  if (num_remaining > 0) {
+    // We weren't able to copy all the literals requested directly from the input.
+    // Buffer literals and copy over the requested number.
+    if (UNLIKELY(!FillLiteralBuffer())) return false;
+    int32_t num_copied =
+        DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed);
+    if (UNLIKELY(num_copied == 0)) return false;  // Index outside the dictionary.
+    DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
+  }
+  return true;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetSingleValue(T* val) {
+  if (NextNumRepeats() > 0) {  // Current run is repeated: take one repeat from it.
+    DCHECK_EQ(0, NextNumLiterals());
+    *val = GetRepeatedValue(1);
+    return true;
+  }
+  if (NextNumLiterals() > 0) {  // Current run is literal: read one literal from it.
+    DCHECK_EQ(0, NextNumRepeats());
+    return GetLiteralValues(1, val);
+  }
+  return false;  // No run available: end of input or a decoding error.
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::NextCounts() {
+  DCHECK_EQ(0, literal_count_);
+  DCHECK_EQ(0, repeat_count_);
+  // Read the next run's indicator int, it could be a literal or repeated run.
+  // The int is encoded as a vlq-encoded value.
+  int32_t indicator_value = 0;
+  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return;
+  // A negative indicator cannot describe a valid run. Treat it as a decoding error
+  // and leave both counts at 0 rather than storing a negative run length.
+  if (UNLIKELY(indicator_value < 0)) return;
+
+  // lsb indicates if it is a literal run or repeated run
+  bool is_literal = indicator_value & 1;
+  int32_t run_len = indicator_value >> 1;
+  if (is_literal) {
+    // A literal run's length is stored as a number of groups of 8 values. Reject
+    // group counts whose value count would overflow int32_t (undefined behavior).
+    constexpr int32_t MAX_LITERAL_GROUPS = 0x7fffffff / 8;
+    if (UNLIKELY(run_len > MAX_LITERAL_GROUPS)) return;
+    literal_count_ = run_len * 8;
+  } else {
+    if (UNLIKELY(run_len == 0)) return;
+    // The repeated value occupies Ceil(bit_width_, 8) bytes after the indicator.
+    bool result =
+        bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
+    if (UNLIKELY(!result)) return;
+    // Only publish the count once the value was read, so a failure leaves it at 0.
+    repeat_count_ = run_len;
+  }
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
+  DCHECK(!HaveBufferedLiterals());  // Refilling would discard unread literals.
+  int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, 
literal_count_);
+  num_buffered_literals_ =
+      bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
+  // If we couldn't read the expected number, that means the input was 
truncated.
+  if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
+  literal_buffer_pos_ = 0;  // Next read starts at the front of the refilled buffer.
+  return true;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
+    int32_t max_to_output, T* values) {
+  int32_t num_to_output =
+      std::min<int32_t>(max_to_output, num_buffered_literals_ - 
literal_buffer_pos_);
+  memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * 
num_to_output);
+  literal_buffer_pos_ += num_to_output;  // Mark the copied literals as read.
+  literal_count_ -= num_to_output;  // They also leave the current literal run.
+  return num_to_output;
+}
+
+template <typename T>
+template <typename OutType>
+inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(
+    int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) {
+  int32_t num_to_output =
+      std::min<int32_t>(max_to_output, num_buffered_literals_ - 
literal_buffer_pos_);
+  for (int32_t i = 0; i < num_to_output; ++i) {
+    T idx = literal_buffer_[literal_buffer_pos_ + i];  // Literal is a dict index.
+    if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0;  // Not in the dictionary.
+    memcpy(&values[i], &dict[idx], sizeof(OutType));
+  }
+  literal_buffer_pos_ += num_to_output;  // Only advanced when every index was valid.
+  literal_count_ -= num_to_output;
+  return num_to_output;
+}
+
+template <typename T>
+constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN;  // Out-of-class definition.
+}
+
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index 0501d71..b8a1d94 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -30,7 +30,7 @@
 
 namespace impala {
 
-const int MAX_WIDTH = BitReader::MAX_BITWIDTH;
+const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH;
 
 TEST(BitArray, TestBool) {
   const int len = 8;
@@ -69,29 +69,24 @@ TEST(BitArray, TestBool) {
   EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
 
   // Use the reader and validate
-  BitReader reader(buffer, len);
+  BatchedBitReader reader(buffer, len);
+
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
-    for (int i = 0; i < 8; ++i) {
-      bool val = false;
-      bool result = reader.GetValue(1, &val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, i % 2);
-    }
+    bool batch_vals[16];
+    EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals));
+    for (int i = 0; i < 8; ++i)  EXPECT_EQ(batch_vals[i], i % 2);
 
     for (int i = 0; i < 8; ++i) {
-      bool val = false;
-      bool result = reader.GetValue(1, &val);
-      EXPECT_TRUE(result);
       switch (i) {
         case 0:
         case 1:
         case 4:
         case 5:
-          EXPECT_EQ(val, false);
+          EXPECT_EQ(batch_vals[8 + i], false);
           break;
         default:
-          EXPECT_EQ(val, true);
+          EXPECT_EQ(batch_vals[8 + i], true);
           break;
       }
     }
@@ -113,17 +108,30 @@ void TestBitArrayValues(int bit_width, int num_vals) {
   writer.Flush();
   EXPECT_EQ(writer.bytes_written(), len);
 
-  BitReader reader(buffer, len);
+  BatchedBitReader reader(buffer, len);
+  BatchedBitReader reader2(reader); // Test copy constructor.
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
+    // Unpack all values at once with one batched reader and in small batches 
with the
+    // other batched reader.
+    vector<int64_t> batch_vals(num_vals);
+    const int BATCH_SIZE = 32;
+    vector<int64_t> batch_vals2(BATCH_SIZE);
+    EXPECT_EQ(num_vals,
+        reader.UnpackBatch(bit_width, num_vals, batch_vals.data()));
     for (int i = 0; i < num_vals; ++i) {
-      int64_t val;
-      bool result = reader.GetValue(bit_width, &val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, i % mod);
+      if (i % BATCH_SIZE == 0) {
+        int num_to_unpack = min(BATCH_SIZE, num_vals - i);
+        EXPECT_EQ(num_to_unpack,
+           reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data()));
+      }
+      EXPECT_EQ(i % mod, batch_vals[i]);
+      EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]);
     }
     EXPECT_EQ(reader.bytes_left(), 0);
+    EXPECT_EQ(reader2.bytes_left(), 0);
     reader.Reset(buffer, len);
+    reader2.Reset(buffer, len);
   }
 }
 
@@ -137,45 +145,32 @@ TEST(BitArray, TestValues) {
   }
 }
 
-// Test some mixed values
-TEST(BitArray, TestMixed) {
-  const int len = 1024;
-  uint8_t buffer[len];
-  bool parity = true;
-
-  BitWriter writer(buffer, len);
-  for (int i = 0; i < len; ++i) {
-    bool result;
-    if (i % 2 == 0) {
-      result = writer.PutValue(parity, 1);
-      parity = !parity;
-    } else {
-      result = writer.PutValue(i, 10);
-    }
-    EXPECT_TRUE(result);
-  }
-  writer.Flush();
-
-  parity = true;
-  BitReader reader(buffer, len);
-  // Ensure it returns the same results after Reset().
-  for (int trial = 0; trial < 2; ++trial) {
-    for (int i = 0; i < len; ++i) {
-      bool result;
-      if (i % 2 == 0) {
-        bool val;
-        result = reader.GetValue(1, &val);
-        EXPECT_EQ(val, parity);
-        parity = !parity;
-      } else {
-        int val;
-        result = reader.GetValue(10, &val);
-        EXPECT_EQ(val, i);
+/// Get many values from a batch RLE decoder.
+template <typename T>
+static bool GetRleValues(RleBatchDecoder<T>* decoder, int num_vals, T* vals) {
+  int decoded = 0;
+  // Decode repeated and literal runs until we've filled the output.
+  while (decoded < num_vals) {
+    if (decoder->NextNumRepeats() > 0) {
+      EXPECT_EQ(0, decoder->NextNumLiterals());
+      int num_repeats_to_output =
+          min<int>(decoder->NextNumRepeats(), num_vals - decoded);
+      T repeated_val = decoder->GetRepeatedValue(num_repeats_to_output);
+      for (int i = 0; i < num_repeats_to_output; ++i) {
+        *vals = repeated_val;
+        ++vals;
       }
-      EXPECT_TRUE(result);
+      decoded += num_repeats_to_output;
+      continue;
     }
-    reader.Reset(buffer, len);
+    int num_literals_to_output =
+          min<int>(decoder->NextNumLiterals(), num_vals - decoded);
+    if (num_literals_to_output == 0) return false;
+    if (!decoder->GetLiteralValues(num_literals_to_output, vals)) return false;
+    decoded += num_literals_to_output;
+    vals += num_literals_to_output;
   }
+  return true;
 }
 
 // Validates encoding of values by encoding and decoding them.  If
@@ -203,16 +198,32 @@ void ValidateRle(const vector<int>& values, int bit_width,
   }
 
   // Verify read
-  RleDecoder decoder(buffer, len, bit_width);
+  RleBatchDecoder<uint64_t> decoder(buffer, len, bit_width);
+  RleBatchDecoder<uint64_t> decoder2(buffer, len, bit_width);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     for (int i = 0; i < values.size(); ++i) {
       uint64_t val;
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(values[i], val);
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
+      EXPECT_EQ(values[i], val) << i;
+    }
+    // Unpack everything at once from the second batch decoder.
+    vector<uint64_t> decoded_values(values.size());
+    EXPECT_TRUE(GetRleValues(&decoder2, values.size(), decoded_values.data()));
+    for (int i = 0; i < values.size(); ++i) {
+      EXPECT_EQ(values[i], decoded_values[i]) << i;
     }
     decoder.Reset(buffer, len, bit_width);
+    decoder2.Reset(buffer, len, bit_width);
+  }
+}
+
+/// Basic test case for literal unpacking - two literals in a run.
+TEST(Rle, TwoLiteralRun) {
+  vector<int> values{1, 0};
+  ValidateRle(values, 1, nullptr, -1);
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, nullptr, -1);
   }
 }
 
@@ -287,16 +298,22 @@ TEST(Rle, BitWidthZeroRepeated) {
   uint8_t buffer[1];
   const int num_values = 15;
   buffer[0] = num_values << 1; // repeated indicator byte
-  RleDecoder decoder(buffer, sizeof(buffer), 0);
+  RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     uint8_t val;
     for (int i = 0; i < num_values; ++i) {
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
-      EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
+      EXPECT_EQ(val, 0);
     }
-    EXPECT_FALSE(decoder.Get(&val));
+    EXPECT_FALSE(decoder.GetSingleValue(&val));
+
+    // Test decoding all values in a batch.
+    decoder.Reset(buffer, sizeof(buffer), 0);
+    uint8_t decoded_values[num_values];
+    EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+    for (int i = 0; i < num_values; i++) EXPECT_EQ(0, decoded_values[i]) << i;
+    EXPECT_FALSE(decoder.GetSingleValue(&val));
     decoder.Reset(buffer, sizeof(buffer), 0);
   }
 }
@@ -305,17 +322,23 @@ TEST(Rle, BitWidthZeroLiteral) {
   uint8_t buffer[1];
   const int num_groups = 4;
   buffer[0] = num_groups << 1 | 1; // literal indicator byte
-  RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0);
+  RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
   // Ensure it returns the same results after Reset().
   for (int trial = 0; trial < 2; ++trial) {
     const int num_values = num_groups * 8;
     uint8_t val;
     for (int i = 0; i < num_values; ++i) {
-      bool result = decoder.Get(&val);
-      EXPECT_TRUE(result);
+      EXPECT_TRUE(decoder.GetSingleValue(&val));
       EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
     }
-    EXPECT_FALSE(decoder.Get(&val));
+
+    // Test decoding the whole batch at once.
+    decoder.Reset(buffer, sizeof(buffer), 0);
+    uint8_t decoded_values[num_values];
+    EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+    for (int i = 0; i < num_values; ++i) EXPECT_EQ(0, decoded_values[i]);
+
+    EXPECT_FALSE(GetRleValues(&decoder, 1, decoded_values));
     decoder.Reset(buffer, sizeof(buffer), 0);
   }
 }
@@ -402,20 +425,25 @@ TEST(BitRle, Overflow) {
     EXPECT_LE(bytes_written, len);
     EXPECT_GT(num_added, 0);
 
-    RleDecoder decoder(buffer, bytes_written, bit_width);
+    RleBatchDecoder<uint32_t> decoder(buffer, bytes_written, bit_width);
     // Ensure it returns the same results after Reset().
     for (int trial = 0; trial < 2; ++trial) {
       parity = true;
       uint32_t v;
       for (int i = 0; i < num_added; ++i) {
-        bool result = decoder.Get(&v);
-        EXPECT_TRUE(result);
+        EXPECT_TRUE(decoder.GetSingleValue(&v));
         EXPECT_EQ(v, parity);
         parity = !parity;
       }
       // Make sure we get false when reading past end a couple times.
-      EXPECT_FALSE(decoder.Get(&v));
-      EXPECT_FALSE(decoder.Get(&v));
+      EXPECT_FALSE(decoder.GetSingleValue(&v));
+      EXPECT_FALSE(decoder.GetSingleValue(&v));
+
+      decoder.Reset(buffer, bytes_written, bit_width);
+      uint32_t decoded_values[num_added];
+      EXPECT_TRUE(GetRleValues(&decoder, num_added, decoded_values));
+      for (int i = 0; i < num_added; ++i) EXPECT_EQ(i % 2 == 0, 
decoded_values[i]) << i;
+
       decoder.Reset(buffer, bytes_written, bit_width);
     }
   }
@@ -426,21 +454,20 @@ TEST(BitRle, Overflow) {
 TEST(Rle, ZeroLiteralOrRepeatCount) {
   const int len = 1024;
   uint8_t buffer[len];
-  RleDecoder decoder(buffer, len, 0);
-  uint64_t val;
-
+  RleBatchDecoder<uint64_t> decoder(buffer, len, 0);
   // Test the RLE repeated values path.
   memset(buffer, 0, len);
   for (int i = 0; i < 10; ++i) {
-    bool result = decoder.Get(&val);
-    EXPECT_FALSE(result);
+    EXPECT_EQ(0, decoder.NextNumLiterals());
+    EXPECT_EQ(0, decoder.NextNumRepeats());
   }
 
   // Test the RLE literal values path
   memset(buffer, 1, len);
+  decoder.Reset(buffer, len, 0);
   for (int i = 0; i < 10; ++i) {
-    bool result = decoder.Get(&val);
-    EXPECT_FALSE(result);
+    EXPECT_EQ(0, decoder.NextNumLiterals());
+    EXPECT_EQ(0, decoder.NextNumRepeats());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index e1dc496..8b9db3a 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -123,3 +123,12 @@ Generated using parquet-mr and contents verified using parquet-tools-1.9.1.
 Contains decimals stored as variable sized BYTE_ARRAY with both dictionary
 and non-dictionary encoding respectively.
 
+alltypes_agg_bitpacked_def_levels.parquet:
+Generated by hacking Impala's Parquet writer to write out bit-packed def levels instead
+of the standard RLE-encoded levels. See
+https://github.com/timarmstrong/incubator-impala/tree/hack-bit-packed-levels. This
+is a single file containing all of the alltypesagg data, which includes a mix of
+null and non-null values. This is not actually a valid Parquet file because the
+bit-packed levels are written in the reverse of the order specified in the Parquet spec
+for BIT_PACKED. However, this is the order that Impala attempts to read the levels
+in - see IMPALA-3006.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/alltypes_agg_bitpacked_def_levels.parquet b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
new file mode 100644
index 0000000..bd2d9d8
Binary files /dev/null and b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
new file mode 100644
index 0000000..d48c333
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
@@ -0,0 +1,52 @@
+====
+---- QUERY
+# Verify that total counts of non-null values are correct.
+select count(id), count(tinyint_col), count(smallint_col), count(int_col),
+  count(bigint_col), count(float_col), count(double_col), count(date_string_col),
+  count(string_col), count(timestamp_col), count(year), count(month), count(day)
+from alltypesagg
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+11000,9000,10800,10980,10980,10980,10980,11000,11000,11000,11000,11000,10000
+====
+---- QUERY
+# Spot-check a subset of values.
+select *
+from alltypesagg
+where year = 2010 and month = 1 and int_col is null or int_col % 1000 = 77
+order by id, year, month, day
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT,INT
+---- RESULTS
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,1
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,NULL
+77,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/01/10','77',2010-01-01 01:17:29.260000000,2010,1,1
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,2
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,NULL
+1077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/02/10','77',2010-01-02 01:17:29.260000000,2010,1,2
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,3
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,NULL
+2077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/03/10','77',2010-01-03 01:17:29.260000000,2010,1,3
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,4
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,NULL
+3077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/04/10','77',2010-01-04 01:17:29.260000000,2010,1,4
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,5
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,NULL
+4077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/05/10','77',2010-01-05 01:17:29.260000000,2010,1,5
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL
+5077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/06/10','77',2010-01-06 01:17:29.260000000,2010,1,6
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,7
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,NULL
+6077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/07/10','77',2010-01-07 01:17:29.260000000,2010,1,7
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,8
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,NULL
+7077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/08/10','77',2010-01-08 01:17:29.260000000,2010,1,8
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,9
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,NULL
+8077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/09/10','77',2010-01-09 01:17:29.260000000,2010,1,9
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,10
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,NULL
+9077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/10/10','77',2010-01-10 01:17:29.260000000,2010,1,10
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 17b9503..fe0577a 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -388,6 +388,22 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet-corrupt-rle-counts-abort',
                        vector, unique_database)
 
+  def test_bitpacked_def_levels(self, vector, unique_database):
+    """Test that Impala can read a Parquet file with the deprecated bit-packed def
+       level encoding."""
+    self.client.execute(("""CREATE TABLE {0}.alltypesagg (
+          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
+          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
+          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
+          year INT, month INT, day INT) STORED AS PARQUET""").format(unique_database))
+    alltypesagg_loc = get_fs_path(
+        "/test-warehouse/{0}.db/{1}".format(unique_database, "alltypesagg"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", alltypesagg_loc])
+    self.client.execute("refresh {0}.alltypesagg".format(unique_database))
+
+    self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
+
   @SkipIfS3.hdfs_block_size
   @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size


Reply via email to