IMPALA-2736: Optimized ReadValueBatch() for Parquet scalar column readers.

This change builds on top of the recent move to column-wise
materialization of scalar values in the Parquet scanner.

The goal of this patch is to improve scan efficiency and to show
the future direction for all column readers.

Major TODO:
The current patch has minor code duplication/redundancy,
and the new ReadValueBatch() departs from (but improves) the
existing column reader control flow. To improve code reuse
and readability we should overhaul all column readers to be
more uniform.

Summary of changes:
- refactor ReadValueBatch() to simplify control flow
- introduce caching of def/rep levels for faster level
  decoding, and for a tighter value materialization loop
  (see the sketch after this list)
- new templated function for value materialization that
  takes the value encoding as a template argument
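
The core idea behind the level caching, as a minimal, hypothetical sketch
(the real implementation is LevelDecoder::FillCache() in the diff below;
the types and names here are illustrative only): decoding RLE levels in
batches lets a repeated run become a single memset() instead of one
decode-and-error-check per value.

    // Hypothetical sketch only -- not the actual Impala LevelDecoder.
    // Decodes a batch of RLE-encoded def/rep levels into a byte cache so
    // the materialization loop can read plain bytes afterwards.
    #include <algorithm>
    #include <cstdint>
    #include <cstring>

    struct RleRun {
      uint8_t value;    // the repeated level value
      uint32_t repeat;  // how many times it repeats
    };

    // Fills 'cache' with up to 'batch_size' levels taken from 'runs'.
    // Returns the number of levels written; each run becomes one memset().
    int FillLevelCache(const RleRun* runs, int num_runs, uint8_t* cache,
        int batch_size) {
      int num_values = 0;
      for (int i = 0; i < num_runs && num_values < batch_size; ++i) {
        int to_set = static_cast<int>(
            std::min<uint32_t>(runs[i].repeat, batch_size - num_values));
        std::memset(cache + num_values, runs[i].value, to_set);
        num_values += to_set;
      }
      return num_values;
    }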

Mini benchmark vs. cdh5-trunk:
I ran the following queries on a single impalad before and after my
change using a synthetic 'huge_lineitem' table.
I modified hdfs-scan-node.cc to set the number of rows of every row
batch to 0, so that the measurement focuses on scan time (sketched below).
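
The hack itself was trivial, roughly the following (a hypothetical sketch
using a stand-in RowBatch type, not the real class from
be/src/runtime/row-batch.h):

    // Hypothetical sketch of the throwaway measurement hack: after the
    // scanner fills a batch, drop the rows so downstream operators do no
    // work and the query time is dominated by the scan itself.
    struct RowBatch {  // minimal stand-in for Impala's RowBatch
      int num_rows = 0;
      void set_num_rows(int n) { num_rows = n; }
    };

    void DiscardRowsForBenchmark(RowBatch* batch) {
      batch->set_num_rows(0);  // keep the scan work, discard the rows
    }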

Query options:
set num_scanner_threads=1;
set disable_codegen=true;
set num_nodes=1;

select * from huge_lineitem;
Before: 22.39s
After:  13.62s

select * from huge_lineitem where l_linenumber < 0;
Before: 25.11s
After:  17.73s

select * from huge_lineitem where l_linenumber % 2 = 0;
Before: 26.32s
After:  16.68s

select l_linenumber from huge_lineitem;
Before: 1.74s
After:  0.92s

Testing:
I ran a private exhaustive build and all tests passed.

Change-Id: I21fa9b050a45f2dd45cc0091ea5b008d3c0a3f30
Reviewed-on: http://gerrit.cloudera.org:8080/2843
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/14cdb049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/14cdb049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/14cdb049

Branch: refs/heads/master
Commit: 14cdb0497c52f102623b386bc35482f4c2fa7f8b
Parents: df8bf3a
Author: Alex Behm <[email protected]>
Authored: Thu Apr 21 23:39:21 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:18:05 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 468 ++++++++++++++++++++++---------
 be/src/exec/hdfs-parquet-scanner.h  |  11 +
 be/src/util/rle-encoding.h          |  10 +-
 3 files changed, 356 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 52fe3fa..47e19a6 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -64,6 +64,9 @@ DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentag
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
 
 const int64_t HdfsParquetScanner::FOOTER_SIZE = 100 * 1024;
+const int16_t HdfsParquetScanner::ROW_GROUP_END = numeric_limits<int16_t>::min();
+const int16_t HdfsParquetScanner::INVALID_LEVEL = -1;
+const int16_t HdfsParquetScanner::INVALID_POS = -1;
 
 // Max data page header size in bytes. This is an estimate and only needs to be an upper
 // bound. It is theoretically possible to have a page header of any size due to string
@@ -230,22 +233,87 @@ HdfsParquetScanner::~HdfsParquetScanner() {
 
 // TODO for 2.3: move column readers to separate file
 
-/// Decoder for all supported Parquet level encodings.
-/// Overrides RleDecoder so it can use RLE decoding and internal bit reader.
-class HdfsParquetScanner::LevelDecoder : protected RleDecoder {
+/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
+/// caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+///
+/// Inherits from RleDecoder instead of containing one for performance reasons.
+/// The containment design would require two BitReaders per column reader. The extra
+/// BitReader causes enough bloat for a column reader to require another cache line.
+/// TODO: It is not clear whether the inheritance vs. containment choice still makes
+/// sense with column-wise materialization. The containment design seems cleaner and
+/// we should revisit.
+class HdfsParquetScanner::LevelDecoder : public RleDecoder {
  public:
-  LevelDecoder() {};
+  LevelDecoder(bool is_def_level_decoder)
+    : cached_levels_(NULL),
+      num_cached_levels_(0),
+      cached_level_idx_(0),
+      encoding_(parquet::Encoding::PLAIN),
+      max_level_(0),
+      cache_size_(0),
+      num_buffered_values_(0),
+      decoding_error_code_(is_def_level_decoder ?
+          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
+  }
 
   /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
   /// encoding requires reading metadata from the page header.
-  Status Init(const string& filename, parquet::Encoding::type encoding, int max_level,
-      int num_buffered_values, uint8_t** data, int* data_size);
+  Status Init(const string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
+      uint8_t** data, int* data_size);
 
-  /// Reads the next level.
+  /// Returns the next level or INVALID_LEVEL if there was an error.
   inline int16_t ReadLevel();
 
+  /// Decodes and caches the next batch of levels. Resets members associated with the
+  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
+  /// level was encountered with a value greater than max_level_.
+  Status CacheNextBatch(int batch_size);
+
+  /// Functions for working with the level cache.
+  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+  inline uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  inline void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  inline int CacheSize() const { return num_cached_levels_; }
+  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+  inline int CacheCurrIdx() const { return cached_level_idx_; }
+
  private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Sets the number of
+  /// values written to the cache in *num_cached_levels. Returns false if there was
+  /// an error decoding a level or if there was a level value greater than max_level_.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool
+  /// passed in Init().
+  uint8_t* cached_levels_;
+  /// Number of valid level values in the cache.
+  int num_cached_levels_;
+  /// Current index into cached_levels_.
+  int cached_level_idx_;
   parquet::Encoding::type encoding_;
+
+  /// For error checking and reporting.
+  int max_level_;
+  /// Number of level values cached_levels_ has memory allocated for.
+  int cache_size_;
+  /// Number of remaining data values in the current data page.
+  int num_buffered_values_;
+  string filename_;
+  TErrorCode::type decoding_error_code_;
 };
 
 /// Base class for reading a column. Reads a logical column, not necessarily a column
@@ -306,6 +374,11 @@ class HdfsParquetScanner::ColumnReader {
   /// not in collections.
   virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
 
+  /// Returns true if this reader needs to be seeded with NextLevels() before
+  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
+  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
+  virtual bool NeedsSeedingForBatchedReading() const { return true; }
+
   /// Batched version of ReadValue() that reads up to max_values at once and materializes
   /// them into tuples in tuple_mem. Returns the number of values actually materialized
   /// in *num_values. The return value, error behavior and state changes are generally
@@ -340,9 +413,8 @@ class HdfsParquetScanner::ColumnReader {
   /// 'tuple') and increments pos_current_value_.
   void ReadPosition(Tuple* tuple);
 
-  /// Returns true if this column reader has reached the end of the row group, or
-  /// if this column reader has not been seeded with a first NextLevels().
-  inline bool RowGroupAtEnd() { return rep_level_ == -1; }
+  /// Returns true if this column reader has reached the end of the row group.
+  inline bool RowGroupAtEnd() { return rep_level_ == ROW_GROUP_END; }
 
  protected:
   HdfsParquetScanner* parent_;
@@ -385,10 +457,10 @@ class HdfsParquetScanner::ColumnReader {
       node_(node),
       slot_desc_(slot_desc),
       pos_slot_desc_(NULL),
-      pos_current_value_(-1),
-      rep_level_(-1),
+      pos_current_value_(INVALID_POS),
+      rep_level_(INVALID_LEVEL),
       max_rep_level_(node_.max_rep_level),
-      def_level_(-1),
+      def_level_(INVALID_LEVEL),
       max_def_level_(node_.max_def_level),
       tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
       null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
@@ -402,7 +474,6 @@ class HdfsParquetScanner::ColumnReader {
   }
 };
 
-
 /// Collections are not materialized directly in parquet files; only scalar values appear
 /// in the file. CollectionColumnReader uses the definition and repetition levels of child
 /// column readers to figure out the boundaries of each collection in this column.
@@ -476,6 +547,8 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ColumnReader(parent, node, slot_desc),
+      def_levels_(true),
+      rep_levels_(false),
       num_buffered_values_(0),
       num_values_read_(0),
       metadata_(NULL),
@@ -577,18 +650,10 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered.
   bool NextPage();
 
-  /// Implementation for NextLevels() and NextDefLevel().
+  /// Implementation for NextLevels().
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();
 
-  /// Returns the definition level for the next value
-  /// Returns -1 and sets parse_status_ if there was a error parsing it.
-  int16_t ReadDefinitionLevel();
-
-  /// Returns the repetition level for the next value
-  /// Returns -1 and sets parse_status_ if there was a error parsing it.
-  int16_t ReadRepetitionLevel();
-
   /// Creates a dictionary decoder from values/size and store in class. Subclass must
   /// implement this.
   virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) = 0;
@@ -606,12 +671,6 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
  private:
-  // Pull out slow-path Status construction code from ReadRepetitionLevel()/
-  // ReadDefinitionLevel() for performance.
-  void __attribute__((noinline)) SetLevelError(TErrorCode::type error_code) {
-    parent_->parse_status_ = Status(error_code, num_buffered_values_, filename());
-  }
-
   /// Writes the next value into *slot using pool if necessary. Also advances rep_level_
   /// and def_level_ via NextLevels().
   ///
@@ -667,11 +726,17 @@ class HdfsParquetScanner::ScalarColumnReader :
     return ReadValue<false>(pool, tuple);
   }
 
+  virtual bool NeedsSeedingForBatchedReading() const { return false; }
+
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
 
   virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
 
  protected:
   template <bool IN_COLLECTION>
@@ -685,15 +750,134 @@ class HdfsParquetScanner::ScalarColumnReader :
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
         "Caller should have called NextLevels() until we are ready to read a 
value";
 
-    if (!MATERIALIZED) {
-      return NextLevels<IN_COLLECTION>();
-    } else if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
+    if (MATERIALIZED) {
+      if (def_level_ >= max_def_level()) {
+        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        } else {
+          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        }
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
     }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  /// Implementation of the ReadValueBatch() functions specialized for this
+  /// column reader type. This function drives the reading of data pages and
+  /// caching of rep/def levels. Once a data page and cached levels are available,
+  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
+  /// value materialization using the level caches.
+  template<bool IN_COLLECTION>
+  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    // Repetition level is only present if this column is nested in a collection type.
+    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+
+    int val_count = 0;
+    bool continue_execution = true;
+    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      // Read next page if necessary.
+      if (num_buffered_values_ == 0) {
+        if (!NextPage()) {
+          continue_execution = parent_->parse_status_.ok();
+          continue;
+        }
+      }
+
+      // Fill def/rep level caches if they are empty.
+      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
+      }
+      // We only need the repetition levels for populating the position slot since we
+      // are only populating top-level tuples.
+      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
+      }
+      }
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+
+      // This special case is most efficiently handled here directly.
+      if (!MATERIALIZED && !IN_COLLECTION) {
+        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        val_count += vals_to_add;
+        def_levels_.CacheSkipLevels(vals_to_add);
+        num_buffered_values_ -= vals_to_add;
+        continue;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int cache_start_idx = def_levels_.CacheCurrIdx();
+      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+      int remaining_val_capacity = max_values - val_count;
+      int ret_val_count = 0;
+      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      } else {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      }
+      val_count += ret_val_count;
+      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+    }
+    *num_values = val_count;
+    return continue_execution;
+  }
+
+  /// Helper function for ReadValueBatch() above that performs value materialization.
+  /// It assumes a data page with remaining values is available, and that the def/rep
+  /// level caches have been populated.
+  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
+  /// handled in this function.
+  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
+  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    DCHECK(MATERIALIZED || IN_COLLECTION);
+    DCHECK_GT(num_buffered_values_, 0);
+    DCHECK(def_levels_.CacheHasNext());
+    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
+
+    uint8_t* curr_tuple = tuple_mem;
+    int val_count = 0;
+    while (def_levels_.CacheHasNext()) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      int def_level = def_levels_.CacheGetNext();
+
+      if (IN_COLLECTION) {
+        if (def_level < def_level_of_immediate_repeated_ancestor()) {
+          // A containing repeated field is empty or NULL. Skip the value but
+          // move to the next repetition level if necessary.
+          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
+          continue;
+        }
+        if (pos_slot_desc_ != NULL) {
+          int rep_level = rep_levels_.CacheGetNext();
+          // Reset position counter if we are at the start of a new parent collection.
+          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
+        }
+      }
+
+      if (MATERIALIZED) {
+        if (def_level >= max_def_level()) {
+          bool continue_execution =
+              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
+          if (UNLIKELY(!continue_execution)) break;
+        } else {
+          tuple->SetNull(null_indicator_offset_);
+        }
+      }
+
+      curr_tuple += tuple_size;
+      ++val_count;
+      if (UNLIKELY(val_count == max_values)) break;
+    }
+    *num_values = val_count;
+    return true;
   }
 
   virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) {
@@ -734,17 +918,17 @@ class HdfsParquetScanner::ScalarColumnReader :
   }
 
  private:
-  /// Writes the next value into *slot using pool if necessary. Also advances def_level_
-  /// and rep_level_ via NextLevels().
+  /// Writes the next value into *slot using pool if necessary.
   ///
   /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
   /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
   /// true.
-  template <bool IN_COLLECTION>
+  template<bool IS_DICT_ENCODED>
   inline bool ReadSlot(void* slot, MemPool* pool) {
     T val;
     T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+    if (IS_DICT_ENCODED) {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
       if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
         SetDictDecodeError();
         return false;
@@ -757,7 +941,7 @@ class HdfsParquetScanner::ScalarColumnReader :
             !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
       return false;
     }
-    return NextLevels<IN_COLLECTION>();
+    return true;
   }
 
   /// Most column readers never require conversion, so we can avoid branches by
@@ -794,48 +978,6 @@ class HdfsParquetScanner::ScalarColumnReader :
   int fixed_len_size_;
 };
 
-/// Implementations of the batched ReadValue() functions specialized for this
-/// column reader type.
-/// TODO: The code is only a proof-of-concept. It is almost identical to the
-/// generic ColumnReader::ReadValueBatch() function, except that this version
-/// avoids the virtual function calls for NextLevels()/ReadValue().
-/// This function needs to be optimized further.
-template<typename T, bool MATERIALIZED>
-bool HdfsParquetScanner::ScalarColumnReader<T, MATERIALIZED>::ReadValueBatch(
-    MemPool* pool, int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
-      // A containing repeated field is empty or NULL
-      continue_execution = NextLevels<true>();
-      continue;
-    }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
-    continue_execution = ReadValue<true>(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-template<typename T, bool MATERIALIZED>
-bool HdfsParquetScanner::ScalarColumnReader<T, MATERIALIZED>::ReadNonRepeatedValueBatch(
-    MemPool* pool, int max_values, int tuple_size,
-    uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    continue_execution = ReadValue<false>(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
 template<>
 inline bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::NeedsConversion() const {
   return needs_conversion_;
@@ -974,6 +1116,8 @@ Status HdfsParquetScanner::Prepare(ScannerContext* context) {
 
   scan_node_->IncNumScannersCodegenDisabled();
 
+  level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
     DCHECK(ctx->filter != NULL);
@@ -1023,6 +1167,11 @@ void HdfsParquetScanner::Close() {
   assemble_rows_timer_.Stop();
   assemble_rows_timer_.ReleaseCounter();
 
+  if (level_cache_pool_.get() != NULL) {
+    level_cache_pool_->FreeAll();
+    level_cache_pool_.reset(NULL);
+  }
+
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
     const LocalFilterStats& local = filter_stats_[i];
@@ -1338,19 +1487,18 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       FILE_CHECK_EQ(current_page_header_.compressed_page_size, uncompressed_size);
     }
 
-    if (max_rep_level() > 0) {
-      // Initialize the repetition level data
-      rep_levels_.Init(filename(),
-          current_page_header_.data_page_header.repetition_level_encoding,
-          max_rep_level(), num_buffered_values_, &data_, &data_size);
-    }
+    // Initialize the repetition level data
+    rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_rep_level(), num_buffered_values_,
+        &data_, &data_size);
 
-    if (max_def_level() > 0) {
-      // Initialize the definition level data
-      def_levels_.Init(filename(),
-          current_page_header_.data_page_header.definition_level_encoding,
-          max_def_level(), num_buffered_values_, &data_, &data_size);
-    }
+    // Initialize the definition level data
+    def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_def_level(), num_buffered_values_, &data_, &data_size);
 
     // Data can be empty if the column contains all NULLs
     if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
@@ -1361,9 +1509,17 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
 }
 
 Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, int max_level,
-    int num_buffered_values, uint8_t** data, int* data_size) {
+    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
   encoding_ = encoding;
+  max_level_ = max_level;
+  num_buffered_values_ = num_buffered_values;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
   int32_t num_bytes = 0;
   switch (encoding) {
     case parquet::Encoding::RLE: {
@@ -1394,7 +1550,25 @@ Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
   return Status::OK();
 }
 
-// TODO More codegen here as well.
+Status HdfsParquetScanner::LevelDecoder::InitCache(MemPool* pool, int cache_size) {
+  num_cached_levels_ = 0;
+  cached_level_idx_ = 0;
+  // Memory has already been allocated.
+  if (cached_levels_ != NULL) {
+    DCHECK_EQ(cache_size_, cache_size);
+    return Status::OK();
+  }
+
+  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
+  if (cached_levels_ == NULL) {
+    return pool->mem_tracker()->MemLimitExceeded(
+        NULL, "Definition level cache", cache_size);
+  }
+  memset(cached_levels_, 0, cache_size);
+  cache_size_ = cache_size;
+  return Status::OK();
+}
+
 inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
   bool valid;
   uint8_t level;
@@ -1404,31 +1578,60 @@ inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
     DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
     valid = bit_reader_.GetValue(1, &level);
   }
-  return LIKELY(valid) ? level : -1;
+  return LIKELY(valid) ? level : INVALID_LEVEL;
 }
 
-// TODO(skye): try reading + caching many levels at once to avoid error checking etc on
-// each call (here and RLE decoder) for perf
-inline int16_t HdfsParquetScanner::BaseScalarColumnReader::ReadDefinitionLevel() {
-  DCHECK_GT(max_def_level(), 0);
-  int16_t def_level = def_levels_.ReadLevel();
-
-  if (UNLIKELY(def_level < 0 || def_level > max_def_level())) {
-    SetLevelError(TErrorCode::PARQUET_DEF_LEVEL_ERROR);
-    return -1;
+Status HdfsParquetScanner::LevelDecoder::CacheNextBatch(int batch_size) {
+  DCHECK_LE(batch_size, cache_size_);
+  cached_level_idx_ = 0;
+  if (max_level_ > 0) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+      return Status(decoding_error_code_, num_buffered_values_, filename_);
+    }
+  } else {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    DCHECK_EQ(max_level_, 0);
+    num_cached_levels_ = batch_size;
   }
-  return def_level;
+  return Status::OK();
 }
 
-inline int16_t HdfsParquetScanner::BaseScalarColumnReader::ReadRepetitionLevel() {
-  DCHECK_GT(max_rep_level(), 0);
-  int16_t rep_level = rep_levels_.ReadLevel();
+bool HdfsParquetScanner::LevelDecoder::FillCache(int batch_size,
+    int* num_cached_levels) {
+  DCHECK(num_cached_levels != NULL);
+  int num_values = 0;
+  if (encoding_ == parquet::Encoding::RLE) {
+    while (true) {
+      // Add RLE encoded values by repeating the current value this number of times.
+      uint32_t num_repeats_to_set =
+          min<uint32_t>(repeat_count_, batch_size - num_values);
+      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      repeat_count_ -= num_repeats_to_set;
+
+      // Add remaining literal values, if any.
+      uint32_t num_literals_to_set =
+          min<uint32_t>(literal_count_, batch_size - num_values);
+      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
+      for (; num_values < num_values_end; ++num_values) {
+        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
+        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+      }
+      literal_count_ -= num_literals_to_set;
 
-  if (UNLIKELY(rep_level < 0 || rep_level > max_rep_level())) {
-    SetLevelError(TErrorCode::PARQUET_REP_LEVEL_ERROR);
-    return -1;
+      if (num_values == batch_size || !NextCounts<int16_t>()) break;
+      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
+    }
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    for (; num_values < batch_size; ++num_values) {
+      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
+      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    }
   }
-  return rep_level;
+  *num_cached_levels = num_values;
+  return true;
 }
 
 template <bool ADVANCE_REP_LEVEL>
@@ -1441,11 +1644,11 @@ bool HdfsParquetScanner::BaseScalarColumnReader::NextLevels() {
   --num_buffered_values_;
 
   // Definition level is not present if column and any containing structs are required.
-  def_level_ = max_def_level() == 0 ? 0 : ReadDefinitionLevel();
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
 
   if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
     // Repetition level is only present if this column is nested in any collection type.
-    rep_level_ = ReadRepetitionLevel();
+    rep_level_ = rep_levels_.ReadLevel();
     // Reset position counter if we are at the start of a new parent collection.
     if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
   }
@@ -1458,9 +1661,9 @@ bool HdfsParquetScanner::BaseScalarColumnReader::NextPage() {
   parent_->parse_status_ = ReadDataPage();
   if (UNLIKELY(!parent_->parse_status_.ok())) return false;
   if (num_buffered_values_ == 0) {
-    rep_level_ = -1;
-    def_level_ = -1;
-    pos_current_value_ = -1;
+    rep_level_ = ROW_GROUP_END;
+    def_level_ = INVALID_LEVEL;
+    pos_current_value_ = INVALID_POS;
     return false;
   }
   parent_->assemble_rows_timer_.Start();
@@ -1541,7 +1744,7 @@ void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() {
 
   if (RowGroupAtEnd()) {
     // No more values
-    pos_current_value_ = -1;
+    pos_current_value_ = INVALID_POS;
   } else if (rep_level_ <= max_rep_level() - 2) {
     // Reset position counter if we are at the start of a new parent 
collection (i.e.,
     // the current collection is the first item in a new parent collection).
@@ -1669,14 +1872,23 @@ Status HdfsParquetScanner::ProcessSplit() {
     // Prepare column readers for first read
     bool continue_execution = true;
     for (ColumnReader* col_reader: column_readers_) {
-      continue_execution = col_reader->NextLevels();
+      // Seed collection and boolean column readers with NextLevels().
+      // The ScalarColumnReaders use an optimized ReadValueBatch() that
+      // should not be seeded.
+      // TODO: Refactor the column readers to look more like the optimized
+      // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
+      // will allow better sharing of code between the row-wise and column-wise
+      // materialization strategies.
+      if (col_reader->NeedsSeedingForBatchedReading()) {
+        continue_execution = col_reader->NextLevels();
+      }
       if (!continue_execution) break;
       DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
     }
 
     bool filters_pass = true;
     if (continue_execution) {
-      AssembleRows(column_readers_, i, &filters_pass);
+      continue_execution = AssembleRows(column_readers_, i, &filters_pass);
       assemble_rows_timer_.Stop();
     }
 
@@ -1698,7 +1910,7 @@ Status HdfsParquetScanner::ProcessSplit() {
 
     DCHECK(continue_execution || !state_->abort_on_error());
     // We should be at the end of the row group if we get this far with no parse error
-    if (parse_status_.ok()) DCHECK_EQ(column_readers_[0]->rep_level(), -1);
+    if (parse_status_.ok()) DCHECK(column_readers_[0]->RowGroupAtEnd());
     // Reset parse_status_ for the next row group.
     parse_status_ = Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 3935332..0700cbe 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -383,6 +383,13 @@ class HdfsParquetScanner : public HdfsScanner {
   /// need to issue another read.
   static const int64_t FOOTER_SIZE;
 
+  /// The repetition level is set to this value to indicate the end of a row group.
+  static const int16_t ROW_GROUP_END;
+  /// Indicates an invalid definition or repetition level.
+  static const int16_t INVALID_LEVEL;
+  /// Indicates an invalid position value.
+  static const int16_t INVALID_POS;
+
   /// Class that implements Parquet definition and repetition level decoding.
   class LevelDecoder;
 
@@ -425,6 +432,10 @@ class HdfsParquetScanner : public HdfsScanner {
     LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
   };
 
+  /// Pool used for allocating caches of definition/repetition levels that are
+  /// populated by the level readers. The pool is freed in Close().
+  boost::scoped_ptr<MemPool> level_cache_pool_;
+
   /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner so
   /// that expensive aggregation up to the scan node can be performed once, during
   /// Close().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 73472f0..c6686e3 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -105,17 +105,17 @@ class RleDecoder {
   bool Get(T* val);
 
  protected:
+  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
+  /// are no more.
+  template<typename T>
+  bool NextCounts();
+
   BitReader bit_reader_;
   /// Number of bits needed to encode the value. Must be between 0 and 64.
   int bit_width_;
   uint64_t current_value_;
   uint32_t repeat_count_;
   uint32_t literal_count_;
- private:
-  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
-  /// are no more.
-  template<typename T>
-  bool NextCounts();
 };
 
 /// Class to incrementally build the rle data.   This class does not allocate any memory.
