http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 5a12602..3791ae1 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -18,13 +18,27 @@
 
 #include "exec/hdfs-scanner.h"
 #include "exec/parquet-common.h"
+#include "exec/parquet-scratch-tuple-batch.h"
+#include "exec/parquet-metadata-utils.h"
 #include "util/runtime-profile-counters.h"
 
 namespace impala {
 
 class CollectionValueBuilder;
 struct HdfsFileDesc;
-struct ScratchTupleBatch;
+
+/// Internal schema representation and resolution.
+class SchemaNode;
+
+/// Class that implements Parquet definition and repetition level decoding.
+class ParquetLevelDecoder;
+
+/// Per column reader.
+class ParquetColumnReader;
+class CollectionColumnReader;
+class BaseScalarColumnReader;
+template<typename T, bool MATERIALIZED> class ScalarColumnReader;
+class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
 /// the Impala in-memory representation of data, e.g.  (tuples, rows, row batches).
@@ -305,7 +319,7 @@ class HdfsParquetScanner : public HdfsScanner {
  public:
   HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state);
 
-  virtual ~HdfsParquetScanner();
+  virtual ~HdfsParquetScanner() {};
   virtual Status Prepare(ScannerContext* context);
   virtual void Close();
   virtual Status ProcessSplit();
@@ -315,100 +329,24 @@ class HdfsParquetScanner : public HdfsScanner {
   static Status IssueInitialRanges(HdfsScanNode* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
-  struct FileVersion {
-    /// Application that wrote the file. e.g. "IMPALA"
-    std::string application;
-
-    /// Version of the application that wrote the file, expressed in three parts
-    /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
-    /// ignored. e.g.:
-    /// "1.2.3"    => {1, 2, 3}
-    /// "1.2"      => {1, 2, 0}
-    /// "1.2-cdh5" => {1, 2, 0}
-    struct {
-      int major;
-      int minor;
-      int patch;
-    } version;
-
-    /// If true, this file was generated by an Impala internal release
-    bool is_impala_internal;
-
-    FileVersion() : is_impala_internal(false) { }
-
-    /// Parses the version from the created_by string
-    FileVersion(const std::string& created_by);
-
-    /// Returns true if version is strictly less than <major>.<minor>.<patch>
-    bool VersionLt(int major, int minor = 0, int patch = 0) const;
-
-    /// Returns true if version is equal to <major>.<minor>.<patch>
-    bool VersionEq(int major, int minor, int patch) const;
-  };
-
- private:
-  /// Internal representation of a column schema (including nested-type columns).
-  struct SchemaNode {
-    /// The corresponding schema element defined in the file metadata
-    const parquet::SchemaElement* element;
-
-    /// The index into the RowGroup::columns list if this column is materialized in the
-    /// file (i.e. it's a scalar type). -1 for nested types.
-    int col_idx;
-
-    /// The maximum definition level of this column, i.e., the definition level that
-    /// corresponds to a non-NULL value. Valid values are >= 0.
-    int max_def_level;
-
-    /// The maximum repetition level of this column. Valid values are >= 0.
-    int max_rep_level;
-
-    /// The definition level of the most immediate ancestor of this node with repeated
-    /// field repetition type. 0 if there are no repeated ancestors.
-    int def_level_of_immediate_repeated_ancestor;
-
-    /// Any nested schema nodes. Empty for non-nested types.
-    std::vector<SchemaNode> children;
-
-    SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
-                   def_level_of_immediate_repeated_ancestor(-1) { }
-
-    std::string DebugString(int indent = 0) const;
-
-    bool is_repeated() const {
-      return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
-    }
-  };
-
-  /// Size of the file footer.  This is a guess.  If this value is too little, we will
-  /// need to issue another read.
-  static const int64_t FOOTER_SIZE;
-
   /// The repetition level is set to this value to indicate the end of a row group.
-  static const int16_t ROW_GROUP_END;
+  static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
-  static const int16_t INVALID_LEVEL;
+  static const int16_t INVALID_LEVEL = -1;
   /// Indicates an invalid position value.
-  static const int16_t INVALID_POS;
-
-  /// Class that implements Parquet definition and repetition level decoding.
-  class LevelDecoder;
+  static const int16_t INVALID_POS = -1;
 
-  /// Per column reader.
-  class ColumnReader;
-  friend class ColumnReader;
-
-  class CollectionColumnReader;
+ private:
+  friend class ParquetColumnReader;
   friend class CollectionColumnReader;
-
-  class BaseScalarColumnReader;
   friend class BaseScalarColumnReader;
-
-  template<typename T, bool MATERIALIZED> class ScalarColumnReader;
   template<typename T, bool MATERIALIZED> friend class ScalarColumnReader;
-  class BoolColumnReader;
   friend class BoolColumnReader;
 
+  /// Size of the file footer.  This is a guess.  If this value is too little, we will
+  /// need to issue another read.
+  static const int64_t FOOTER_SIZE = 1024 * 100;
+
   /// Cached runtime filter contexts, one for each filter that applies to this column.
   vector<const FilterContext*> filter_ctxs_;
 
@@ -443,7 +381,7 @@ class HdfsParquetScanner : public HdfsScanner {
   vector<LocalFilterStats> filter_stats_;
 
   /// Column reader for each materialized column in this file.
-  std::vector<ColumnReader*> column_readers_;
+  std::vector<ParquetColumnReader*> column_readers_;
 
   /// Column readers will write slot values into this scratch batch for
   /// top-level tuples. See AssembleRows().
@@ -453,10 +391,7 @@ class HdfsParquetScanner : public HdfsScanner {
   parquet::FileMetaData file_metadata_;
 
   /// Version of the application that wrote this file.
-  FileVersion file_version_;
-
-  /// The root schema node for this file.
-  SchemaNode schema_;
+  ParquetFileVersion file_version_;
 
   /// Scan range for the metadata.
   const DiskIoMgr::ScanRange* metadata_range_;
@@ -493,7 +428,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// If 'filters_pass' is set to false by this method, the partition columns associated
   /// with this row group did not pass all the runtime filters (and therefore only filter
   /// contexts that apply only to partition columns are checked).
-  bool AssembleRows(const std::vector<ColumnReader*>& column_readers,
+  bool AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
       int row_group_idx, bool* filters_pass);
 
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
@@ -522,7 +457,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// - scan node limit was reached
   /// When false is returned the column_readers are left in an undefined state and
   /// execution should be aborted immediately by the caller.
-  bool AssembleCollection(const std::vector<ColumnReader*>& column_readers,
+  bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers,
       int new_collection_rep_level, CollectionValueBuilder* coll_value_builder);
 
   /// Function used by AssembleCollection() to materialize a single collection item
@@ -530,17 +465,13 @@ class HdfsParquetScanner : public HdfsScanner {
   /// otherwise returns true.
   /// If 'materialize_tuple' is false, only advances the column readers' levels,
   /// and does not read any data values.
-  inline bool ReadCollectionItem(const std::vector<ColumnReader*>& column_readers,
+  inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers,
       bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
 
   /// Find and return the last split in the file if it is assigned to this scan node.
   /// Returns NULL otherwise.
   static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file);
 
-  /// Validate column offsets by checking if the dictionary page comes before the data
-  /// pages and checking if the column offsets lie within the file.
-  Status ValidateColumnOffsets(const parquet::RowGroup& row_group);
-
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.
   /// *eosr is a return value.  If true, the scan range is complete (e.g. select count(*))
@@ -551,23 +482,12 @@ class HdfsParquetScanner : public HdfsScanner {
   /// well. Fills in the appropriate template tuple slot with NULL for any materialized
   /// fields missing in the file.
   Status CreateColumnReaders(const TupleDescriptor& tuple_desc,
-      std::vector<ColumnReader*>* column_readers);
+      const ParquetSchemaResolver& schema_resolver,
+      std::vector<ParquetColumnReader*>* column_readers);
 
   /// Returns the total number of scalar column readers in 'column_readers', including
   /// the children of collection readers.
-  int CountScalarColumns(const std::vector<ColumnReader*>& column_readers);
-
-  /// Creates a column reader for 'node'. slot_desc may be NULL, in which case the
-  /// returned column reader can only be used to read def/rep levels.
-  /// 'is_collection_field' should be set to true if the returned reader is reading a
-  /// collection. This cannot be determined purely by 'node' because a repeated scalar
-  /// node represents both an array and the array's items (in this case
-  /// 'is_collection_field' should be true if the reader reads one value per array, and
-  /// false if it reads one value per item).  The reader is added to the runtime state's
-  /// object pool. Does not create child readers for collection readers; these must be
-  /// added by the caller.
-  ColumnReader* CreateReader(const SchemaNode& node, bool is_collection_field,
-      const SlotDescriptor* slot_desc);
+  int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers);
 
   /// Creates a column reader that reads one value for each item in the table or
   /// collection element corresponding to 'parent_path'. 'parent_path' should point to
@@ -578,89 +498,24 @@ class HdfsParquetScanner : public HdfsScanner {
   /// This is used for counting item values, rather than materializing any values. For
   /// example, in a count(*) over a collection, there are no values to materialize, but we
   /// still need to iterate over every item in the collection to count them.
-  Status CreateCountingReader(
-      const SchemaPath& parent_path, ColumnReader** reader);
+  Status CreateCountingReader(const SchemaPath& parent_path,
+      const ParquetSchemaResolver& schema_resolver,
+      ParquetColumnReader** reader);
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
   /// should be the readers used to materialize a single tuple (i.e., column_readers_ or
   /// the children of a collection node).
   Status InitColumns(
-      int row_group_idx, const std::vector<ColumnReader*>& column_readers);
-
-  /// Validates the file metadata
-  Status ValidateFileMetadata();
-
-  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
-  /// type, etc) and matches the type of col_reader's slot desc.
-  Status ValidateColumn(const BaseScalarColumnReader& col_reader, int row_group_idx);
+      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers);
 
   /// Performs some validation once we've reached the end of a row group to help detect
   /// bugs or bad input files.
-  Status ValidateEndOfRowGroup(const std::vector<ColumnReader*>& column_readers,
+  Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
       int row_group_idx, int64_t rows_read);
 
   /// Part of the HdfsScanner interface, not used in Parquet.
   Status InitNewRange() { return Status::OK(); };
-
-  /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
-  /// SchemaNode representation. Returns the result in 'n' unless an error status is
-  /// returned. Does not set the slot_desc field of any SchemaNode.
-  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
-      SchemaNode* node) const;
-
-  /// Recursive implementation used internally by the above CreateSchemaTree() function.
-  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
-      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
-      SchemaNode* node) const;
-
-  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
-  /// does not exist in this file's schema, 'missing_field' is set to true and
-  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
-  /// resolves to a collection position field, *pos_field is set to true. Otherwise
-  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
-  /// against the file's schema (e.g., unrecognized collection schema).
-  ///
-  /// Tries to resolve assuming either two- or three-level array encoding in
-  /// 'schema_'. Returns a bad status if resolution fails in both cases.
-  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
-      bool* missing_field);
-
-  /// The 'array_encoding' parameter determines whether to assume one-, two-, or
-  /// three-level array encoding. The returned status is not logged (i.e. it's an expected
-  /// error).
-  enum ArrayEncoding {
-    ONE_LEVEL,
-    TWO_LEVEL,
-    THREE_LEVEL
-  };
-  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
-      SchemaNode** node, bool* pos_field, bool* missing_field);
-
-  /// Helper functions for ResolvePathHelper().
-
-  /// Advances 'node' to one of its children based on path[next_idx] and
-  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
-  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
-  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
-      int next_idx, SchemaNode* node, bool* missing_field);
-
-  /// Returns the index of 'node's child with 'name', or the number of children if not
-  /// found.
-  int FindChildWithName(SchemaNode* node, const string& name);
-
-  /// The ResolvePathHelper() logic for arrays.
-  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
-    SchemaNode** node, bool* pos_field, bool* missing_field);
-
-  /// The ResolvePathHelper() logic for maps.
-  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
-      bool* missing_field);
-
-  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
-  /// more actual work to be done).
-  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
-      const SchemaPath& path, int idx);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index d85ad49..3914845 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -525,7 +525,7 @@ Status HdfsRCFileScanner::ProcessRange() {
         if (error_in_row) {
           error_in_row = false;
           ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename()));
-          RETURN_IF_ERROR(LogOrReturnError(msg));
+          RETURN_IF_ERROR(state_->LogOrReturnError(msg));
         }
 
         current_row->SetTuple(scan_node_->tuple_idx(), tuple);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 5abd346..4f63a73 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -671,23 +671,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
   }
 }
 
-Status HdfsScanner::LogOrReturnError(const ErrorMsg& message) const {
-  DCHECK_NE(message.error(), TErrorCode::OK);
-  // If either abort_on_error=true or the error necessitates execution stops
-  // immediately, return an error status.
-  if (state_->abort_on_error() ||
-      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
-      message.error() == TErrorCode::CANCELLED) {
-    return Status(message);
-  }
-  // Otherwise, add the error to the error log and continue.
-  state_->LogError(message);
-  return Status::OK();
-}
-
-string HdfsScanner::PrintPath(const SchemaPath& path, int subpath_idx) const {
-  SchemaPath::const_iterator subpath_end =
-      subpath_idx == -1 ? path.end() : path.begin() + subpath_idx + 1;
-  SchemaPath subpath(path.begin(), subpath_end);
-  return impala::PrintPath(*scan_node_->hdfs_table(), subpath);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 9069451..7a723b8 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -408,17 +408,6 @@ class HdfsScanner {
     return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
   }
 
-  /// Given an error message, determine whether execution should be aborted and, if so,
-  /// return the corresponding error status. Otherwise, log the error and return
-  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
-  /// true or the error is not recoverable and should be handled upstream.
-  Status LogOrReturnError(const ErrorMsg& message) const;
-
-  // Convenience function for calling the PrintPath() function in
-  // debug-util. 'subpath_idx' can be specified in order to truncate the output to end on
-  // the i-th element of 'path' (inclusive).
-  string PrintPath(const SchemaPath& path, int subpath_idx = -1) const;
-
   /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of
   /// WriteCompleteTuple() because it's easier than writing IR to access
   /// scanner_conjunct_ctxs_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index dcd3081..6d45880 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -286,7 +286,8 @@ Status HdfsTextScanner::FinishScanRange() {
         stringstream ss;
         ss << "Read failed while trying to finish scan range: " << 
stream_->filename()
            << ":" << stream_->file_offset() << endl << status.GetDetail();
-        RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
+        RETURN_IF_ERROR(state_->LogOrReturnError(
+            ErrorMsg(TErrorCode::GENERAL, ss.str())));
       } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
           !boundary_row_.IsEmpty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
new file mode 100644
index 0000000..280c206
--- /dev/null
+++ b/be/src/exec/parquet-column-readers.cc
@@ -0,0 +1,1093 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet-column-readers.h"
+
+#include <boost/scoped_ptr.hpp>
+#include <string>
+#include <sstream>
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "exec/parquet-metadata-utils.h"
+#include "exec/parquet-scratch-tuple-batch.h"
+#include "exec/read-write-util.h"
+#include "gutil/bits.h"
+#include "rpc/thrift-util.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "runtime/runtime-state.h"
+#include "runtime/mem-pool.h"
+#include "util/codec.h"
+#include "util/debug-util.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+// Provide a workaround for IMPALA-1658.
+DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
+    "When true, TIMESTAMPs read from files written by Parquet-MR (used by 
Hive) will "
+    "be converted from UTC to local time. Writes are unaffected.");
+
+// Max data page header size in bytes. This is an estimate and only needs to be an upper
+// bound. It is theoretically possible to have a page header of any size due to string
+// value statistics, but in practice we'll have trouble reading string values this large.
+// Also, this limit is in place to prevent impala from reading corrupt parquet files.
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
+
+namespace impala {
+
+const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
+    "$1 bytes for $2.";
+
+Status ParquetLevelDecoder::Init(const string& filename,
+    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  encoding_ = encoding;
+  max_level_ = max_level;
+  num_buffered_values_ = num_buffered_values;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
+  int32_t num_bytes = 0;
+  switch (encoding) {
+    case parquet::Encoding::RLE: {
+      Status status;
+      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
+        return status;
+      }
+      if (num_bytes < 0) {
+        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
+      }
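+      // Levels range from 0..max_level, so ceil(log2(max_level + 1)) bits per level suffice.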
+      int bit_width = Bits::Log2Ceiling64(max_level + 1);
+      Reset(*data, num_bytes, bit_width);
+      break;
+    }
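+    // Deprecated BIT_PACKED level encoding: this path assumes a bit width of one,
+    // i.e. a single level bit per buffered value.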
+    case parquet::Encoding::BIT_PACKED:
+      num_bytes = BitUtil::Ceil(num_buffered_values, 8);
+      bit_reader_.Reset(*data, num_bytes);
+      break;
+    default: {
+      stringstream ss;
+      ss << "Unsupported encoding: " << encoding;
+      return Status(ss.str());
+    }
+  }
+  DCHECK_GT(num_bytes, 0);
+  *data += num_bytes;
+  *data_size -= num_bytes;
+  return Status::OK();
+}
+
+Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
+  num_cached_levels_ = 0;
+  cached_level_idx_ = 0;
+  // Memory has already been allocated.
+  if (cached_levels_ != NULL) {
+    DCHECK_EQ(cache_size_, cache_size);
+    return Status::OK();
+  }
+
+  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
+  if (cached_levels_ == NULL) {
+    return pool->mem_tracker()->MemLimitExceeded(
+        NULL, "Definition level cache", cache_size);
+  }
+  memset(cached_levels_, 0, cache_size);
+  cache_size_ = cache_size;
+  return Status::OK();
+}
+
+inline int16_t ParquetLevelDecoder::ReadLevel() {
+  bool valid;
+  uint8_t level;
+  if (encoding_ == parquet::Encoding::RLE) {
+    valid = Get(&level);
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    valid = bit_reader_.GetValue(1, &level);
+  }
+  return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL;
+}
+
+Status ParquetLevelDecoder::CacheNextBatch(int batch_size) {
+  DCHECK_LE(batch_size, cache_size_);
+  cached_level_idx_ = 0;
+  if (max_level_ > 0) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+      return Status(decoding_error_code_, num_buffered_values_, filename_);
+    }
+  } else {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    DCHECK_EQ(max_level_, 0);
+    num_cached_levels_ = batch_size;
+  }
+  return Status::OK();
+}
+
+bool ParquetLevelDecoder::FillCache(int batch_size,
+    int* num_cached_levels) {
+  DCHECK(num_cached_levels != NULL);
+  int num_values = 0;
+  if (encoding_ == parquet::Encoding::RLE) {
+    while (true) {
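+      // The underlying RLE/bit-packed stream alternates between repeated runs
+      // (repeat_count_ copies of current_value_) and literal runs of bit-packed levels.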
+      // Add RLE encoded values by repeating the current value this number of times.
+      uint32_t num_repeats_to_set =
+          min<uint32_t>(repeat_count_, batch_size - num_values);
+      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      repeat_count_ -= num_repeats_to_set;
+
+      // Add remaining literal values, if any.
+      uint32_t num_literals_to_set =
+          min<uint32_t>(literal_count_, batch_size - num_values);
+      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
+      for (; num_values < num_values_end; ++num_values) {
+        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
+        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+      }
+      literal_count_ -= num_literals_to_set;
+
+      if (num_values == batch_size) break;
+      if (UNLIKELY(!NextCounts<int16_t>())) return false;
+      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
+    }
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    for (; num_values < batch_size; ++num_values) {
+      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
+      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    }
+  }
+  *num_cached_levels = num_values;
+  return true;
+}
+
+/// Per column type reader. If MATERIALIZED is true, the column values are materialized
+/// into the slot described by slot_desc. If MATERIALIZED is false, the column values
+/// are not materialized, but the position can be accessed.
+template<typename T, bool MATERIALIZED>
+class ScalarColumnReader : public BaseScalarColumnReader {
+ public:
+  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : BaseScalarColumnReader(parent, node, slot_desc),
+      dict_decoder_init_(false) {
+    if (!MATERIALIZED) {
+      // We're not materializing any values, just counting them. No need (or ability) to
+      // initialize state used to materialize values.
+      DCHECK(slot_desc_ == NULL);
+      return;
+    }
+
+    DCHECK(slot_desc_ != NULL);
+    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
+    if (slot_desc_->type().type == TYPE_DECIMAL) {
+      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
+    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+      fixed_len_size_ = slot_desc_->type().len;
+    } else {
+      fixed_len_size_ = -1;
+    }
+    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
+        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
+        // values. Currently all versions have converted values.
+        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+        slot_desc_->type().type == TYPE_TIMESTAMP &&
+        parent->file_version_.application == "parquet-mr");
+  }
+
+  virtual ~ScalarColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<true>(pool, tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<false>(pool, tuple);
+  }
+
+  virtual bool NeedsSeedingForBatchedReading() const { return false; }
+
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
+
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
+
+ protected:
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
+    // NextLevels() should have already been called and def and rep levels should be in
+    // valid range.
+    DCHECK_GE(rep_level_, 0);
+    DCHECK_LE(rep_level_, max_rep_level());
+    DCHECK_GE(def_level_, 0);
+    DCHECK_LE(def_level_, max_def_level());
+    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+        "Caller should have called NextLevels() until we are ready to read a 
value";
+
+    if (MATERIALIZED) {
+      if (def_level_ >= max_def_level()) {
+        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        } else {
+          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        }
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  /// Implementation of the ReadValueBatch() functions specialized for this
+  /// column reader type. This function drives the reading of data pages and
+  /// caching of rep/def levels. Once a data page and cached levels are available,
+  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
+  /// value materialization using the level caches.
+  template<bool IN_COLLECTION>
+  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    // Repetition level is only present if this column is nested in a collection type.
+    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+
+    int val_count = 0;
+    bool continue_execution = true;
+    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      // Read next page if necessary.
+      if (num_buffered_values_ == 0) {
+        if (!NextPage()) {
+          continue_execution = parent_->parse_status_.ok();
+          continue;
+        }
+      }
+
+      // Fill def/rep level caches if they are empty.
+      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
+      }
+      // We only need the repetition levels for populating the position slot since we
+      // are only populating top-level tuples.
+      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
+      }
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+
+      // This special case is most efficiently handled here directly.
+      if (!MATERIALIZED && !IN_COLLECTION) {
+        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        val_count += vals_to_add;
+        def_levels_.CacheSkipLevels(vals_to_add);
+        num_buffered_values_ -= vals_to_add;
+        continue;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int cache_start_idx = def_levels_.CacheCurrIdx();
+      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+      int remaining_val_capacity = max_values - val_count;
+      int ret_val_count = 0;
+      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      } else {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      }
+      val_count += ret_val_count;
+      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+    }
+    *num_values = val_count;
+    return continue_execution;
+  }
+
+  /// Helper function for ReadValueBatch() above that performs value materialization.
+  /// It assumes a data page with remaining values is available, and that the def/rep
+  /// level caches have been populated.
+  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
+  /// handled in this function.
+  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
+  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    DCHECK(MATERIALIZED || IN_COLLECTION);
+    DCHECK_GT(num_buffered_values_, 0);
+    DCHECK(def_levels_.CacheHasNext());
+    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
+
+    uint8_t* curr_tuple = tuple_mem;
+    int val_count = 0;
+    while (def_levels_.CacheHasNext()) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      int def_level = def_levels_.CacheGetNext();
+
+      if (IN_COLLECTION) {
+        if (def_level < def_level_of_immediate_repeated_ancestor()) {
+          // A containing repeated field is empty or NULL. Skip the value but
+          // move to the next repetition level if necessary.
+          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
+          continue;
+        }
+        if (pos_slot_desc_ != NULL) {
+          int rep_level = rep_levels_.CacheGetNext();
+          // Reset position counter if we are at the start of a new parent collection.
+          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
+        }
+      }
+
+      if (MATERIALIZED) {
+        if (def_level >= max_def_level()) {
+          bool continue_execution =
+              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
+          if (UNLIKELY(!continue_execution)) return false;
+        } else {
+          tuple->SetNull(null_indicator_offset_);
+        }
+      }
+
+      curr_tuple += tuple_size;
+      ++val_count;
+      if (UNLIKELY(val_count == max_values)) break;
+    }
+    *num_values = val_count;
+    return true;
+  }
+
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(), "could not decode dictionary");
+    }
+    dict_decoder_init_ = true;
+    *decoder = &dict_decoder_;
+    return Status::OK();
+  }
+
+  virtual bool HasDictionaryDecoder() {
+    return dict_decoder_init_;
+  }
+
+  virtual void ClearDictionaryDecoder() {
+    dict_decoder_init_ = false;
+  }
+
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    page_encoding_ = current_page_header_.data_page_header.encoding;
+    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+        page_encoding_ != parquet::Encoding::PLAIN) {
+      stringstream ss;
+      ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
+         << PrintEncoding(page_encoding_) << " for data page of column '"
+         << schema_element().name << "'.";
+      return Status(ss.str());
+    }
+
+    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
+    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
+      if (!dict_decoder_init_) {
+        return Status("File corrupt. Missing dictionary page.");
+      }
+      dict_decoder_.SetData(data, size);
+    }
+
+    // TODO: Perform filter selectivity checks here.
+    return Status::OK();
+  }
+
+ private:
+  /// Writes the next value into *slot using pool if necessary.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template<bool IS_DICT_ENCODED>
+  inline bool ReadSlot(void* slot, MemPool* pool) {
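+    // If a conversion is needed, decode into a temporary and convert into the slot;
+    // otherwise decode directly into the slot to avoid an extra copy.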
+    T val;
+    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
+    if (IS_DICT_ENCODED) {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
+      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
+        SetDictDecodeError();
+        return false;
+      }
+    } else {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
+      int encoded_len =
+          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      if (UNLIKELY(encoded_len < 0)) {
+        SetPlainDecodeError();
+        return false;
+      }
+      data_ += encoded_len;
+    }
+    if (UNLIKELY(NeedsConversion() &&
+            !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
+      return false;
+    }
+    return true;
+  }
+
+  /// Most column readers never require conversion, so we can avoid branches by
+  /// returning constant false. Column readers for types that require conversion
+  /// must specialize this function.
+  inline bool NeedsConversion() const {
+    DCHECK(!needs_conversion_);
+    return false;
+  }
+
+  /// Converts and writes src into dst based on desc_->type()
+  bool ConvertSlot(const T* src, T* dst, MemPool* pool) {
+    DCHECK(false);
+    return false;
+  }
+
+  /// Pull out slow-path Status construction code from ReadSlot() for performance.
+  void __attribute__((noinline)) SetDictDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+  void __attribute__((noinline)) SetPlainDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+
+  /// Dictionary decoder for decoding column values.
+  DictDecoder<T> dict_decoder_;
+
+  /// True if dict_decoder_ has been initialized with a dictionary page.
+  bool dict_decoder_init_;
+
+  /// true if decoded values must be converted before being written to an output tuple.
+  bool needs_conversion_;
+
+  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
+  /// the max length for VARCHAR columns. Unused otherwise.
+  int fixed_len_size_;
+};
+
+template<>
+inline bool ScalarColumnReader<StringValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+template<>
+bool ScalarColumnReader<StringValue, true>::ConvertSlot(
+    const StringValue* src, StringValue* dst, MemPool* pool) {
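+  // Convert a string to CHAR(n): copy at most n bytes and pad with spaces. Variable-length
+  // CHAR slots are allocated from 'pool'; fixed-length CHAR data is written in place.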
+  DCHECK(slot_desc() != NULL);
+  DCHECK(slot_desc()->type().type == TYPE_CHAR);
+  int len = slot_desc()->type().len;
+  StringValue sv;
+  sv.len = len;
+  if (slot_desc()->type().IsVarLenStringType()) {
+    sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
+    if (UNLIKELY(sv.ptr == NULL)) {
+      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
+          len, "StringValue");
+      parent_->parse_status_ =
+          pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
+      return false;
+    }
+  } else {
+    sv.ptr = reinterpret_cast<char*>(dst);
+  }
+  int unpadded_len = min(len, src->len);
+  memcpy(sv.ptr, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
+
+  if (slot_desc()->type().IsVarLenStringType()) *dst = sv;
+  return true;
+}
+
+template<>
+inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+template<>
+bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
+    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+  *dst = *src;
+  if (dst->HasDateAndTime()) dst->UtcToLocal();
+  return true;
+}
+
+class BoolColumnReader : public BaseScalarColumnReader {
+ public:
+  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : BaseScalarColumnReader(parent, node, slot_desc) {
+    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+  }
+
+  virtual ~BoolColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<true>(pool, tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<false>(pool, tuple);
+  }
+
+ protected:
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    DCHECK(false) << "Dictionary encoding is not supported for bools. Should 
never "
+                  << "have gotten this far.";
+    return Status::OK();
+  }
+
+  virtual bool HasDictionaryDecoder() {
+    // Decoder should never be created for bools.
+    return false;
+  }
+
+  virtual void ClearDictionaryDecoder() { }
+
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    // Initialize bool decoder
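+    // PLAIN-encoded booleans are bit-packed, one bit per value.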
+    bool_values_ = BitReader(data, size);
+    return Status::OK();
+  }
+
+ private:
+  template<bool IN_COLLECTION>
+  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
+    DCHECK(slot_desc_ != NULL);
+    // Def and rep levels should be in valid range.
+    DCHECK_GE(rep_level_, 0);
+    DCHECK_LE(rep_level_, max_rep_level());
+    DCHECK_GE(def_level_, 0);
+    DCHECK_LE(def_level_, max_def_level());
+    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+        "Caller should have called NextLevels() until we are ready to read a 
value";
+
+    if (def_level_ >= max_def_level()) {
+      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
+    } else {
+      // Null value
+      tuple->SetNull(null_indicator_offset_);
+      return NextLevels<IN_COLLECTION>();
+    }
+  }
+
+  /// Writes the next value into *slot using pool if necessary. Also advances def_level_
+  /// and rep_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool)  {
+    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
+      parent_->parse_status_ = Status("Invalid bool column.");
+      return false;
+    }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  BitReader bool_values_;
+};
+
+bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
+    int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
+      // A containing repeated field is empty or NULL
+      continue_execution = NextLevels();
+      continue;
+    }
+    // Fill in position slot if applicable
+    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+    continue_execution = ReadValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
+    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    continue_execution = ReadNonRepeatedValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+void ParquetColumnReader::ReadPosition(Tuple* tuple) {
+  DCHECK(pos_slot_desc() != NULL);
+  // NextLevels() should have already been called
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(pos_current_value_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a 
value";
+
+  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+}
+
+// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
+// if this matches those versions and compatibility workarounds need to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+    const ParquetFileVersion& v) {
+  if (v.application != "impala") return false;
+  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+Status BaseScalarColumnReader::ReadDataPage() {
+  Status status;
+  uint8_t* buffer;
+
+  // We're about to move to the next data page.  The previous data page is
+  // now complete, pass along the memory allocated for it.
+  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
+
+  // Read the next data page, skipping page types we don't care about.
+  // We break out of this loop on the non-error case (a data page was found or we read all
+  // the pages).
+  while (true) {
+    DCHECK_EQ(num_buffered_values_, 0);
+    if (num_values_read_ == metadata_->num_values) {
+      // No more pages to read
+      // TODO: should we check for stream_->eosr()?
+      break;
+    } else if (num_values_read_ > metadata_->num_values) {
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    int64_t buffer_size;
+    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+    if (buffer_size == 0) {
+      // The data pages contain fewer values than stated in the column metadata.
+      DCHECK(stream_->eosr());
+      DCHECK_LT(num_values_read_, metadata_->num_values);
+      // TODO for 2.3: node_.element->name isn't necessarily useful
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    // We don't know the actual header size until the thrift object is deserialized.  Loop
+    // until we successfully deserialize the header or exceed the maximum header size.
+    uint32_t header_size;
+    while (true) {
+      header_size = buffer_size;
+      status = DeserializeThriftMsg(
+          buffer, &header_size, true, &current_page_header_);
+      if (status.ok()) break;
+
+      if (buffer_size >= FLAGS_max_page_header_size) {
+        stringstream ss;
+        ss << "ParquetScanner: could not read data page because page header 
exceeded "
+           << "maximum size of "
+           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+        status.AddDetail(ss.str());
+        return status;
+      }
+
+      // Didn't read entire header, increase buffer size and try again
+      Status status;
+      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+      bool success = stream_->GetBytes(
+          new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+      if (!success) {
+        DCHECK(!status.ok());
+        return status;
+      }
+      DCHECK(status.ok());
+
+      if (buffer_size == new_buffer_size) {
+        DCHECK_NE(new_buffer_size, 0);
+        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+      }
+      DCHECK_GT(new_buffer_size, buffer_size);
+      buffer_size = new_buffer_size;
+    }
+
+    // Successfully deserialized current_page_header_
+    if (!stream_->SkipBytes(header_size, &status)) return status;
+
+    int data_size = current_page_header_.compressed_page_size;
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+
+    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
+      if (slot_desc_ == NULL) {
+        // Skip processing the dictionary page if we don't need to decode any values. In
+        // addition to being unnecessary, we are likely unable to successfully decode the
+        // dictionary values because we don't necessarily create the right type of scalar
+        // reader if there's no slot to read into (see CreateReader()).
+        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+        continue;
+      }
+
+      if (HasDictionaryDecoder()) {
+        return Status("Column chunk should not contain two dictionary pages.");
+      }
+      if (node_.element->type == parquet::Type::BOOLEAN) {
+        return Status("Unexpected dictionary page. Dictionary page is not"
+            " supported for booleans.");
+      }
+      const parquet::DictionaryPageHeader* dict_header = NULL;
+      if (current_page_header_.__isset.dictionary_page_header) {
+        dict_header = &current_page_header_.dictionary_page_header;
+      } else {
+        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+          return Status("Dictionary page does not have dictionary header 
set.");
+        }
+      }
+      if (dict_header != NULL &&
+          dict_header->encoding != parquet::Encoding::PLAIN &&
+          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported 
"
+            "for dictionary pages.");
+      }
+
+      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+      data_end_ = data_ + data_size;
+
+      uint8_t* dict_values = NULL;
+      if (decompressor_.get() != NULL) {
+        dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              uncompressed_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, uncompressed_size);
+        }
+        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
+            &uncompressed_size, &dict_values));
+        VLOG_FILE << "Decompressed " << data_size << " to " << 
uncompressed_size;
+        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+          return Status(Substitute("Error decompressing dictionary page in 
file '$0'. "
+              "Expected $1 uncompressed bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, uncompressed_size));
+        }
+        data_size = uncompressed_size;
+      } else {
+        if (current_page_header_.uncompressed_page_size != data_size) {
+          return Status(Substitute("Error reading dictionary page in file 
'$0'. "
+              "Expected $1 bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, data_size));
+        }
+        // Copy dictionary from io buffer (which will be recycled as we read
+        // more data) to a new buffer
+        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              data_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, data_size);
+        }
+        memcpy(dict_values, data_, data_size);
+      }
+
+      DictDecoderBase* dict_decoder;
+      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
+      if (dict_header != NULL &&
+          dict_header->num_values != dict_decoder->num_entries()) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(),
+            Substitute("Expected $0 entries but data contained $1 entries",
+            dict_header->num_values, dict_decoder->num_entries()));
+      }
+      // Done with dictionary page, read next page
+      continue;
+    }
+
+    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
+      // We can safely skip non-data pages
+      if (!stream_->SkipBytes(data_size, &status)) return status;
+      continue;
+    }
+
+    // Read Data Page
+    // TODO: when we start using page statistics, we will need to ignore certain corrupt
+    // statistics. See IMPALA-2208 and PARQUET-251.
+    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+    data_end_ = data_ + data_size;
+    num_buffered_values_ = current_page_header_.data_page_header.num_values;
+    num_values_read_ += num_buffered_values_;
+
+    if (decompressor_.get() != NULL) {
+      SCOPED_TIMER(parent_->decompress_timer_);
+      uint8_t* decompressed_buffer =
+          decompressed_data_pool_->TryAllocate(uncompressed_size);
+      if (UNLIKELY(decompressed_buffer == NULL)) {
+        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+            uncompressed_size, "decompressed data");
+        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
+            parent_->state_, details, uncompressed_size);
+      }
+      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+          current_page_header_.compressed_page_size, data_, &uncompressed_size,
+          &decompressed_buffer));
+      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
+                << " to " << uncompressed_size;
+      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error decompressing data page in file '$0'. "
+            "Expected $1 uncompressed bytes but got $2", filename(),
+            current_page_header_.uncompressed_page_size, uncompressed_size));
+      }
+      data_ = decompressed_buffer;
+      data_size = current_page_header_.uncompressed_page_size;
+      data_end_ = data_ + data_size;
+    } else {
+      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
+      if (current_page_header_.compressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error reading data page in file '$0'. "
+            "Expected $1 bytes but got $2", filename(),
+            current_page_header_.compressed_page_size, uncompressed_size));
+      }
+    }
+
+    // Initialize the repetition level data
+    RETURN_IF_ERROR(rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_rep_level(), num_buffered_values_,
+        &data_, &data_size));
+
+    // Initialize the definition level data
+    RETURN_IF_ERROR(def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_def_level(), num_buffered_values_, &data_, &data_size));
+
+    // Data can be empty if the column contains all NULLs
+    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    break;
+  }
+
+  return Status::OK();
+}
+
+template <bool ADVANCE_REP_LEVEL>
+bool BaseScalarColumnReader::NextLevels() {
+  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << 
slot_desc()->DebugString();
+
+  if (UNLIKELY(num_buffered_values_ == 0)) {
+    if (!NextPage()) return parent_->parse_status_.ok();
+  }
+  --num_buffered_values_;
+
+  // The definition level is not present if the column and all containing structs
+  // are required.
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+
+  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
+    // Repetition level is only present if this column is nested in any 
collection type.
+    rep_level_ = rep_levels_.ReadLevel();
+    // Reset position counter if we are at the start of a new parent 
collection.
+    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
+  }
+
+  return parent_->parse_status_.ok();
+}
+
+bool BaseScalarColumnReader::NextPage() {
+  parent_->assemble_rows_timer_.Stop();
+  parent_->parse_status_ = ReadDataPage();
+  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+  if (num_buffered_values_ == 0) {
+    rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+    return false;
+  }
+  parent_->assemble_rows_timer_.Start();
+  return true;
+}
+
+bool CollectionColumnReader::NextLevels() {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+  for (int c = 0; c < children_.size(); ++c) {
+    do {
+      // TODO(skye): verify somewhere that all column readers are at end
+      if (!children_[c]->NextLevels()) return false;
+    } while (children_[c]->rep_level() > new_collection_rep_level());
+  }
+  UpdateDerivedState();
+  return true;
+}
+
+bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a 
value";
+
+  if (tuple_offset_ == -1) {
+    return CollectionColumnReader::NextLevels();
+  } else if (def_level_ >= max_def_level()) {
+    return ReadSlot(tuple->GetSlot(tuple_offset_), pool);
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+    return CollectionColumnReader::NextLevels();
+  }
+}
+
+bool CollectionColumnReader::ReadNonRepeatedValue(
+    MemPool* pool, Tuple* tuple) {
+  return CollectionColumnReader::ReadValue(pool, tuple);
+}
+
+bool CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+
+  // Recursively read the collection into a new CollectionValue.
+  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot);
+  *coll_slot = CollectionValue();
+  CollectionValueBuilder builder(
+      coll_slot, *slot_desc_->collection_item_descriptor(), pool, 
parent_->state_);
+  bool continue_execution = parent_->AssembleCollection(
+      children_, new_collection_rep_level(), &builder);
+  if (!continue_execution) return false;
+
+  // AssembleCollection() advances child readers, so we don't need to call 
NextLevels()
+  UpdateDerivedState();
+  return true;
+}
+
+void CollectionColumnReader::UpdateDerivedState() {
+  // We don't need to cap our def_level_ at max_def_level(). We always compare
+  // def_level_ >= max_def_level() to check whether the collection is defined.
+  // TODO(skye): consider capping def_level_ at max_def_level()
+  def_level_ = children_[0]->def_level();
+  rep_level_ = children_[0]->rep_level();
+
+  // All children should have been advanced to the beginning of the next 
collection
+  for (int i = 0; i < children_.size(); ++i) {
+    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
+    if (def_level_ < max_def_level()) {
+      // Collection not defined
+      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
+    } else {
+      // Collection is defined
+      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
+    }
+  }
+
+  if (RowGroupAtEnd()) {
+    // No more values
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+  } else if (rep_level_ <= max_rep_level() - 2) {
+    // Reset position counter if we are at the start of a new parent 
collection (i.e.,
+    // the current collection is the first item in a new parent collection).
+    pos_current_value_ = 0;
+  }
+}
+
+ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
+    bool is_collection_field, const SlotDescriptor* slot_desc, 
HdfsParquetScanner* parent) {
+  ParquetColumnReader* reader = NULL;
+  if (is_collection_field) {
+    // Create collection reader (note this handles both NULL and non-NULL 
'slot_desc')
+    reader = new CollectionColumnReader(parent, node, slot_desc);
+  } else if (slot_desc != NULL) {
+    // Create the appropriate ScalarColumnReader type to read values into 
'slot_desc'
+    switch (slot_desc->type().type) {
+      case TYPE_BOOLEAN:
+        reader = new BoolColumnReader(parent, node, slot_desc);
+        break;
+      case TYPE_TINYINT:
+        reader = new ScalarColumnReader<int8_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_SMALLINT:
+        reader = new ScalarColumnReader<int16_t, true>(parent, node, 
slot_desc);
+        break;
+      case TYPE_INT:
+        reader = new ScalarColumnReader<int32_t, true>(parent, node, 
slot_desc);
+        break;
+      case TYPE_BIGINT:
+        reader = new ScalarColumnReader<int64_t, true>(parent, node, 
slot_desc);
+        break;
+      case TYPE_FLOAT:
+        reader = new ScalarColumnReader<float, true>(parent, node, slot_desc);
+        break;
+      case TYPE_DOUBLE:
+        reader = new ScalarColumnReader<double, true>(parent, node, slot_desc);
+        break;
+      case TYPE_TIMESTAMP:
+        reader = new ScalarColumnReader<TimestampValue, true>(parent, node, 
slot_desc);
+        break;
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+      case TYPE_CHAR:
+        reader = new ScalarColumnReader<StringValue, true>(parent, node, 
slot_desc);
+        break;
+      case TYPE_DECIMAL:
+        switch (slot_desc->type().GetByteSize()) {
+          case 4:
+            reader = new ScalarColumnReader<Decimal4Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 8:
+            reader = new ScalarColumnReader<Decimal8Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 16:
+            reader = new ScalarColumnReader<Decimal16Value, true>(
+                parent, node, slot_desc);
+            break;
+        }
+        break;
+      default:
+        DCHECK(false) << slot_desc->type().DebugString();
+    }
+  } else {
+    // Special case for counting scalar values (e.g. count(*), no materialized 
columns in
+    // the file, only materializing a position slot). We won't actually read 
any values,
+    // only the rep and def levels, so it doesn't matter what kind of reader 
we make.
+    reader = new ScalarColumnReader<int8_t, false>(parent, node, slot_desc);
+  }
+  return parent->obj_pool_.Add(reader);
+}
+
+}
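
The column reader interface implemented above follows a seed-then-read protocol:
NextLevels() positions the reader on its first def/rep level pair, after which
ReadValue() both materializes the current value and advances the reader itself. The
sketch below (editorial illustration, not part of this patch) shows roughly how a
caller could consume one scalar column across a row group under that contract; the
helper name and the single-tuple simplification are illustrative only.

    // Sketch only, assuming 'reader' was Reset() for the current row group.
    static bool MaterializeColumn(ParquetColumnReader* reader, Tuple* tuple, MemPool* pool) {
      if (!reader->NextLevels()) return false;  // seed the first def/rep level pair
      while (!reader->RowGroupAtEnd()) {
        if (reader->def_level() >= reader->def_level_of_immediate_repeated_ancestor()) {
          // A value (or an explicit NULL) exists at this position; ReadValue() writes
          // it into 'tuple' and advances to the next def/rep level pair on its own.
          if (!reader->ReadValue(pool, tuple)) return false;
        } else {
          // An empty or NULL enclosing collection produced no value; just advance.
          if (!reader->NextLevels()) return false;
        }
      }
      return true;
    }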

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h 
b/be/src/exec/parquet-column-readers.h
new file mode 100644
index 0000000..3c26084
--- /dev/null
+++ b/be/src/exec/parquet-column-readers.h
@@ -0,0 +1,500 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_PARQUET_COLUMN_READERS_H
+#define IMPALA_PARQUET_COLUMN_READERS_H
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "util/codec.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+namespace impala {
+
+class Tuple;
+class MemPool;
+
+/// Decoder for all supported Parquet level encodings. Optionally reads, 
decodes, and
+/// caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum 
nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and 
speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated 
values).
+///
+/// Inherits from RleDecoder instead of containing one for performance reasons.
+/// The containment design would require two BitReaders per column reader. The 
extra
+/// BitReader causes enough bloat for a column reader to require another cache 
line.
+/// TODO: It is not clear whether the inheritance vs. containment choice still 
makes
+/// sense with column-wise materialization. The containment design seems 
cleaner and
+/// we should revisit.
+class ParquetLevelDecoder : public RleDecoder {
+ public:
+  ParquetLevelDecoder(bool is_def_level_decoder)
+    : cached_levels_(NULL),
+      num_cached_levels_(0),
+      cached_level_idx_(0),
+      encoding_(parquet::Encoding::PLAIN),
+      max_level_(0),
+      cache_size_(0),
+      num_buffered_values_(0),
+      decoding_error_code_(is_def_level_decoder ?
+          TErrorCode::PARQUET_DEF_LEVEL_ERROR : 
TErrorCode::PARQUET_REP_LEVEL_ERROR) {
+  }
+
+  /// Initialize the LevelDecoder. Reads and advances the provided data buffer 
if the
+  /// encoding requires reading metadata from the page header.
+  Status Init(const string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, int 
num_buffered_values,
+      uint8_t** data, int* data_size);
+
+  /// Returns the next level or INVALID_LEVEL if there was an error.
+  inline int16_t ReadLevel();
+
+  /// Decodes and caches the next batch of levels. Resets members associated 
with the
+  /// cache. Returns a non-ok status if there was a problem decoding a level, 
or if a
+  /// level was encountered with a value greater than max_level_.
+  Status CacheNextBatch(int batch_size);
+
+  /// Functions for working with the level cache.
+  inline bool CacheHasNext() const { return cached_level_idx_ < 
num_cached_levels_; }
+  inline uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  inline void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  inline int CacheSize() const { return num_cached_levels_; }
+  inline int CacheRemaining() const { return num_cached_levels_ - 
cached_level_idx_; }
+  inline int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Sets the number of
+  /// values written to the cache in *num_cached_levels. Returns false if 
there was
+  /// an error decoding a level or if there was a level value greater than 
max_level_.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool
+  /// passed into Init().
+  uint8_t* cached_levels_;
+  /// Number of valid level values in the cache.
+  int num_cached_levels_;
+  /// Current index into cached_levels_.
+  int cached_level_idx_;
+  parquet::Encoding::type encoding_;
+
+  /// For error checking and reporting.
+  int max_level_;
+  /// Number of level values cached_levels_ has memory allocated for.
+  int cache_size_;
+  /// Number of remaining data values in the current data page.
+  int num_buffered_values_;
+  string filename_;
+  TErrorCode::type decoding_error_code_;
+};
+
+/// Base class for reading a Parquet column. Reads a logical column, not 
necessarily a
+/// column materialized in the file (e.g. collections). The two subclasses are
+/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one 
def and rep
+/// level pair at a time. The current def and rep level are exposed to the 
user, and the
+/// corresponding value (if defined) can optionally be copied into a slot via
+/// ReadValue(). Can also write position slots.
+class ParquetColumnReader {
+ public:
+  /// Creates a column reader for 'node' and associates it with the given 
parent scanner.
+  /// Adds the new column reader to the parent's object pool.
+  /// 'slot_desc' may be NULL, in which case the returned column reader can 
only be used
+  /// to read def/rep levels.
+  /// 'is_collection_field' should be set to true if the returned reader is 
reading a
+  /// collection. This cannot be determined purely by 'node' because a 
repeated scalar
+  /// node represents both an array and the array's items (in this case
+  /// 'is_collection_field' should be true if the reader reads one value per 
array, and
+  /// false if it reads one value per item). Does not create child readers for
+  /// collection readers; these must be added by the caller.
+  static ParquetColumnReader* Create(const SchemaNode& node, bool 
is_collection_field,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
+  virtual ~ParquetColumnReader() { }
+
+  int def_level() const { return def_level_; }
+  int rep_level() const { return rep_level_; }
+
+  const SlotDescriptor* slot_desc() const { return slot_desc_; }
+  const parquet::SchemaElement& schema_element() const { return 
*node_.element; }
+  int16_t max_def_level() const { return max_def_level_; }
+  int16_t max_rep_level() const { return max_rep_level_; }
+  int def_level_of_immediate_repeated_ancestor() const {
+    return node_.def_level_of_immediate_repeated_ancestor;
+  }
+  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
+  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
+    DCHECK(pos_slot_desc_ == NULL);
+    pos_slot_desc_ = pos_slot_desc;
+  }
+
+  /// Returns true if this reader materializes collections (i.e. 
CollectionValues).
+  virtual bool IsCollectionReader() const { return false; }
+
+  const char* filename() const { return parent_->filename(); }
+
+  /// Read the current value (or null) into 'tuple' for this column. This 
should only be
+  /// called when a value is defined, i.e., def_level() >=
+  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL 
collections produce
+  /// no output values), otherwise NextLevels() should be called instead.
+  ///
+  /// Advances this column reader to the next value (i.e. NextLevels() doesn't 
need to be
+  /// called after calling ReadValue()).
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() 
for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// TODO: this is the function that needs to be codegen'd (e.g. 
CodegenReadValue())
+  /// The codegened functions from all the materialized cols will then be 
combined
+  /// into one function.
+  /// TODO: another option is to materialize col by col for the entire row 
batch in
+  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row 
batches
+  /// are currently dense so we'll need to figure out something there.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Same as ReadValue() but does not advance repetition level. Only valid 
for columns
+  /// not in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Returns true if this reader needs to be seeded with NextLevels() before
+  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
+  /// Note that all readers need to be seeded before calling the non-batched 
ReadValue().
+  virtual bool NeedsSeedingForBatchedReading() const { return true; }
+
+  /// Batched version of ReadValue() that reads up to max_values at once and 
materializes
+  /// them into tuples in tuple_mem. Returns the number of values actually 
materialized
+  /// in *num_values. The return value, error behavior and state changes are 
generally
+  /// the same as in ReadValue(). For example, if an error occurs in the 
middle of
+  /// materializing a batch then false is returned, and num_values, tuple_mem, 
as well as
+  /// this column reader are left in an undefined state, assuming that the 
caller will
+  /// immediately abort execution.
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at 
once and
+  /// materializes them into tuples in tuple_mem.
+  /// The return value and error behavior are the same as in ReadValueBatch().
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int 
tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Advances this column reader's def and rep levels to the next logical 
value, i.e. to
+  /// the next scalar value or the beginning of the next collection, without 
attempting to
+  /// read the value. This is used to skip past def/rep levels that don't 
materialize a
+  /// value, such as the def/rep levels corresponding to an empty containing 
collection.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() 
for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  virtual bool NextLevels() = 0;
+
+  /// Should only be called if pos_slot_desc_ is non-NULL. Writes 
pos_current_value_ to
+  /// 'tuple' (i.e. "reads" the synthetic position field of the parent 
collection into
+  /// 'tuple') and increments pos_current_value_.
+  void ReadPosition(Tuple* tuple);
+
+  /// Returns true if this column reader has reached the end of the row group.
+  inline bool RowGroupAtEnd() { return rep_level_ == 
HdfsParquetScanner::ROW_GROUP_END; }
+
+ protected:
+  HdfsParquetScanner* parent_;
+  const SchemaNode& node_;
+  const SlotDescriptor* slot_desc_;
+
+  /// The slot descriptor for the position field of the tuple, if there is 
one. NULL if
+  /// there's not. Only one column reader for a given tuple desc will have 
this set.
+  const SlotDescriptor* pos_slot_desc_;
+
+  /// The next value to write into the position slot, if there is one. 64-bit 
int because
+  /// the pos slot is always a BIGINT. Set to -1 when this column reader does 
not have a
+  /// current rep and def level (i.e. before the first NextLevels() call or 
after the last
+  /// value in the column has been read).
+  int64_t pos_current_value_;
+
+  /// The current repetition and definition levels of this reader. Advanced via
+  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not 
have a
+  /// current rep and def level (i.e. before the first NextLevels() call or 
after the last
+  /// value in the column has been read). If this is not inside a collection, 
rep_level_ is
+  /// always 0.
+  /// int16_t is large enough to hold the valid levels 0-255 and sentinel 
value -1.
+  /// The maximum values are cached here because they are accessed in inner 
loops.
+  int16_t rep_level_;
+  int16_t max_rep_level_;
+  int16_t def_level_;
+  int16_t max_def_level_;
+
+  // Cache frequently accessed members of slot_desc_ for perf.
+
+  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
+  int tuple_offset_;
+
+  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
+  NullIndicatorOffset null_indicator_offset_;
+
+  ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : parent_(parent),
+      node_(node),
+      slot_desc_(slot_desc),
+      pos_slot_desc_(NULL),
+      pos_current_value_(HdfsParquetScanner::INVALID_POS),
+      rep_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_rep_level_(node_.max_rep_level),
+      def_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_def_level_(node_.max_def_level),
+      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
+      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
+          slot_desc->null_indicator_offset()) {
+    DCHECK_GE(node_.max_rep_level, 0);
+    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
+    DCHECK_GE(node_.max_def_level, 0);
+    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
+    // rep_level_ is always valid and equal to 0 if col not in collection.
+    if (max_rep_level() == 0) rep_level_ = 0;
+  }
+};
+
+/// Reader for a single column from the parquet file.  It's associated with a
+/// ScannerContext::Stream and is responsible for decoding the data. Superclass for
+/// per-type column readers. This contains most of the logic; the type-specific
+/// functions must be implemented in the subclass.
+class BaseScalarColumnReader : public ParquetColumnReader {
+ public:
+  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc),
+      data_(NULL),
+      data_end_(NULL),
+      def_levels_(true),
+      rep_levels_(false),
+      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
+      num_buffered_values_(0),
+      num_values_read_(0),
+      metadata_(NULL),
+      stream_(NULL),
+      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
+  }
+
+  virtual ~BaseScalarColumnReader() { }
+
+  /// This is called once for each row group in the file.
+  Status Reset(const parquet::ColumnMetaData* metadata, 
ScannerContext::Stream* stream) {
+    DCHECK(stream != NULL);
+    DCHECK(metadata != NULL);
+
+    num_buffered_values_ = 0;
+    data_ = NULL;
+    data_end_ = NULL;
+    stream_ = stream;
+    metadata_ = metadata;
+    num_values_read_ = 0;
+    def_level_ = -1;
+    // See ParquetColumnReader constructor.
+    rep_level_ = max_rep_level() == 0 ? 0 : -1;
+    pos_current_value_ = -1;
+
+    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+      RETURN_IF_ERROR(Codec::CreateDecompressor(
+          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], 
&decompressor_));
+    }
+    ClearDictionaryDecoder();
+    return Status::OK();
+  }
+
+  /// Called once when the scanner is complete for final cleanup.
+  void Close() {
+    if (decompressor_.get() != NULL) decompressor_->Close();
+  }
+
+  int64_t total_len() const { return metadata_->total_compressed_size; }
+  int col_idx() const { return node_.col_idx; }
+  THdfsCompression::type codec() const {
+    if (metadata_ == NULL) return THdfsCompression::NONE;
+    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
+  }
+  MemPool* decompressed_data_pool() const { return 
decompressed_data_pool_.get(); }
+
+  /// Reads the next definition and repetition levels for this column. 
Initializes the
+  /// next data page if necessary.
+  virtual bool NextLevels() { return NextLevels<true>(); }
+
+  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) 
if
+  // we know this row can be skipped. This could be very useful with stats, where
+  // big sections can be skipped. Implement that when we can benefit from it.
+
+ protected:
+  // Friend parent scanner so it can perform validation (e.g. 
ValidateEndOfRowGroup())
+  friend class HdfsParquetScanner;
+
+  // Class members that are accessed for every column should be included up 
here so they
+  // fit in as few cache lines as possible.
+
+  /// Pointer to start of next value in data page
+  uint8_t* data_;
+
+  /// End of the data page.
+  const uint8_t* data_end_;
+
+  /// Decoder for definition levels.
+  ParquetLevelDecoder def_levels_;
+
+  /// Decoder for repetition levels.
+  ParquetLevelDecoder rep_levels_;
+
+  /// Page encoding for values. Cached here for perf.
+  parquet::Encoding::type page_encoding_;
+
+  /// Num values remaining in the current data page
+  int num_buffered_values_;
+
+  // Less frequently used members that are not accessed in inner loop should 
go below
+  // here so they do not occupy precious cache line space.
+
+  /// The number of values seen so far. Updated per data page.
+  int64_t num_values_read_;
+
+  const parquet::ColumnMetaData* metadata_;
+  boost::scoped_ptr<Codec> decompressor_;
+  ScannerContext::Stream* stream_;
+
+  /// Pool to allocate decompression buffers from.
+  boost::scoped_ptr<MemPool> decompressed_data_pool_;
+
+  /// Header for current data page.
+  parquet::PageHeader current_page_header_;
+
+  /// Read the next data page. If a dictionary page is encountered, that will 
be read and
+  /// this function will continue reading the next data page.
+  Status ReadDataPage();
+
+  /// Try to move to the next page and buffer more values. Returns false and 
sets rep_level_,
+  /// def_level_ and pos_current_value_ to -1 if no more pages or an error 
encountered.
+  bool NextPage();
+
+  /// Implementation for NextLevels().
+  template <bool ADVANCE_REP_LEVEL>
+  bool NextLevels();
+
+  /// Creates a dictionary decoder from values/size. 'decoder' is set to point 
to a
+  /// dictionary decoder stored in this object. Subclass must implement this. 
Returns
+  /// an error status if the dictionary values could not be decoded 
successfully.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) = 0;
+
+  /// Return true if the column has an initialized dictionary decoder. 
Subclass must
+  /// implement this.
+  virtual bool HasDictionaryDecoder() = 0;
+
+  /// Clear the dictionary decoder so HasDictionaryDecoder() will return 
false. Subclass
+  /// must implement this.
+  virtual void ClearDictionaryDecoder() = 0;
+
+  /// Initializes the reader with the data contents. This is the content for 
the entire
+  /// decompressed data page. Decoders can initialize state from here.
+  virtual Status InitDataPage(uint8_t* data, int size) = 0;
+
+ private:
+  /// Writes the next value into *slot using pool if necessary. Also advances 
rep_level_
+  /// and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+/// Collections are not materialized directly in parquet files; only scalar 
values appear
+/// in the file. CollectionColumnReader uses the definition and repetition 
levels of child
+/// column readers to figure out the boundaries of each collection in this 
column.
+class CollectionColumnReader : public ParquetColumnReader {
+ public:
+  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc) {
+    DCHECK(node_.is_repeated());
+    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
+  }
+
+  virtual ~CollectionColumnReader() { }
+
+  vector<ParquetColumnReader*>* children() { return &children_; }
+
+  virtual bool IsCollectionReader() const { return true; }
+
+  /// The repetition level indicating that the current value is the first in a 
new
+  /// collection (meaning the last value read was the final item in the 
previous
+  /// collection).
+  int new_collection_rep_level() const { return max_rep_level() - 1; }
+
+  /// Materializes CollectionValue into tuple slot (if materializing) and 
advances to next
+  /// value.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
+
+  /// Same as ReadValue but does not advance repetition level. Only valid for 
columns not
+  /// in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
+
+  /// Advances all child readers to the beginning of the next collection and 
updates this
+  /// reader's state.
+  virtual bool NextLevels();
+
+  /// This is called once for each row group in the file.
+  void Reset() {
+    def_level_ = -1;
+    rep_level_ = -1;
+    pos_current_value_ = -1;
+  }
+
+ private:
+  /// Column readers of fields contained within this collection. There is at 
least one
+  /// child reader per collection reader. Child readers either materialize 
slots in the
+  /// collection item tuples, or there is a single child reader that does not 
materialize
+  /// any slot and is only used by this reader to read def and rep levels.
+  vector<ParquetColumnReader*> children_;
+
+  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ 
based on child
+  /// reader's state.
+  void UpdateDerivedState();
+
+  /// Recursively reads from children_ to assemble a single CollectionValue 
into
+  /// *slot. Also advances rep_level_ and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+}
+
+#endif
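
For context on the level-cache API declared above, the following sketch (editorial,
not code from this change) shows the batched pattern ParquetLevelDecoder is built
for: decode a batch of levels into the cache, then consume them one by one. The
function name and parameters are hypothetical, and the decoder is assumed to have
already been Init()'d against the current data page.

    // Sketch only: consuming cached definition levels in batches.
    Status ConsumeDefLevels(ParquetLevelDecoder* def_levels, int batch_size,
        int max_def_level) {
      RETURN_IF_ERROR(def_levels->CacheNextBatch(batch_size));
      while (def_levels->CacheHasNext()) {
        int level = def_levels->CacheGetNext();
        if (level >= max_def_level) {
          // The value at this position is defined; a real caller would materialize
          // the corresponding slot here.
        } else {
          // NULL value (or a position inside an empty/NULL collection); a real
          // caller would set the tuple's null indicator instead.
        }
      }
      return Status::OK();
    }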
