http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index fa64f84..1ebb650 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,17 +17,15 @@
 #include <limits> // for std::numeric_limits
 #include <queue>
 
-#include <boost/algorithm/string.hpp>
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
-#include "common/object-pool.h"
 #include "common/logging.h"
+#include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"
+#include "exec/parquet-column-readers.h"
 #include "exec/scanner-context.inline.h"
-#include "exec/read-write-util.h"
 #include "exprs/expr.h"
-#include "gutil/bits.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
@@ -37,47 +35,18 @@
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/string-value.h"
-#include "util/bitmap.h"
-#include "util/bit-util.h"
-#include "util/decompress.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
-#include "util/dict-encoding.h"
-#include "util/rle-encoding.h"
-#include "util/runtime-profile-counters.h"
 #include "rpc/thrift-util.h"
 
 #include "common/names.h"
 
-using boost::algorithm::is_any_of;
-using boost::algorithm::split;
-using boost::algorithm::token_compress_on;
+using strings::Substitute;
 using namespace impala;
-using namespace strings;
-
-// Provide a workaround for IMPALA-1658.
-DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
-    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
-    "be converted from UTC to local time. Writes are unaffected.");
 
 DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
 
-const int64_t HdfsParquetScanner::FOOTER_SIZE = 100 * 1024;
-const int16_t HdfsParquetScanner::ROW_GROUP_END = numeric_limits<int16_t>::min();
-const int16_t HdfsParquetScanner::INVALID_LEVEL = -1;
-const int16_t HdfsParquetScanner::INVALID_POS = -1;
-
-// Max data page header size in bytes. This is an estimate and only needs to be an upper
-// bound. It is theoretically possible to have a page header of any size due to string
-// value statistics, but in practice we'll have trouble reading string values this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet files.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-const int MAX_DICT_HEADER_SIZE = 100;
-
 // The number of rows between checks to see if a filter is not effective, and should be
 // disabled. Must be a power of two.
 const int ROWS_PER_FILTER_SELECTIVITY_CHECK = 16 * 1024;
@@ -85,19 +54,14 @@ static_assert(
     !(ROWS_PER_FILTER_SELECTIVITY_CHECK & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)),
     "ROWS_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
 
-// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
-// input file. They differentiate these cases from DCHECKs, which indicate conditions that
-// are true unless there's a bug in Impala. We would ideally always return a bad Status
-// instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK instead because
-// there's a performance cost to doing the check in a release build, or just due to legacy
-// code.
-#define FILE_CHECK(a) DCHECK(a)
-#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b)
-#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b)
-#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b)
-#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b)
-#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b)
-#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b)
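
The trade-off described in the comment above is between asserting (cheap in release builds, since DCHECK compiles away) and returning a bad Status on malformed input. A rough standalone sketch of the "return a Status" alternative it mentions, using an illustrative Status type and macro name rather than Impala's:

#include <string>

struct Status {
  std::string msg;                        // empty string means OK
  bool ok() const { return msg.empty(); }
};

// Hypothetical helper: fail softly on malformed input instead of aborting.
#define RETURN_ERROR_IF_FALSE(cond, text) \
  do { if (!(cond)) return Status{text}; } while (false)

Status ValidatePageHeaderSize(int header_size, int max_page_header_size) {
  RETURN_ERROR_IF_FALSE(header_size >= 0, "corrupt file: negative page header size");
  RETURN_ERROR_IF_FALSE(header_size <= max_page_header_size,
      "corrupt file: page header exceeds maximum size");
  return Status{};
}
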
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+const int MAX_DICT_HEADER_SIZE = 100;
+
+const int64_t HdfsParquetScanner::FOOTER_SIZE;
+const int16_t HdfsParquetScanner::ROW_GROUP_END;
+const int16_t HdfsParquetScanner::INVALID_LEVEL;
+const int16_t HdfsParquetScanner::INVALID_POS;
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
     const std::vector<HdfsFileDesc*>& files) {
@@ -168,967 +132,18 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 
 namespace impala {
 
-/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
-/// as state associated with iterating over its tuples and transferring
-/// them to an output batch in TransferScratchTuples().
-struct ScratchTupleBatch {
-  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
-  uint8_t* tuple_mem;
-  // Keeps track of the current tuple index.
-  int tuple_idx;
-  // Number of valid tuples in tuple_mem.
-  int num_tuples;
-  // Cached for convenient access.
-  const int tuple_byte_size;
-
-  // Helper batch for safely allocating tuple_mem from its tuple data pool using
-  // ResizeAndAllocateTupleBuffer().
-  RowBatch batch;
-
-  ScratchTupleBatch(
-      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
-    : tuple_mem(NULL),
-      tuple_idx(0),
-      num_tuples(0),
-      tuple_byte_size(row_desc.GetRowSize()),
-      batch(row_desc, batch_size, mem_tracker) {
-    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
-  }
-
-  Status Reset(RuntimeState* state) {
-    tuple_idx = 0;
-    num_tuples = 0;
-    // Buffer size is not needed.
-    int64_t buffer_size;
-    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
-    return Status::OK();
-  }
-
-  inline Tuple* GetTuple(int tuple_idx) const {
-    return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
-  }
-
-  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
-  inline int capacity() const { return batch.capacity(); }
-  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
-  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
-  inline bool AtEnd() const { return tuple_idx == num_tuples; }
-};
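
The struct above materializes fixed-size tuples into one flat buffer and later hands them off by walking an index. A minimal standalone sketch of that pattern, with an illustrative MyTuple type in place of Impala's Tuple/RowBatch machinery:

#include <cstdint>
#include <cstdio>
#include <vector>

struct MyTuple { int32_t id; double val; };

int main() {
  const int capacity = 4;                 // batch capacity, like batch_size()
  std::vector<uint8_t> tuple_mem(capacity * sizeof(MyTuple));
  int num_tuples = 0;

  // "Materialize" phase: fill the scratch buffer with fixed-size tuples.
  for (int i = 0; i < capacity; ++i) {
    MyTuple* t = reinterpret_cast<MyTuple*>(tuple_mem.data() + i * sizeof(MyTuple));
    *t = MyTuple{i, i * 1.5};
    ++num_tuples;
  }

  // "Transfer" phase: walk tuple_idx until AtEnd(), like TransferScratchTuples().
  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
    MyTuple* t = reinterpret_cast<MyTuple*>(tuple_mem.data() + tuple_idx * sizeof(MyTuple));
    std::printf("%d %.1f\n", t->id, t->val);
  }
  return 0;
}
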
-
-const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
-    "$1 bytes for $2.";
-
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state)
     : HdfsScanner(scan_node, state),
       scratch_batch_(new ScratchTupleBatch(
           scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
       metadata_range_(NULL),
       dictionary_pool_(new MemPool(scan_node->mem_tracker())),
-      assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
+      assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+      num_cols_counter_(NULL),
+      num_row_groups_counter_(NULL) {
   assemble_rows_timer_.Stop();
 }
 
-HdfsParquetScanner::~HdfsParquetScanner() {
-}
-
-// TODO for 2.3: move column readers to separate file
-
-/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
-/// caches level values in batches.
-/// Level values are unsigned 8-bit integers because we support a maximum nesting
-/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
-/// populating the level cache (e.g., with RLE we can memset() repeated values).
-///
-/// Inherits from RleDecoder instead of containing one for performance reasons.
-/// The containment design would require two BitReaders per column reader. The extra
-/// BitReader causes enough bloat for a column reader to require another cache line.
-/// TODO: It is not clear whether the inheritance vs. containment choice still makes
-/// sense with column-wise materialization. The containment design seems cleaner and
-/// we should revisit.
-class HdfsParquetScanner::LevelDecoder : public RleDecoder {
- public:
-  LevelDecoder(bool is_def_level_decoder)
-    : cached_levels_(NULL),
-      num_cached_levels_(0),
-      cached_level_idx_(0),
-      encoding_(parquet::Encoding::PLAIN),
-      max_level_(0),
-      cache_size_(0),
-      num_buffered_values_(0),
-      decoding_error_code_(is_def_level_decoder ?
-          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
-  }
-
-  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
-  /// encoding requires reading metadata from the page header.
-  Status Init(const string& filename, parquet::Encoding::type encoding,
-      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
-      uint8_t** data, int* data_size);
-
-  /// Returns the next level or INVALID_LEVEL if there was an error.
-  inline int16_t ReadLevel();
-
-  /// Decodes and caches the next batch of levels. Resets members associated with the
-  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
-  /// level was encountered with a value greater than max_level_.
-  Status CacheNextBatch(int batch_size);
-
-  /// Functions for working with the level cache.
-  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
-  inline uint8_t CacheGetNext() {
-    DCHECK_LT(cached_level_idx_, num_cached_levels_);
-    return cached_levels_[cached_level_idx_++];
-  }
-  inline void CacheSkipLevels(int num_levels) {
-    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
-    cached_level_idx_ += num_levels;
-  }
-  inline int CacheSize() const { return num_cached_levels_; }
-  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
-  inline int CacheCurrIdx() const { return cached_level_idx_; }
-
- private:
-  /// Initializes members associated with the level cache. Allocates memory for
-  /// the cache from pool, if necessary.
-  Status InitCache(MemPool* pool, int cache_size);
-
-  /// Decodes and writes a batch of levels into the cache. Sets the number of
-  /// values written to the cache in *num_cached_levels. Returns false if there was
-  /// an error decoding a level or if there was a level value greater than max_level_.
-  bool FillCache(int batch_size, int* num_cached_levels);
-
-  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
-  /// in Init().
-  uint8_t* cached_levels_;
-  /// Number of valid level values in the cache.
-  int num_cached_levels_;
-  /// Current index into cached_levels_.
-  int cached_level_idx_;
-  parquet::Encoding::type encoding_;
-
-  /// For error checking and reporting.
-  int max_level_;
-  /// Number of level values cached_levels_ has memory allocated for.
-  int cache_size_;
-  /// Number of remaining data values in the current data page.
-  int num_buffered_values_;
-  string filename_;
-  TErrorCode::type decoding_error_code_;
-};
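
The class above decodes def/rep levels in batches into a small uint8_t cache, so the hot loop consumes levels via CacheHasNext()/CacheGetNext() without touching the underlying decoder. A rough standalone sketch of that caching pattern; the (level value, run length) input format is illustrative only, not Parquet's actual RLE/bit-packed encoding:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

class CachedLevelReader {
 public:
  // 'runs' is a pre-decoded (level value, run length) sequence -- illustrative input.
  explicit CachedLevelReader(std::vector<std::pair<uint8_t, int>> runs)
    : runs_(std::move(runs)) {}

  // Fill the cache with up to batch_size levels (repeated values expand cheaply).
  void CacheNextBatch(int batch_size) {
    cache_.clear();
    idx_ = 0;
    while (static_cast<int>(cache_.size()) < batch_size && run_ < runs_.size()) {
      int want = batch_size - static_cast<int>(cache_.size());
      int take = std::min(want, runs_[run_].second - consumed_);
      cache_.insert(cache_.end(), take, runs_[run_].first);
      consumed_ += take;
      if (consumed_ == runs_[run_].second) { ++run_; consumed_ = 0; }
    }
  }

  bool CacheHasNext() const { return idx_ < cache_.size(); }
  uint8_t CacheGetNext() { return cache_[idx_++]; }

 private:
  std::vector<std::pair<uint8_t, int>> runs_;
  std::size_t run_ = 0;     // index of the current run
  int consumed_ = 0;        // levels already taken from the current run
  std::vector<uint8_t> cache_;
  std::size_t idx_ = 0;     // next cached level to hand out
};
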
-
-/// Base class for reading a column. Reads a logical column, not necessarily a column
-/// materialized in the file (e.g. collections). The two subclasses are
-/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
-/// level pair at a time. The current def and rep level are exposed to the user, and the
-/// corresponding value (if defined) can optionally be copied into a slot via
-/// ReadValue(). Can also write position slots.
-class HdfsParquetScanner::ColumnReader {
- public:
-  virtual ~ColumnReader() { }
-
-  int def_level() const { return def_level_; }
-  int rep_level() const { return rep_level_; }
-
-  const SlotDescriptor* slot_desc() const { return slot_desc_; }
-  const parquet::SchemaElement& schema_element() const { return *node_.element; }
-  int16_t max_def_level() const { return max_def_level_; }
-  int16_t max_rep_level() const { return max_rep_level_; }
-  int def_level_of_immediate_repeated_ancestor() const {
-    return node_.def_level_of_immediate_repeated_ancestor;
-  }
-  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
-  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
-    DCHECK(pos_slot_desc_ == NULL);
-    pos_slot_desc_ = pos_slot_desc;
-  }
-
-  /// Returns true if this reader materializes collections (i.e. CollectionValues).
-  virtual bool IsCollectionReader() const { return false; }
-
-  const char* filename() const { return parent_->filename(); };
-
-  /// Read the current value (or null) into 'tuple' for this column. This should only be
-  /// called when a value is defined, i.e., def_level() >=
-  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
-  /// no output values), otherwise NextLevels() should be called instead.
-  ///
-  /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
-  /// called after calling ReadValue()).
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
-  /// The codegened functions from all the materialized cols will then be combined
-  /// into one function.
-  /// TODO: another option is to materialize col by col for the entire row batch in
-  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row batches
-  /// are currently dense so we'll need to figure out something there.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Same as ReadValue() but does not advance repetition level. Only valid for columns
-  /// not in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Returns true if this reader needs to be seeded with NextLevels() before
-  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
-  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
-  virtual bool NeedsSeedingForBatchedReading() const { return true; }
-
-  /// Batched version of ReadValue() that reads up to max_values at once and materializes
-  /// them into tuples in tuple_mem. Returns the number of values actually materialized
-  /// in *num_values. The return value, error behavior and state changes are generally
-  /// the same as in ReadValue(). For example, if an error occurs in the middle of
-  /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
-  /// this column reader are left in an undefined state, assuming that the caller will
-  /// immediately abort execution.
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
-  /// materializes them into tuples in tuple_mem.
-  /// The return value and error behavior are the same as in ReadValueBatch().
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Advances this column reader's def and rep levels to the next logical value, i.e. to
-  /// the next scalar value or the beginning of the next collection, without attempting to
-  /// read the value. This is used to skip past def/rep levels that don't materialize a
-  /// value, such as the def/rep levels corresponding to an empty containing collection.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  virtual bool NextLevels() = 0;
-
-  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
-  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
-  /// 'tuple') and increments pos_current_value_.
-  void ReadPosition(Tuple* tuple);
-
-  /// Returns true if this column reader has reached the end of the row group.
-  inline bool RowGroupAtEnd() { return rep_level_ == ROW_GROUP_END; }
-
- protected:
-  HdfsParquetScanner* parent_;
-  const SchemaNode& node_;
-  const SlotDescriptor* slot_desc_;
-
-  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
-  /// there's not. Only one column reader for a given tuple desc will have this set.
-  const SlotDescriptor* pos_slot_desc_;
-
-  /// The next value to write into the position slot, if there is one. 64-bit int because
-  /// the pos slot is always a BIGINT. Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read).
-  int64_t pos_current_value_;
-
-  /// The current repetition and definition levels of this reader. Advanced via
-  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read). If this is not inside a collection, rep_level_ is
-  /// always 0.
-  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
-  /// The maximum values are cached here because they are accessed in inner loops.
-  int16_t rep_level_;
-  int16_t max_rep_level_;
-  int16_t def_level_;
-  int16_t max_def_level_;
-
-  // Cache frequently accessed members of slot_desc_ for perf.
-
-  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
-  int tuple_offset_;
-
-  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
-  NullIndicatorOffset null_indicator_offset_;
-
-  ColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : parent_(parent),
-      node_(node),
-      slot_desc_(slot_desc),
-      pos_slot_desc_(NULL),
-      pos_current_value_(INVALID_POS),
-      rep_level_(INVALID_LEVEL),
-      max_rep_level_(node_.max_rep_level),
-      def_level_(INVALID_LEVEL),
-      max_def_level_(node_.max_def_level),
-      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
-      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
-          slot_desc->null_indicator_offset()) {
-    DCHECK_GE(node_.max_rep_level, 0);
-    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
-    DCHECK_GE(node_.max_def_level, 0);
-    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
-    // rep_level_ is always valid and equal to 0 if col not in collection.
-    if (max_rep_level() == 0) rep_level_ = 0;
-  }
-};
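
A standalone sketch of the def/rep level contract described above, mirroring the checks made later in ReadValue() and MaterializeValueBatch(): a value is materialized only when def_level reaches max_def_level, it is NULL when it is at least the immediate repeated ancestor's def level, and anything lower means an empty or NULL ancestor collection. The enum and function names below are illustrative:

#include <cstdint>

enum class LevelOutcome { kValue, kNull, kSkipEmptyAncestor };

LevelOutcome ClassifyLevel(int16_t def_level, int16_t max_def_level,
    int16_t def_level_of_immediate_repeated_ancestor) {
  if (def_level < def_level_of_immediate_repeated_ancestor) {
    // A containing repeated field is empty or NULL: no output value at all.
    return LevelOutcome::kSkipEmptyAncestor;
  }
  if (def_level >= max_def_level) return LevelOutcome::kValue;
  return LevelOutcome::kNull;  // defined up to some ancestor, but the value itself is NULL
}

// A new instance of the parent collection starts when the repetition level drops
// below max_rep_level; this is the point where pos_current_value_ is reset to 0.
bool StartsNewParentCollection(int16_t rep_level, int16_t max_rep_level) {
  return rep_level <= max_rep_level - 1;
}
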
-
-/// Collections are not materialized directly in parquet files; only scalar values appear
-/// in the file. CollectionColumnReader uses the definition and repetition levels of child
-/// column readers to figure out the boundaries of each collection in this column.
-class HdfsParquetScanner::CollectionColumnReader :
-      public HdfsParquetScanner::ColumnReader {
- public:
-  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ColumnReader(parent, node, slot_desc) {
-    DCHECK(node_.is_repeated());
-    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
-  }
-
-  virtual ~CollectionColumnReader() { }
-
-  vector<ColumnReader*>* children() { return &children_; }
-
-  virtual bool IsCollectionReader() const { return true; }
-
-  /// The repetition level indicating that the current value is the first in a 
new
-  /// collection (meaning the last value read was the final item in the 
previous
-  /// collection).
-  int new_collection_rep_level() const { return max_rep_level() - 1; }
-
-  /// Materializes CollectionValue into tuple slot (if materializing) and 
advances to next
-  /// value.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
-
-  /// Same as ReadValue but does not advance repetition level. Only valid for 
columns not
-  /// in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
-
-  /// Advances all child readers to the beginning of the next collection and 
updates this
-  /// reader's state.
-  virtual bool NextLevels();
-
-  /// This is called once for each row group in the file.
-  void Reset() {
-    def_level_ = -1;
-    rep_level_ = -1;
-    pos_current_value_ = -1;
-  }
-
- private:
-  /// Column readers of fields contained within this collection. There is at 
least one
-  /// child reader per collection reader. Child readers either materialize 
slots in the
-  /// collection item tuples, or there is a single child reader that does not 
materialize
-  /// any slot and is only used by this reader to read def and rep levels.
-  vector<ColumnReader*> children_;
-
-  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ 
based on child
-  /// reader's state.
-  void UpdateDerivedState();
-
-  /// Recursively reads from children_ to assemble a single CollectionValue 
into
-  /// *slot. Also advances rep_level_ and def_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
-  /// true.
-  inline bool ReadSlot(void* slot, MemPool* pool);
-};
-
-/// Reader for a single column from the parquet file.  It's associated with a
-/// ScannerContext::Stream and is responsible for decoding the data.  Super 
class for
-/// per-type column readers. This contains most of the logic, the type 
specific functions
-/// must be implemented in the subclass.
-class HdfsParquetScanner::BaseScalarColumnReader :
-      public HdfsParquetScanner::ColumnReader {
- public:
-  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ColumnReader(parent, node, slot_desc),
-      def_levels_(true),
-      rep_levels_(false),
-      num_buffered_values_(0),
-      num_values_read_(0),
-      metadata_(NULL),
-      stream_(NULL),
-      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
-    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
-
-  }
-
-  virtual ~BaseScalarColumnReader() { }
-
-  /// This is called once for each row group in the file.
-  Status Reset(const parquet::ColumnMetaData* metadata, 
ScannerContext::Stream* stream) {
-    DCHECK(stream != NULL);
-    DCHECK(metadata != NULL);
-
-    num_buffered_values_ = 0;
-    data_ = NULL;
-    data_end_ = NULL;
-    stream_ = stream;
-    metadata_ = metadata;
-    num_values_read_ = 0;
-    def_level_ = -1;
-    // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : -1;
-    pos_current_value_ = -1;
-
-    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-      RETURN_IF_ERROR(Codec::CreateDecompressor(
-          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], 
&decompressor_));
-    }
-    ClearDictionaryDecoder();
-    return Status::OK();
-  }
-
-  /// Called once when the scanner is complete for final cleanup.
-  void Close() {
-    if (decompressor_.get() != NULL) decompressor_->Close();
-  }
-
-  int64_t total_len() const { return metadata_->total_compressed_size; }
-  int col_idx() const { return node_.col_idx; }
-  THdfsCompression::type codec() const {
-    if (metadata_ == NULL) return THdfsCompression::NONE;
-    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
-  }
-  MemPool* decompressed_data_pool() const { return 
decompressed_data_pool_.get(); }
-
-  /// Reads the next definition and repetition levels for this column. 
Initializes the
-  /// next data page if necessary.
-  virtual bool NextLevels() { return NextLevels<true>(); }
-
-  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) 
if
-  // we know this row can be skipped. This could be very useful with stats and 
big
-  // sections can be skipped. Implement that when we can benefit from it.
-
- protected:
-  // Friend parent scanner so it can perform validation (e.g. 
ValidateEndOfRowGroup())
-  friend class HdfsParquetScanner;
-
-  // Class members that are accessed for every column should be included up 
here so they
-  // fit in as few cache lines as possible.
-
-  /// Pointer to start of next value in data page
-  uint8_t* data_;
-
-  /// End of the data page.
-  const uint8_t* data_end_;
-
-  /// Decoder for definition levels.
-  LevelDecoder def_levels_;
-
-  /// Decoder for repetition levels.
-  LevelDecoder rep_levels_;
-
-  /// Page encoding for values. Cached here for perf.
-  parquet::Encoding::type page_encoding_;
-
-  /// Num values remaining in the current data page
-  int num_buffered_values_;
-
-  // Less frequently used members that are not accessed in inner loop should 
go below
-  // here so they do not occupy precious cache line space.
-
-  /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_;
-
-  const parquet::ColumnMetaData* metadata_;
-  scoped_ptr<Codec> decompressor_;
-  ScannerContext::Stream* stream_;
-
-  /// Pool to allocate decompression buffers from.
-  boost::scoped_ptr<MemPool> decompressed_data_pool_;
-
-  /// Header for current data page.
-  parquet::PageHeader current_page_header_;
-
-  /// Read the next data page. If a dictionary page is encountered, that will 
be read and
-  /// this function will continue reading the next data page.
-  Status ReadDataPage();
-
-  /// Try to move to the next page and buffer more values. Returns false and sets
-  /// rep_level_, def_level_ and pos_current_value_ to -1 if no more pages or an error is
-  /// encountered.
-  bool NextPage();
-
-  /// Implementation for NextLevels().
-  template <bool ADVANCE_REP_LEVEL>
-  bool NextLevels();
-
-  /// Creates a dictionary decoder from values/size. 'decoder' is set to point 
to a
-  /// dictionary decoder stored in this object. Subclass must implement this. 
Returns
-  /// an error status if the dictionary values could not be decoded 
successfully.
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) = 0;
-
-  /// Return true if the column has an initialized dictionary decoder. 
Subclass must
-  /// implement this.
-  virtual bool HasDictionaryDecoder() = 0;
-
-  /// Clear the dictionary decoder so HasDictionaryDecoder() will return 
false. Subclass
-  /// must implement this.
-  virtual void ClearDictionaryDecoder() = 0;
-
-  /// Initializes the reader with the data contents. This is the content for 
the entire
-  /// decompressed data page. Decoders can initialize state from here.
-  virtual Status InitDataPage(uint8_t* data, int size) = 0;
-
- private:
-  /// Writes the next value into *slot using pool if necessary. Also advances 
rep_level_
-  /// and def_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
-  /// true.
-  template <bool IN_COLLECTION>
-  inline bool ReadSlot(void* slot, MemPool* pool);
-};
-
-/// Per column type reader. If MATERIALIZED is true, the column values are 
materialized
-/// into the slot described by slot_desc. If MATERIALIZED is false, the column 
values
-/// are not materialized, but the position can be accessed.
-template<typename T, bool MATERIALIZED>
-class HdfsParquetScanner::ScalarColumnReader :
-      public HdfsParquetScanner::BaseScalarColumnReader {
- public:
-  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc),
-      dict_decoder_init_(false) {
-    if (!MATERIALIZED) {
-      // We're not materializing any values, just counting them. No need (or 
ability) to
-      // initialize state used to materialize values.
-      DCHECK(slot_desc_ == NULL);
-      return;
-    }
-
-    DCHECK(slot_desc_ != NULL);
-    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL) {
-      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
-    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
-      fixed_len_size_ = slot_desc_->type().len;
-    } else {
-      fixed_len_size_ = -1;
-    }
-    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-        // TODO: Add logic to detect file versions that have unconverted 
TIMESTAMP
-        // values. Currently all versions have converted values.
-        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
-        slot_desc_->type().type == TYPE_TIMESTAMP &&
-        parent->file_version_.application == "parquet-mr");
-  }
-
-  virtual ~ScalarColumnReader() { }
-
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
-  }
-
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
-  }
-
-  virtual bool NeedsSeedingForBatchedReading() const { return false; }
-
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, 
num_values);
-  }
-
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int 
tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, 
num_values);
-  }
-
- protected:
-  template <bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    // NextLevels() should have already been called and def and rep levels 
should be in
-    // valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
-    DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a 
value";
-
-    if (MATERIALIZED) {
-      if (def_level_ >= max_def_level()) {
-        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return 
false;
-        } else {
-          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return 
false;
-        }
-      } else {
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
-
-  /// Implementation of the ReadValueBatch() functions specialized for this
-  /// column reader type. This function drives the reading of data pages and
-  /// caching of rep/def levels. Once a data page and cached levels are 
available,
-  /// it calls into a more specialized MaterializeValueBatch() for doing the 
actual
-  /// value materialization using the level caches.
-  template<bool IN_COLLECTION>
-  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    // Repetition level is only present if this column is nested in a 
collection type.
-    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << 
slot_desc()->DebugString();
-    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << 
slot_desc()->DebugString();
-
-    int val_count = 0;
-    bool continue_execution = true;
-    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-      // Read next page if necessary.
-      if (num_buffered_values_ == 0) {
-        if (!NextPage()) {
-          continue_execution = parent_->parse_status_.ok();
-          continue;
-        }
-      }
-
-      // Fill def/rep level caches if they are empty.
-      int level_batch_size = min(parent_->state_->batch_size(), 
num_buffered_values_);
-      if (!def_levels_.CacheHasNext()) {
-        
parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
-      }
-      // We only need the repetition levels for populating the position slot 
since we
-      // are only populating top-level tuples.
-      if (IN_COLLECTION && pos_slot_desc_ != NULL && 
!rep_levels_.CacheHasNext()) {
-        
parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
-      }
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-
-      // This special case is most efficiently handled here directly.
-      if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - 
val_count);
-        val_count += vals_to_add;
-        def_levels_.CacheSkipLevels(vals_to_add);
-        num_buffered_values_ -= vals_to_add;
-        continue;
-      }
-
-      // Read data page and cached levels to materialize values.
-      int cache_start_idx = def_levels_.CacheCurrIdx();
-      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-      int remaining_val_capacity = max_values - val_count;
-      int ret_val_count = 0;
-      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, 
&ret_val_count);
-      } else {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, 
&ret_val_count);
-      }
-      val_count += ret_val_count;
-      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
-    }
-    *num_values = val_count;
-    return continue_execution;
-  }
-
-  /// Helper function for ReadValueBatch() above that performs value 
materialization.
-  /// It assumes a data page with remaining values is available, and that the 
def/rep
-  /// level caches have been populated.
-  /// For efficiency, the simple special case of !MATERIALIZED && 
!IN_COLLECTION is not
-  /// handled in this function.
-  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
-  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    DCHECK(MATERIALIZED || IN_COLLECTION);
-    DCHECK_GT(num_buffered_values_, 0);
-    DCHECK(def_levels_.CacheHasNext());
-    if (IN_COLLECTION && pos_slot_desc_ != NULL) 
DCHECK(rep_levels_.CacheHasNext());
-
-    uint8_t* curr_tuple = tuple_mem;
-    int val_count = 0;
-    while (def_levels_.CacheHasNext()) {
-      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-      int def_level = def_levels_.CacheGetNext();
-
-      if (IN_COLLECTION) {
-        if (def_level < def_level_of_immediate_repeated_ancestor()) {
-          // A containing repeated field is empty or NULL. Skip the value but
-          // move to the next repetition level if necessary.
-          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
-          continue;
-        }
-        if (pos_slot_desc_ != NULL) {
-          int rep_level = rep_levels_.CacheGetNext();
-          // Reset position counter if we are at the start of a new parent 
collection.
-          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
-        }
-      }
-
-      if (MATERIALIZED) {
-        if (def_level >= max_def_level()) {
-          bool continue_execution =
-              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
-          if (UNLIKELY(!continue_execution)) return false;
-        } else {
-          tuple->SetNull(null_indicator_offset_);
-        }
-      }
-
-      curr_tuple += tuple_size;
-      ++val_count;
-      if (UNLIKELY(val_count == max_values)) break;
-    }
-    *num_values = val_count;
-    return true;
-  }
-
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
-    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(), "could not decode dictionary");
-    }
-    dict_decoder_init_ = true;
-    *decoder = &dict_decoder_;
-    return Status::OK();
-  }
-
-  virtual bool HasDictionaryDecoder() {
-    return dict_decoder_init_;
-  }
-
-  virtual void ClearDictionaryDecoder() {
-    dict_decoder_init_ = false;
-  }
-
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
-        page_encoding_ != parquet::Encoding::PLAIN) {
-      stringstream ss;
-      ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
-         << PrintEncoding(page_encoding_) << " for data page of column '"
-         << schema_element().name << "'.";
-      return Status(ss.str());
-    }
-
-    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != 
NULL) {
-      if (!dict_decoder_init_) {
-        return Status("File corrupt. Missing dictionary page.");
-      }
-      dict_decoder_.SetData(data, size);
-    }
-
-    // TODO: Perform filter selectivity checks here.
-    return Status::OK();
-  }
-
- private:
-  /// Writes the next value into *slot using pool if necessary.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
-  /// true.
-  template<bool IS_DICT_ENCODED>
-  inline bool ReadSlot(void* slot, MemPool* pool) {
-    T val;
-    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
-    if (IS_DICT_ENCODED) {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
-      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
-        SetDictDecodeError();
-        return false;
-      }
-    } else {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len =
-          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, 
val_ptr);
-      if (UNLIKELY(encoded_len < 0)) {
-        SetPlainDecodeError();
-        return false;
-      }
-      data_ += encoded_len;
-    }
-    if (UNLIKELY(NeedsConversion() &&
-            !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
-      return false;
-    }
-    return true;
-  }
-
-  /// Most column readers never require conversion, so we can avoid branches by
-  /// returning constant false. Column readers for types that require 
conversion
-  /// must specialize this function.
-  inline bool NeedsConversion() const {
-    DCHECK(!needs_conversion_);
-    return false;
-  }
-
-  /// Converts and writes src into dst based on desc_->type()
-  bool ConvertSlot(const T* src, T* dst, MemPool* pool) {
-    DCHECK(false);
-    return false;
-  }
-
-  /// Pull out slow-path Status construction code from ReadRepetitionLevel()/
-  /// ReadDefinitionLevel() for performance.
-  void __attribute__((noinline)) SetDictDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, 
filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-  void __attribute__((noinline)) SetPlainDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, 
filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-
-  /// Dictionary decoder for decoding column values.
-  DictDecoder<T> dict_decoder_;
-
-  /// True if dict_decoder_ has been initialized with a dictionary page.
-  bool dict_decoder_init_;
-
-  /// true if decoded values must be converted before being written to an 
output tuple.
-  bool needs_conversion_;
-
-  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
-  /// the max length for VARCHAR columns. Unused otherwise.
-  int fixed_len_size_;
-};
-
-template<>
-inline bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::NeedsConversion() const {
-  return needs_conversion_;
-}
-
-template<>
-bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot(
-    const StringValue* src, StringValue* dst, MemPool* pool) {
-  DCHECK(slot_desc() != NULL);
-  DCHECK(slot_desc()->type().type == TYPE_CHAR);
-  int len = slot_desc()->type().len;
-  StringValue sv;
-  sv.len = len;
-  if (slot_desc()->type().IsVarLenStringType()) {
-    sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
-    if (UNLIKELY(sv.ptr == NULL)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
-          len, "StringValue");
-      parent_->parse_status_ =
-          pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
-      return false;
-    }
-  } else {
-    sv.ptr = reinterpret_cast<char*>(dst);
-  }
-  int unpadded_len = min(len, src->len);
-  memcpy(sv.ptr, src->ptr, unpadded_len);
-  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
-
-  if (slot_desc()->type().IsVarLenStringType()) *dst = sv;
-  return true;
-}
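
A minimal standalone sketch of the CHAR conversion above: copy at most len bytes of the source and pad the remainder with spaces, as ConvertSlot() does via StringValue::PadWithSpaces(). std::string stands in for StringValue and the MemPool allocation:

#include <algorithm>
#include <cstring>
#include <string>

// 'len' is the declared CHAR length (assumed >= 0 here).
std::string ConvertToChar(const std::string& src, int len) {
  std::string dst(len, ' ');                        // pre-padded with spaces
  int unpadded_len = std::min<int>(len, static_cast<int>(src.size()));
  std::memcpy(&dst[0], src.data(), unpadded_len);   // truncate if src is longer
  return dst;
}
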
-
-template<>
-inline bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
-  return needs_conversion_;
-}
-
-template<>
-bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::ConvertSlot(
-    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
-  *dst = *src;
-  if (dst->HasDateAndTime()) dst->UtcToLocal();
-  return true;
-}
-
-class HdfsParquetScanner::BoolColumnReader :
-      public HdfsParquetScanner::BaseScalarColumnReader {
- public:
-  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc) {
-    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
-  }
-
-  virtual ~BoolColumnReader() { }
-
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
-  }
-
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
-  }
-
- protected:
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
-    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
-                  << "have gotten this far.";
-    return Status::OK();
-  }
-
-  virtual bool HasDictionaryDecoder() {
-    // Decoder should never be created for bools.
-    return false;
-  }
-
-  virtual void ClearDictionaryDecoder() { }
-
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    // Initialize bool decoder
-    bool_values_ = BitReader(data, size);
-    return Status::OK();
-  }
-
- private:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    DCHECK(slot_desc_ != NULL);
-    // Def and rep levels should be in valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
-    DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a 
value";
-
-    if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
-    }
-  }
-
-  /// Writes the next value into *slot using pool if necessary. Also advances 
def_level_
-  /// and rep_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
-  /// true.
-  template <bool IN_COLLECTION>
-  inline bool ReadSlot(void* slot, MemPool* pool)  {
-    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
-      parent_->parse_status_ = Status("Invalid bool column.");
-      return false;
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
-
-  BitReader bool_values_;
-};
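
A rough standalone sketch of the bit-at-a-time boolean decoding that BoolColumnReader relies on, assuming Parquet's PLAIN encoding packs eight booleans per byte, least-significant bit first. BitCursor is illustrative, not Impala's BitReader:

#include <cstddef>
#include <cstdint>

class BitCursor {
 public:
  BitCursor(const uint8_t* data, std::size_t len) : data_(data), len_(len) {}

  // Returns false once the buffer is exhausted, like the "Invalid bool column" path.
  bool GetBool(bool* value) {
    std::size_t byte_idx = bit_offset_ / 8;
    if (byte_idx >= len_) return false;
    *value = (data_[byte_idx] >> (bit_offset_ % 8)) & 1;
    ++bit_offset_;
    return true;
  }

 private:
  const uint8_t* data_;
  std::size_t len_;
  std::size_t bit_offset_ = 0;
};
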
-
-}
-
 Status HdfsParquetScanner::Prepare(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Prepare(context));
   metadata_range_ = stream_->scan_range();
@@ -1154,16 +169,16 @@ void HdfsParquetScanner::Close() {
   vector<THdfsCompression::type> compression_types;
 
   // Visit each column reader, including collection reader children.
-  stack<ColumnReader*> readers;
-  for (ColumnReader* r: column_readers_) readers.push(r);
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
   while (!readers.empty()) {
-    ColumnReader* col_reader = readers.top();
+    ParquetColumnReader* col_reader = readers.top();
     readers.pop();
 
     if (col_reader->IsCollectionReader()) {
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
-      for (ColumnReader* r: *collection_reader->children()) readers.push(r);
+      for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r);
       continue;
     }
 
@@ -1207,629 +222,6 @@ void HdfsParquetScanner::Close() {
   HdfsScanner::Close();
 }
 
-HdfsParquetScanner::ColumnReader* HdfsParquetScanner::CreateReader(
-    const SchemaNode& node, bool is_collection_field, const SlotDescriptor* 
slot_desc) {
-  ColumnReader* reader = NULL;
-  if (is_collection_field) {
-    // Create collection reader (note this handles both NULL and non-NULL 
'slot_desc')
-    reader = new CollectionColumnReader(this, node, slot_desc);
-  } else if (slot_desc != NULL) {
-    // Create the appropriate ScalarColumnReader type to read values into 
'slot_desc'
-    switch (slot_desc->type().type) {
-      case TYPE_BOOLEAN:
-        reader = new BoolColumnReader(this, node, slot_desc);
-        break;
-      case TYPE_TINYINT:
-        reader = new ScalarColumnReader<int8_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_SMALLINT:
-        reader = new ScalarColumnReader<int16_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_INT:
-        reader = new ScalarColumnReader<int32_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_BIGINT:
-        reader = new ScalarColumnReader<int64_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_FLOAT:
-        reader = new ScalarColumnReader<float, true>(this, node, slot_desc);
-        break;
-      case TYPE_DOUBLE:
-        reader = new ScalarColumnReader<double, true>(this, node, slot_desc);
-        break;
-      case TYPE_TIMESTAMP:
-        reader = new ScalarColumnReader<TimestampValue, true>(this, node, 
slot_desc);
-        break;
-      case TYPE_STRING:
-      case TYPE_VARCHAR:
-      case TYPE_CHAR:
-        reader = new ScalarColumnReader<StringValue, true>(this, node, 
slot_desc);
-        break;
-      case TYPE_DECIMAL:
-        switch (slot_desc->type().GetByteSize()) {
-          case 4:
-            reader = new ScalarColumnReader<Decimal4Value, true>(this, node, 
slot_desc);
-            break;
-          case 8:
-            reader = new ScalarColumnReader<Decimal8Value, true>(this, node, 
slot_desc);
-            break;
-          case 16:
-            reader = new ScalarColumnReader<Decimal16Value, true>(this, node, 
slot_desc);
-            break;
-        }
-        break;
-      default:
-        DCHECK(false) << slot_desc->type().DebugString();
-    }
-  } else {
-    // Special case for counting scalar values (e.g. count(*), no materialized 
columns in
-    // the file, only materializing a position slot). We won't actually read 
any values,
-    // only the rep and def levels, so it doesn't matter what kind of reader 
we make.
-    reader = new ScalarColumnReader<int8_t, false>(this, node, slot_desc);
-  }
-  return obj_pool_.Add(reader);
-}
-
-bool HdfsParquetScanner::ColumnReader::ReadValueBatch(MemPool* pool, int 
max_values,
-    int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * 
tuple_size);
-    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
-      // A containing repeated field is empty or NULL
-      continue_execution = NextLevels();
-      continue;
-    }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
-    continue_execution = ReadValue(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-bool HdfsParquetScanner::ColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
-    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * 
tuple_size);
-    continue_execution = ReadNonRepeatedValue(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-void HdfsParquetScanner::ColumnReader::ReadPosition(Tuple* tuple) {
-  DCHECK(pos_slot_desc() != NULL);
-  // NextLevels() should have already been called
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(pos_current_value_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a 
value";
-
-  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
-}
-
-// In 1.1, we had a bug where the dictionary page metadata was not set. 
Returns true
-// if this matches those versions and compatibility workarounds need to be 
used.
-static bool RequiresSkippedDictionaryHeaderCheck(
-    const HdfsParquetScanner::FileVersion& v) {
-  if (v.application != "impala") return false;
-  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
-Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
-  Status status;
-  uint8_t* buffer;
-
-  // We're about to move to the next data page.  The previous data page is
-  // now complete, pass along the memory allocated for it.
-  
parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), 
false);
-
-  // Read the next data page, skipping page types we don't care about.
-  // We break out of this loop on the non-error case (a data page was found or 
we read all
-  // the pages).
-  while (true) {
-    DCHECK_EQ(num_buffered_values_, 0);
-    if (num_values_read_ == metadata_->num_values) {
-      // No more pages to read
-      // TODO: should we check for stream_->eosr()?
-      break;
-    } else if (num_values_read_ > metadata_->num_values) {
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, 
filename());
-      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    int64_t buffer_size;
-    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
-    if (buffer_size == 0) {
-      // The data pages contain fewer values than stated in the column 
metadata.
-      DCHECK(stream_->eosr());
-      DCHECK_LT(num_values_read_, metadata_->num_values);
-      // TODO for 2.3: node_.element->name isn't necessarily useful
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, 
filename());
-      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    // We don't know the actual header size until the thrift object is 
deserialized.  Loop
-    // until we successfully deserialize the header or exceed the maximum 
header size.
-    uint32_t header_size;
-    while (true) {
-      header_size = buffer_size;
-      status = DeserializeThriftMsg(
-          buffer, &header_size, true, &current_page_header_);
-      if (status.ok()) break;
-
-      if (buffer_size >= FLAGS_max_page_header_size) {
-        stringstream ss;
-        ss << "ParquetScanner: could not read data page because page header 
exceeded "
-           << "maximum size of "
-           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
-        status.AddDetail(ss.str());
-        return status;
-      }
-
-      // Didn't read entire header, increase buffer size and try again
-      Status status;
-      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
-      bool success = stream_->GetBytes(
-          new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ 
true);
-      if (!success) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      DCHECK(status.ok());
-
-      if (buffer_size == new_buffer_size) {
-        DCHECK_NE(new_buffer_size, 0);
-        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
-      }
-      DCHECK_GT(new_buffer_size, buffer_size);
-      buffer_size = new_buffer_size;
-    }
-
-    // Successfully deserialized current_page_header_
-    if (!stream_->SkipBytes(header_size, &status)) return status;
-
-    int data_size = current_page_header_.compressed_page_size;
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-
-    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
-      if (slot_desc_ == NULL) {
-        // Skip processing the dictionary page if we don't need to decode any 
values. In
-        // addition to being unnecessary, we are likely unable to successfully 
decode the
-        // dictionary values because we don't necessarily create the right 
type of scalar
-        // reader if there's no slot to read into (see CreateReader()).
-        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-        continue;
-      }
-
-      if (HasDictionaryDecoder()) {
-        return Status("Column chunk should not contain two dictionary pages.");
-      }
-      if (node_.element->type == parquet::Type::BOOLEAN) {
-        return Status("Unexpected dictionary page. Dictionary page is not"
-            " supported for booleans.");
-      }
-      const parquet::DictionaryPageHeader* dict_header = NULL;
-      if (current_page_header_.__isset.dictionary_page_header) {
-        dict_header = &current_page_header_.dictionary_page_header;
-      } else {
-        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
-          return Status("Dictionary page does not have dictionary header 
set.");
-        }
-      }
-      if (dict_header != NULL &&
-          dict_header->encoding != parquet::Encoding::PLAIN &&
-          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
-        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported 
"
-            "for dictionary pages.");
-      }
-
-      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-      data_end_ = data_ + data_size;
-
-      uint8_t* dict_values = NULL;
-      if (decompressor_.get() != NULL) {
-        dict_values = 
parent_->dictionary_pool_->TryAllocate(uncompressed_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, 
"ReadDataPage",
-              uncompressed_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, uncompressed_size);
-        }
-        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
-            &uncompressed_size, &dict_values));
-        VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
-        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
-              "Expected $1 uncompressed bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, uncompressed_size));
-        }
-        data_size = uncompressed_size;
-      } else {
-        if (current_page_header_.uncompressed_page_size != data_size) {
-          return Status(Substitute("Error reading dictionary page in file 
'$0'. "
-              "Expected $1 bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, data_size));
-        }
-        // Copy dictionary from io buffer (which will be recycled as we read
-        // more data) to a new buffer
-        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-              data_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, data_size);
-        }
-        memcpy(dict_values, data_, data_size);
-      }
-
-      DictDecoderBase* dict_decoder;
-      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
-      if (dict_header != NULL &&
-          dict_header->num_values != dict_decoder->num_entries()) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(),
-            Substitute("Expected $0 entries but data contained $1 entries",
-            dict_header->num_values, dict_decoder->num_entries()));
-      }
-      // Done with dictionary page, read next page
-      continue;
-    }
-
-    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
-      // We can safely skip non-data pages
-      if (!stream_->SkipBytes(data_size, &status)) return status;
-      continue;
-    }
-
-    // Read Data Page
-    // TODO: when we start using page statistics, we will need to ignore certain corrupt
-    // statistics. See IMPALA-2208 and PARQUET-251.
-    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-    data_end_ = data_ + data_size;
-    num_buffered_values_ = current_page_header_.data_page_header.num_values;
-    num_values_read_ += num_buffered_values_;
-
-    if (decompressor_.get() != NULL) {
-      SCOPED_TIMER(parent_->decompress_timer_);
-      uint8_t* decompressed_buffer =
-          decompressed_data_pool_->TryAllocate(uncompressed_size);
-      if (UNLIKELY(decompressed_buffer == NULL)) {
-        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-            uncompressed_size, "decompressed data");
-        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
-            parent_->state_, details, uncompressed_size);
-      }
-      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
-          current_page_header_.compressed_page_size, data_, &uncompressed_size,
-          &decompressed_buffer));
-      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
-                << " to " << uncompressed_size;
-      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error decompressing data page in file '$0'. "
-            "Expected $1 uncompressed bytes but got $2", filename(),
-            current_page_header_.uncompressed_page_size, uncompressed_size));
-      }
-      data_ = decompressed_buffer;
-      data_size = current_page_header_.uncompressed_page_size;
-      data_end_ = data_ + data_size;
-    } else {
-      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
-      if (current_page_header_.compressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error reading data page in file '$0'. "
-            "Expected $1 bytes but got $2", filename(),
-            current_page_header_.compressed_page_size, uncompressed_size));
-      }
-    }
-
-    // Initialize the repetition level data
-    RETURN_IF_ERROR(rep_levels_.Init(filename(),
-        current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
-        max_rep_level(), num_buffered_values_,
-        &data_, &data_size));
-
-    // Initialize the definition level data
-    RETURN_IF_ERROR(def_levels_.Init(filename(),
-        current_page_header_.data_page_header.definition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
-        max_def_level(), num_buffered_values_, &data_, &data_size));
-
-    // Data can be empty if the column contains all NULLs
-    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
-    break;
-  }
-
-  return Status::OK();
-}
-
-Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
-    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
-  encoding_ = encoding;
-  max_level_ = max_level;
-  num_buffered_values_ = num_buffered_values;
-  filename_ = filename;
-  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
-
-  // Return because there is no level data to read, e.g., required field.
-  if (max_level == 0) return Status::OK();
-
-  int32_t num_bytes = 0;
-  switch (encoding) {
-    case parquet::Encoding::RLE: {
-      Status status;
-      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
-        return status;
-      }
-      if (num_bytes < 0) {
-        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
-      }
-      int bit_width = Bits::Log2Ceiling64(max_level + 1);
-      Reset(*data, num_bytes, bit_width);
-      break;
-    }
-    case parquet::Encoding::BIT_PACKED:
-      num_bytes = BitUtil::Ceil(num_buffered_values, 8);
-      bit_reader_.Reset(*data, num_bytes);
-      break;
-    default: {
-      stringstream ss;
-      ss << "Unsupported encoding: " << encoding;
-      return Status(ss.str());
-    }
-  }
-  DCHECK_GT(num_bytes, 0);
-  *data += num_bytes;
-  *data_size -= num_bytes;
-  return Status::OK();
-}
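
For context on the RLE branch in the Init() function deleted above: the decoder sizes its bit reader from the maximum possible level, i.e. bit_width = ceil(log2(max_level + 1)). The following is a minimal standalone sketch of that relationship only (LevelBitWidth is an illustrative name, not part of this patch):

#include <iostream>

// Smallest bit width that can represent every value in [0, max_level].
static int LevelBitWidth(int max_level) {
  int width = 0;
  while ((1 << width) < max_level + 1) ++width;
  return width;
}

int main() {
  // max_level 0 (required column) needs no level data at all; a single
  // optional level needs 1 bit; three nesting levels need 2 bits.
  std::cout << LevelBitWidth(0) << " " << LevelBitWidth(1) << " "
            << LevelBitWidth(3) << std::endl;  // prints "0 1 2"
}
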
-
-Status HdfsParquetScanner::LevelDecoder::InitCache(MemPool* pool, int cache_size) {
-  num_cached_levels_ = 0;
-  cached_level_idx_ = 0;
-  // Memory has already been allocated.
-  if (cached_levels_ != NULL) {
-    DCHECK_EQ(cache_size_, cache_size);
-    return Status::OK();
-  }
-
-  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
-  if (cached_levels_ == NULL) {
-    return pool->mem_tracker()->MemLimitExceeded(
-        NULL, "Definition level cache", cache_size);
-  }
-  memset(cached_levels_, 0, cache_size);
-  cache_size_ = cache_size;
-  return Status::OK();
-}
-
-inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
-  bool valid;
-  uint8_t level;
-  if (encoding_ == parquet::Encoding::RLE) {
-    valid = Get(&level);
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    valid = bit_reader_.GetValue(1, &level);
-  }
-  return LIKELY(valid) ? level : INVALID_LEVEL;
-}
-
-Status HdfsParquetScanner::LevelDecoder::CacheNextBatch(int batch_size) {
-  DCHECK_LE(batch_size, cache_size_);
-  cached_level_idx_ = 0;
-  if (max_level_ > 0) {
-    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
-      return Status(decoding_error_code_, num_buffered_values_, filename_);
-    }
-  } else {
-    // No levels to read, e.g., because the field is required. The cache was
-    // already initialized with all zeros, so we can hand out those values.
-    DCHECK_EQ(max_level_, 0);
-    num_cached_levels_ = batch_size;
-  }
-  return Status::OK();
-}
-
-bool HdfsParquetScanner::LevelDecoder::FillCache(int batch_size,
-    int* num_cached_levels) {
-  DCHECK(num_cached_levels != NULL);
-  int num_values = 0;
-  if (encoding_ == parquet::Encoding::RLE) {
-    while (true) {
-      // Add RLE encoded values by repeating the current value this number of times.
-      uint32_t num_repeats_to_set =
-          min<uint32_t>(repeat_count_, batch_size - num_values);
-      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
-      num_values += num_repeats_to_set;
-      repeat_count_ -= num_repeats_to_set;
-
-      // Add remaining literal values, if any.
-      uint32_t num_literals_to_set =
-          min<uint32_t>(literal_count_, batch_size - num_values);
-      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
-      for (; num_values < num_values_end; ++num_values) {
-        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
-        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-      }
-      literal_count_ -= num_literals_to_set;
-
-      if (num_values == batch_size) break;
-      if (UNLIKELY(!NextCounts<int16_t>())) return false;
-      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
-    }
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    for (; num_values < batch_size; ++num_values) {
-      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
-      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-    }
-  }
-  *num_cached_levels = num_values;
-  return true;
-}
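
The FillCache() loop deleted above alternates between repeated runs (one level value replicated with memset) and literal runs (bit-packed values read one at a time). A minimal standalone sketch of that shape follows; the Run struct is a hypothetical pre-parsed representation standing in for the real bit reader, so this is an illustration of the control flow, not the Impala decoder:

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

struct Run { bool repeated; uint8_t value_or_first; int count; };

static int FillLevelCache(const std::vector<Run>& runs, int batch_size,
                          std::vector<uint8_t>* cache) {
  cache->assign(batch_size, 0);
  int num_values = 0;
  for (const Run& run : runs) {
    if (num_values == batch_size) break;
    int to_set = std::min(run.count, batch_size - num_values);
    if (run.repeated) {
      // Repeated run: one value replicated 'count' times.
      std::memset(cache->data() + num_values, run.value_or_first, to_set);
    } else {
      // Literal run: the real decoder reads each value bit-packed; here we
      // just synthesize alternating levels for illustration.
      for (int i = 0; i < to_set; ++i) {
        (*cache)[num_values + i] = static_cast<uint8_t>(run.value_or_first + i % 2);
      }
    }
    num_values += to_set;
  }
  return num_values;
}

int main() {
  std::vector<uint8_t> cache;
  int n = FillLevelCache({{true, 1, 3}, {false, 0, 2}}, 8, &cache);
  std::cout << n << " levels: ";
  for (int i = 0; i < n; ++i) std::cout << int(cache[i]) << ' ';
  std::cout << '\n';  // prints "5 levels: 1 1 1 0 1"
}
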
-
-template <bool ADVANCE_REP_LEVEL>
-bool HdfsParquetScanner::BaseScalarColumnReader::NextLevels() {
-  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-
-  if (UNLIKELY(num_buffered_values_ == 0)) {
-    if (!NextPage()) return parent_->parse_status_.ok();
-  }
-  --num_buffered_values_;
-
-  // Definition level is not present if column and any containing structs are required.
-  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
-
-  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
-    // Repetition level is only present if this column is nested in any collection type.
-    rep_level_ = rep_levels_.ReadLevel();
-    // Reset position counter if we are at the start of a new parent collection.
-    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
-  }
-
-  return parent_->parse_status_.ok();
-}
-
-bool HdfsParquetScanner::BaseScalarColumnReader::NextPage() {
-  parent_->assemble_rows_timer_.Stop();
-  parent_->parse_status_ = ReadDataPage();
-  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-  if (num_buffered_values_ == 0) {
-    rep_level_ = ROW_GROUP_END;
-    def_level_ = INVALID_LEVEL;
-    pos_current_value_ = INVALID_POS;
-    return false;
-  }
-  parent_->assemble_rows_timer_.Start();
-  return true;
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::NextLevels() {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-  for (int c = 0; c < children_.size(); ++c) {
-    do {
-      // TODO(skye): verify somewhere that all column readers are at end
-      if (!children_[c]->NextLevels()) return false;
-    } while (children_[c]->rep_level() > new_collection_rep_level());
-  }
-  UpdateDerivedState();
-  return true;
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a 
value";
-
-  if (tuple_offset_ == -1) {
-    return CollectionColumnReader::NextLevels();
-  } else if (def_level_ >= max_def_level()) {
-    return ReadSlot(tuple->GetSlot(tuple_offset_), pool);
-  } else {
-    // Null value
-    tuple->SetNull(null_indicator_offset_);
-    return CollectionColumnReader::NextLevels();
-  }
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadNonRepeatedValue(
-    MemPool* pool, Tuple* tuple) {
-  return CollectionColumnReader::ReadValue(pool, tuple);
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-
-  // Recursively read the collection into a new CollectionValue.
-  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot);
-  *coll_slot = CollectionValue();
-  CollectionValueBuilder builder(
-      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
-  bool continue_execution = parent_->AssembleCollection(
-      children_, new_collection_rep_level(), &builder);
-  if (!continue_execution) return false;
-
-  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
-  UpdateDerivedState();
-  return true;
-}
-
-void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() {
-  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
-  // >= max_def_level() to check if the collection is defined.
-  // TODO(skye): consider capping def_level_ at max_def_level()
-  def_level_ = children_[0]->def_level();
-  rep_level_ = children_[0]->rep_level();
-
-  // All children should have been advanced to the beginning of the next collection
-  for (int i = 0; i < children_.size(); ++i) {
-    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
-    if (def_level_ < max_def_level()) {
-      // Collection not defined
-      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
-    } else {
-      // Collection is defined
-      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
-    }
-  }
-
-  if (RowGroupAtEnd()) {
-    // No more values
-    pos_current_value_ = INVALID_POS;
-  } else if (rep_level_ <= max_rep_level() - 2) {
-    // Reset position counter if we are at the start of a new parent collection (i.e.,
-    // the current collection is the first item in a new parent collection).
-    pos_current_value_ = 0;
-  }
-}
-
-Status HdfsParquetScanner::ValidateColumnOffsets(const parquet::RowGroup& row_group) {
-  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
-  for (int i = 0; i < row_group.columns.size(); ++i) {
-    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
-    int64_t col_start = col_chunk.meta_data.data_page_offset;
-    // The file format requires that if a dictionary page exists, it be before data pages.
-    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
-        stringstream ss;
-        ss << "File " << file_desc->filename << ": metadata is corrupt. "
-            << "Dictionary page (offset=" << 
col_chunk.meta_data.dictionary_page_offset
-            << ") must come before any data pages (offset=" << col_start << 
").";
-        return Status(ss.str());
-      }
-      col_start = col_chunk.meta_data.dictionary_page_offset;
-    }
-    int64_t col_len = col_chunk.meta_data.total_compressed_size;
-    int64_t col_end = col_start + col_len;
-    if (col_end <= 0 || col_end > file_desc->file_length) {
-      stringstream ss;
-      ss << "File " << file_desc->filename << ": metadata is corrupt. "
-          << "Column " << i << " has invalid column offsets "
-          << "(offset=" << col_start << ", size=" << col_len << ", "
-          << "file_size=" << file_desc->file_length << ").";
-      return Status(ss.str());
-    }
-  }
-  return Status::OK();
-}
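
The checks deleted above (moved by this patch into ParquetMetadataUtils::ValidateColumnOffsets) reduce to two invariants per column chunk: a dictionary page, if present, must precede the data pages, and the whole chunk must fit inside the file. A standalone sketch under illustrative names, not the Impala implementation:

#include <cstdint>
#include <iostream>
#include <string>

struct ColumnChunkInfo {
  bool has_dictionary_page;
  int64_t dictionary_page_offset;
  int64_t data_page_offset;
  int64_t total_compressed_size;
};

static bool ColumnOffsetsValid(const ColumnChunkInfo& col, int64_t file_length,
                               std::string* error) {
  int64_t col_start = col.data_page_offset;
  if (col.has_dictionary_page) {
    if (col.dictionary_page_offset >= col_start) {
      *error = "dictionary page must come before any data pages";
      return false;
    }
    col_start = col.dictionary_page_offset;
  }
  int64_t col_end = col_start + col.total_compressed_size;
  if (col_end <= 0 || col_end > file_length) {
    *error = "column chunk has invalid offsets for this file size";
    return false;
  }
  return true;
}

int main() {
  std::string err;
  ColumnChunkInfo ok{true, 4, 100, 500};
  ColumnChunkInfo bad{true, 200, 100, 500};  // dictionary page after data pages
  std::cout << ColumnOffsetsValid(ok, 1000, &err) << '\n';   // 1
  std::cout << ColumnOffsetsValid(bad, 1000, &err) << '\n';  // 0
}
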
-
 // Get the start of the column.
 static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) {
   if (column.__isset.dictionary_page_offset) {
@@ -1851,18 +243,18 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
   return start_offset + (end_offset - start_offset) / 2;
 }
 
-int HdfsParquetScanner::CountScalarColumns(const vector<ColumnReader*>& column_readers) {
+int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
   DCHECK(!column_readers.empty());
   int num_columns = 0;
-  stack<ColumnReader*> readers;
-  for (ColumnReader* r: column_readers_) readers.push(r);
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
   while (!readers.empty()) {
-    ColumnReader* col_reader = readers.top();
+    ParquetColumnReader* col_reader = readers.top();
     readers.pop();
     if (col_reader->IsCollectionReader()) {
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
-      for (ColumnReader* r: *collection_reader->children()) readers.push(r);
+      for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r);
       continue;
     }
     ++num_columns;
@@ -1875,11 +267,16 @@ Status HdfsParquetScanner::ProcessSplit() {
   // First process the file metadata in the footer
   bool eosr;
   RETURN_IF_ERROR(ProcessFooter(&eosr));
-
   if (eosr) return Status::OK();
 
+  // Parse the file schema into an internal representation for schema resolution.
+  ParquetSchemaResolver schema_resolver(*scan_node_->hdfs_table(),
+      state_->query_options().parquet_fallback_schema_resolution);
+  RETURN_IF_ERROR(schema_resolver.Init(&file_metadata_, filename()));
+
   // We've processed the metadata and there are columns that need to be materialized.
-  RETURN_IF_ERROR(CreateColumnReaders(*scan_node_->tuple_desc(), &column_readers_));
+  RETURN_IF_ERROR(
+      CreateColumnReaders(*scan_node_->tuple_desc(), schema_resolver, &column_readers_));
   COUNTER_SET(num_cols_counter_,
       static_cast<int64_t>(CountScalarColumns(column_readers_)));
   // Set top-level template tuple.
@@ -1895,9 +292,11 @@ Status HdfsParquetScanner::ProcessSplit() {
     const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
     if (row_group.num_rows == 0) continue;
 
-    const DiskIoMgr::ScanRange* split_range =
-        reinterpret_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
-    RETURN_IF_ERROR(ValidateColumnOffsets(row_group));
+    const DiskIoMgr::ScanRange* split_range = reinterpret_cast<ScanRangeMetadata*>(
+        metadata_range_->meta_data())->original_split;
+    HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
+        file_desc->filename, file_desc->file_length, row_group));
 
     int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
     int64_t split_offset = split_range->offset();
@@ -1919,7 +318,7 @@ Status HdfsParquetScanner::ProcessSplit() {
 
     // Prepare column readers for first read
     bool continue_execution = true;
-    for (ColumnReader* col_reader: column_readers_) {
+    for (ParquetColumnReader* col_reader: column_readers_) {
       // Seed collection and boolean column readers with NextLevel().
       // The ScalarColumnReaders use an optimized ReadValueBatch() that
       // should not be seeded.
@@ -1950,7 +349,7 @@ Status HdfsParquetScanner::ProcessSplit() {
     // with parse_status_.
     RETURN_IF_ERROR(state_->GetQueryStatus());
     if (UNLIKELY(!parse_status_.ok())) {
-      RETURN_IF_ERROR(LogOrReturnError(parse_status_.msg()));
+      RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
     }
     if (scan_node_->ReachedLimit()) return Status::OK();
     if (context_->cancelled()) return Status::OK();
@@ -2078,7 +477,7 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
 /// difficult to maintain a maximum memory footprint without throwing away at least
 /// some work. This point needs further experimentation and thought.
 bool HdfsParquetScanner::AssembleRows(
-    const vector<ColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) {
+    const vector<ParquetColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) {
   DCHECK(!column_readers.empty());
   DCHECK(scratch_batch_ != NULL);
 
@@ -2111,7 +510,7 @@ bool HdfsParquetScanner::AssembleRows(
     int last_num_tuples = -1;
     int num_col_readers = column_readers.size();
     for (int c = 0; c < num_col_readers; ++c) {
-      ColumnReader* col_reader = column_readers[c];
+      ParquetColumnReader* col_reader = column_readers[c];
       if (col_reader->max_rep_level() > 0) {
         continue_execution = col_reader->ReadValueBatch(
             scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
@@ -2150,7 +549,7 @@ bool HdfsParquetScanner::AssembleRows(
 }
 
 bool HdfsParquetScanner::AssembleCollection(
-    const vector<ColumnReader*>& column_readers, int new_collection_rep_level,
+    const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level,
     CollectionValueBuilder* coll_value_builder) {
   DCHECK(!column_readers.empty());
   DCHECK_GE(new_collection_rep_level, 0);
@@ -2229,13 +628,13 @@ bool HdfsParquetScanner::AssembleCollection(
 }
 
 inline bool HdfsParquetScanner::ReadCollectionItem(
-    const vector<ColumnReader*>& column_readers,
+    const vector<ParquetColumnReader*>& column_readers,
     bool materialize_tuple, MemPool* pool, Tuple* tuple) const {
   DCHECK(!column_readers.empty());
   bool continue_execution = true;
   int size = column_readers.size();
   for (int c = 0; c < size; ++c) {
-    ColumnReader* col_reader = column_readers[c];
+    ParquetColumnReader* col_reader = column_readers[c];
     if (materialize_tuple) {
       // All column readers for this tuple should have a value to materialize.
       FILE_CHECK_GE(col_reader->def_level(),
@@ -2364,9 +763,11 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
         status.GetDetail()));
   }
 
-  RETURN_IF_ERROR(ValidateFileMetadata());
-  // Parse file schema
-  RETURN_IF_ERROR(CreateSchemaTree(file_metadata_.schema, &schema_));
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
+  // Parse out the created by application version string
+  if (file_metadata_.__isset.created_by) {
+    file_version_ = ParquetFileVersion(file_metadata_.created_by);
+  }
 
   if (scan_node_->IsZeroSlotTableScan()) {
     // There are no materialized slots, e.g. count(*) over the table.  We can serve
@@ -2398,301 +799,12 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
     return Status(
         Substitute("Invalid file. This file: $0 has no row groups", 
filename()));
   }
-  if (schema_.children.empty()) {
-    return Status(Substitute("Invalid file: '$0' has no columns.", 
filename()));
-  }
-  return Status::OK();
-}
-
-Status HdfsParquetScanner::ResolvePath(const SchemaPath& path, SchemaNode** node,
-    bool* pos_field, bool* missing_field) {
-  *missing_field = false;
-  // First try two-level array encoding.
-  bool missing_field_two_level;
-  Status status_two_level =
-      ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level);
-  if (missing_field_two_level) DCHECK(status_two_level.ok());
-  if (status_two_level.ok() && !missing_field_two_level) return Status::OK();
-  // The two-level resolution failed or reported a missing field, try three-level array
-  // encoding.
-  bool missing_field_three_level;
-  Status status_three_level =
-      ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level);
-  if (missing_field_three_level) DCHECK(status_three_level.ok());
-  if (status_three_level.ok() && !missing_field_three_level) return Status::OK();
-  // The three-level resolution failed or reported a missing field, try one-level array
-  // encoding.
-  bool missing_field_one_level;
-  Status status_one_level =
-      ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level);
-  if (missing_field_one_level) DCHECK(status_one_level.ok());
-  if (status_one_level.ok() && !missing_field_one_level) return Status::OK();
-  // None of the resolutions yielded a node. Set *missing_field to true if any of the
-  // resolutions reported a missing field.
-  if (missing_field_one_level || missing_field_two_level || missing_field_three_level) {
-    *node = NULL;
-    *missing_field = true;
-    return Status::OK();
-  }
-  // All resolutions failed. Log and return the status from the three-level resolution
-  // (which is technically the standard).
-  DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok());
-  *node = NULL;
-  VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace();
-  return status_three_level;
-}
-
-Status HdfsParquetScanner::ResolvePathHelper(ArrayEncoding array_encoding,
-    const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) {
-  DCHECK(schema_.element != NULL)
-      << "schema_ must be initialized before calling ResolvePath()";
-
-  *pos_field = false;
-  *missing_field = false;
-  *node = &schema_;
-  const ColumnType* col_type = NULL;
-
-  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
-  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
-  for (int i = 0; i < path.size(); ++i) {
-    // Advance '*node' if necessary
-    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
-      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
-      if (*missing_field) return Status::OK();
-    } else {
-      // We just resolved an array, meaning *node is set to the repeated field of the
-      // array. Since we are trying to resolve using one- or two-level array encoding, the
-      // repeated field represents both the array and the array's item (i.e. there is no
-      // explicit item field), so we don't advance *node in this case.
-      DCHECK(col_type != NULL);
-      DCHECK_EQ(col_type->type, TYPE_ARRAY);
-      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
-      DCHECK((*node)->is_repeated());
-    }
-
-    // Advance 'col_type'
-    int table_idx = path[i];
-    col_type = i == 0 ? &scan_node_->hdfs_table()->col_descs()[table_idx].type()
-               : &col_type->children[table_idx];
-
-    // Resolve path[i]
-    if (col_type->type == TYPE_ARRAY) {
-      DCHECK_EQ(col_type->children.size(), 1);
-      RETURN_IF_ERROR(
-          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
-      if (*missing_field || *pos_field) return Status::OK();
-    } else if (col_type->type == TYPE_MAP) {
-      DCHECK_EQ(col_type->children.size(), 2);
-      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
-      if (*missing_field) return Status::OK();
-    } else if (col_type->type == TYPE_STRUCT) {
-      DCHECK_GT(col_type->children.size(), 0);
-      // Nothing to do for structs
-    } else {
-      DCHECK(!col_type->IsComplexType());
-      DCHECK_EQ(i, path.size() - 1);
-      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
-    }
-  }
-  DCHECK(*node != NULL);
-  return Status::OK();
-}
-
-HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode(
-    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
-    bool* missing_field) {
-  DCHECK_LT(next_idx, path.size());
-  if (next_idx != 0) DCHECK(col_type != NULL);
-
-  int file_idx;
-  int table_idx = path[next_idx];
-  bool resolve_by_name = state_->query_options().parquet_fallback_schema_resolution ==
-      TParquetFallbackSchemaResolution::NAME;
-  if (resolve_by_name) {
-    if (next_idx == 0) {
-      // Resolve top-level table column by name.
-      DCHECK_LT(table_idx, scan_node_->hdfs_table()->col_descs().size());
-      const string& name = scan_node_->hdfs_table()->col_descs()[table_idx].name();
-      file_idx = FindChildWithName(node, name);
-    } else if (col_type->type == TYPE_STRUCT) {
-      // Resolve struct field by name.
-      DCHECK_LT(table_idx, col_type->field_names.size());
-      const string& name = col_type->field_names[table_idx];
-      file_idx = FindChildWithName(node, name);
-    } else if (col_type->type == TYPE_ARRAY) {
-      // Arrays have only one child in the file.
-      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
-      file_idx = table_idx;
-    } else {
-      DCHECK_EQ(col_type->type, TYPE_MAP);
-      // Maps have two values, "key" and "value". These are supposed to be ordered and may
-      // not have the right field names, but try to resolve by name in case they're
-      // switched and otherwise use the order. See
-      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
-      // more details.
-      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
-             table_idx == SchemaPathConstants::MAP_VALUE);
-      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
-      file_idx = FindChildWithName(node, name);
-      if (file_idx >= node->children.size()) {
-        // Couldn't resolve by name, fall back to resolution by position.
-        file_idx = table_idx;
-      }
-    }
-  } else {
-    // Resolution by position.
-    DCHECK_EQ(state_->query_options().parquet_fallback_schema_resolution,
-        TParquetFallbackSchemaResolution::POSITION);
-    if (next_idx == 0) {
-      // For top-level columns, the first index in a path includes the table's partition
-      // keys.
-      file_idx = table_idx - scan_node_->num_partition_keys();
-    } else {
-      file_idx = table_idx;
-    }
-  }
-
-  if (file_idx >= node->children.size()) {
-    VLOG_FILE << Substitute(
-        "File '$0' does not contain path '$1' (resolving by $2)", filename(),
-        PrintPath(path), resolve_by_name ? "name" : "position");
-    *missing_field = true;
-    return NULL;
-  }
-  return &node->children[file_idx];
-}
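
The map branch in NextSchemaNode() above resolves the "key" and "value" fields by name first and falls back to the positional index when the file's field names don't match. A minimal standalone sketch of that fallback (illustrative names, not Impala's API):

#include <iostream>
#include <string>
#include <vector>

// Return the index of 'name' among the file's fields, or 'positional_idx'
// when no field with that name exists (fall back to the spec's ordering).
static int ResolveMapField(const std::vector<std::string>& file_fields,
                           const std::string& name, int positional_idx) {
  for (int i = 0; i < static_cast<int>(file_fields.size()); ++i) {
    if (file_fields[i] == name) return i;
  }
  return positional_idx;
}

int main() {
  // A writer that swapped the fields still resolves correctly by name...
  std::cout << ResolveMapField({"value", "key"}, "key", 0) << '\n';  // 1
  // ...while nonstandard names fall back to position.
  std::cout << ResolveMapField({"k", "v"}, "key", 0) << '\n';        // 0
}
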
-
-int HdfsParquetScanner::FindChildWithName(HdfsParquetScanner::SchemaNode* node,
-    const string& name) {
-  int idx;
-  for (idx = 0; idx < node->children.size(); ++idx) {
-    if (node->children[idx].element->name == name) break;
-  }
-  return idx;
-}
-
-// There are three types of array encodings:
-//
-// 1. One-level encoding
-//      A bare repeated field. This is interpreted as a required array of required
-//      items.
-//    Example:
-//      repeated <item-type> item;
-//
-// 2. Two-level encoding
-//      A group containing a single repeated field. This is interpreted as a
-//      <list-repetition> array of required items (<list-repetition> is either
-//      optional or required).
-//    Example:
-//      <list-repetition> group <name> {
-//        repeated <item-type> item;
-//      }
-//
-// 3. Three-level encoding
-//      The "official" encoding according to the parquet spec. A group 
containing a
-//      single repeated group containing the item field. This is interpreted 
as a
-//      <list-repetition> array of <item-repetition> items (<list-repetition> 
and
-//      <item-repetition> are each either optional or required).
-//    Example:
-//      <list-repetition> group <name> {
-//        repeated group list {
-//          <item-repetition> <item-type> item;
-//        }
-//      }
-//
-// We ignore any field annotations or names, making us more permissive than the
-// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
-// group containing more fields, which corresponds to a complex item type. See
-// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
-// more details and examples.
-//
-// This function resolves the array at '*node' assuming one-, two-, or three-level
-// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
-// three encodings (unless '*pos_field' or '*missing_field' are set to true).
-Status HdfsParquetScanner::ResolveArray(ArrayEncoding array_encoding,
-    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
-    bool* missing_field) {
-  if (array_encoding == ONE_LEVEL) {
-    if (!(*node)->is_repeated()) {
-      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
-          PrintPath(path, idx), "array", (*node)->DebugString());
-      return Status::Expected(msg);
-    }
-  } else {
-    // In the multi-level case, we always expect the outer group to contain a single
-    // repeated field
-    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
-      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
-          PrintPath(path, idx), "array", (*node)->DebugString());
-      return Status::Expected(msg);
-    }
-    // Set *node to the repeated field
-    *node = &(*node)->children[0];
-  }
-  DCHECK((*node)->is_repeated());
-
-  if (idx + 1 < path.size()) {
-    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
-      // The next index in 'path' is the artificial position field.
-      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
-      *pos_field = true;
-      *node = NULL;
-      return Status::OK();
-    } else {
-      // The n

<TRUNCATED>
