http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc 
b/be/src/exec/parquet-column-readers.cc
deleted file mode 100644
index aec1265..0000000
--- a/be/src/exec/parquet-column-readers.cc
+++ /dev/null
@@ -1,1931 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet-column-readers.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <string>
-#include <sstream>
-#include <gflags/gflags.h>
-#include <gutil/strings/substitute.h>
-
-#include "exec/hdfs-parquet-scanner.h"
-#include "exec/parquet-metadata-utils.h"
-#include "exec/parquet-scratch-tuple-batch.h"
-#include "exec/read-write-util.h"
-#include "exec/scanner-context.inline.h"
-#include "rpc/thrift-util.h"
-#include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
-#include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
-#include "runtime/tuple-row.h"
-#include "runtime/tuple.h"
-#include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
-#include "util/bit-util.h"
-#include "util/codec.h"
-#include "util/debug-util.h"
-#include "util/dict-encoding.h"
-#include "util/rle-encoding.h"
-
-#include "common/names.h"
-
-// Provide a workaround for IMPALA-1658.
-// Boolean startup flag that defaults to false; its help text below describes the effect.
-DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
-    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
-    "be converted from UTC to local time. Writes are unaffected.");
-
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
-// Max data page header size in bytes. This is an estimate and only needs to be an upper
-// bound. It is theoretically possible to have a page header of any size due to string
-// value statistics, but in practice we'll have trouble reading string values this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet files.
-// The default is 8*1024*1024 bytes.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-
-// Trigger debug action on every other call of Read*ValueBatch() once at least 128
-// tuples have been produced to simulate failure such as exceeding memory limit.
-// Triggering it every other call so as not to always fail on the first column reader
-// when materializing multiple columns. Failing on non-empty row batch tests proper
-// resources freeing by the Parquet scanner.
-//
-// The macro argument is parenthesized so that a compound expression passed by a caller
-// (e.g. 'a & mask' or 'cond ? x : y') is compared against the threshold as a whole
-// instead of being re-associated with the surrounding '&&' / '>='.
-// In release builds (NDEBUG defined) the macro is the constant 'false' and its argument
-// is not evaluated.
-#ifndef NDEBUG
-static int debug_count = 0;
-#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
-    ((debug_count++ % 2) == 1 && (num_tuples) >= 128)
-#else
-#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
-#endif
-
-using namespace impala::io;
-
-using parquet::Encoding;
-
-namespace impala {
-
-// Message template for allocation failures in column readers. The $0/$1/$2
-// placeholders are, per the wording of the text itself, the failing method name, the
-// number of bytes requested and a description of what the memory was for.
-// NOTE(review): presumably filled in via Substitute() by callers outside this excerpt.
-const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
-    "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
-
-/// Prepares this decoder to read the levels stored at the start of a data page.
-///
-/// 'filename' is only used for error messages. 'encoding' is the level encoding declared
-/// by the page; only RLE is supported. 'cache_size' is the requested number of cached
-/// levels and is rounded with BitUtil::RoundUpToPowerOf2(cache_size, 32) before the
-/// cache is (re)initialized from 'cache_pool'. 'max_level' is the maximum level value;
-/// if it is 0 there is no level data in the page and '*data' / '*data_size' are left
-/// untouched.
-///
-/// On success with RLE levels, '*data' is advanced past the encoded levels and
-/// '*data_size' is reduced accordingly. Returns a non-OK status if the cache cannot be
-/// allocated, the encoded length cannot be read or is out of range, or the encoding is
-/// unsupported.
-Status ParquetLevelDecoder::Init(const string& filename, Encoding::type encoding,
-    MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size) {
-  DCHECK(*data != nullptr);
-  DCHECK_GE(*data_size, 0);
-  DCHECK_GT(cache_size, 0);
-  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
-  encoding_ = encoding;
-  max_level_ = max_level;
-  filename_ = filename;
-  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
-
-  // Return because there is no level data to read, e.g., required field.
-  if (max_level == 0) return Status::OK();
-
-  int32_t num_bytes = 0;
-  switch (encoding) {
-    case Encoding::RLE: {
-      // The RLE-encoded levels are preceded by their length in bytes.
-      // NOTE(review): ReadWriteUtil::Read() presumably advances '*data' and shrinks
-      // '*data_size' by the bytes it consumed - confirm against its declaration.
-      Status status;
-      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
-        return status;
-      }
-      // Reject lengths that are negative or extend past the end of the page. This is the
-      // only place 'num_bytes' needs validating: every other case returns early.
-      if (num_bytes < 0 || num_bytes > *data_size) {
-        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
-      }
-      int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
-      rle_decoder_.Reset(*data, num_bytes, bit_width);
-      break;
-    }
-    case Encoding::BIT_PACKED:
-      return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
-    default: {
-      stringstream ss;
-      ss << "Unsupported encoding: " << encoding;
-      return Status(ss.str());
-    }
-  }
-  // Only the RLE case falls through to here, with 'num_bytes' already range-checked,
-  // so the encoded levels can be skipped without a second check.
-  DCHECK_GE(num_bytes, 0);
-  DCHECK_LE(num_bytes, *data_size);
-  *data += num_bytes;
-  *data_size -= num_bytes;
-  return Status::OK();
-}
-
-/// Resets the level cache to empty and, on first use, allocates and zeroes a buffer of
-/// 'cache_size' bytes from 'pool'. On later calls the existing buffer is reused and
-/// 'cache_size' is expected to match the size it was created with. Returns a
-/// mem-limit-exceeded status if the allocation fails.
-Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
-  cached_level_idx_ = 0;
-  num_cached_levels_ = 0;
-
-  if (cached_levels_ == nullptr) {
-    // First call: the buffer does not exist yet.
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
-    cached_levels_ = buffer;
-    if (buffer == nullptr) {
-      return pool->mem_tracker()->MemLimitExceeded(
-          nullptr, "Definition level cache", cache_size);
-    }
-    memset(buffer, 0, cache_size);
-    cache_size_ = cache_size;
-  } else {
-    // Buffer was allocated by an earlier call; it is reused as-is.
-    DCHECK_EQ(cache_size_, cache_size);
-  }
-  return Status::OK();
-}
-
-/// Returns the next level, refilling the cache first if it is exhausted. Returns
-/// HdfsParquetScanner::INVALID_LEVEL if the refill fails or yields no levels.
-inline int16_t ParquetLevelDecoder::ReadLevel() {
-  if (UNLIKELY(!CacheHasNext())) {
-    const bool refilled = FillCache(cache_size_, &num_cached_levels_);
-    if (UNLIKELY(!refilled)) return HdfsParquetScanner::INVALID_LEVEL;
-    DCHECK_GE(num_cached_levels_, 0);
-    if (UNLIKELY(num_cached_levels_ == 0)) return HdfsParquetScanner::INVALID_LEVEL;
-  }
-  return CacheGetNext();
-}
-
-/// Returns the length of the upcoming run of identical levels, or 0 if cached levels
-/// are still pending or the levels are not RLE-encoded.
-int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
-  const bool run_available = !CacheHasNext() && encoding_ == Encoding::RLE;
-  if (!run_available) return 0;
-  if (max_level_ != 0) return rle_decoder_.NextNumRepeats();
-  // Treat always-zero levels as an infinitely long run of zeroes. Return the maximum
-  // run length allowed by the Parquet standard.
-  return numeric_limits<int32_t>::max();
-}
-
-/// Returns the value of the current repeated run and consumes 'num_to_consume' levels
-/// from it. Must only be called for RLE levels when the cache is empty.
-uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
-  DCHECK(!CacheHasNext());
-  DCHECK_EQ(encoding_, Encoding::RLE);
-  if (max_level_ != 0) return rle_decoder_.GetRepeatedValue(num_to_consume);
-  // Treat always-zero levels as an infinitely long run of zeroes.
-  return 0;
-}
-
-/// Caches the next batch of levels: min('vals_remaining', cache size) of them. Returns
-/// an error status built from 'decoding_error_code_' if fewer levels than requested
-/// could be decoded.
-Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
-  // Fill the cache completely if there are enough values remaining.
-  // Otherwise don't try to read more values than are left.
-  const int num_to_cache = min(vals_remaining, cache_size_);
-
-  if (max_level_ <= 0) {
-    // No levels to read, e.g., because the field is required. The cache was
-    // already initialized with all zeros, so we can hand out those values.
-    DCHECK_EQ(max_level_, 0);
-    cached_level_idx_ = 0;
-    num_cached_levels_ = num_to_cache;
-    return Status::OK();
-  }
-
-  const bool decode_failed = !FillCache(num_to_cache, &num_cached_levels_)
-      || num_cached_levels_ < num_to_cache;
-  if (UNLIKELY(decode_failed)) {
-    return Status(decoding_error_code_, vals_remaining, filename_);
-  }
-  return Status::OK();
-}
-
-/// Decodes up to 'batch_size' levels into the (empty) cache and stores the number
-/// actually cached in '*num_cached_levels'. Returns true if at least one level was
-/// decoded; when 'max_level_' is 0 nothing is decoded and true is always returned.
-bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
-  DCHECK(!CacheHasNext());
-  DCHECK(num_cached_levels != nullptr);
-  DCHECK_GE(max_level_, 0);
-  DCHECK_GE(*num_cached_levels, 0);
-  cached_level_idx_ = 0;
-
-  if (max_level_ != 0) {
-    DCHECK_EQ(encoding_, parquet::Encoding::RLE);
-    *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_);
-    return *num_cached_levels > 0;
-  }
-
-  // No levels to read, e.g., because the field is required. The cache was
-  // already initialized with all zeros, so we can hand out those values.
-  *num_cached_levels = batch_size;
-  return true;
-}
-
-/// Per column type reader. InternalType is the datatype that Impala uses internally to
-/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
-/// in the parquet spec) that is used to store column values in parquet files.
-/// If MATERIALIZED is true, the column values are materialized into the slot described
-/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
-/// the position can be accessed.
-///
-/// The virtual Read*() entry points are thin wrappers that forward to the templated
-/// implementations with IN_COLLECTION fixed at compile time: true for ReadValue() and
-/// ReadValueBatch(), false for the 'NonRepeated' variants. Their 'pool' argument is not
-/// used by this reader.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-class ScalarColumnReader : public BaseScalarColumnReader {
- public:
-  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc);
-  virtual ~ScalarColumnReader() { }
-
-  /// Reads a single value for a column nested in a collection (IN_COLLECTION = true).
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
-    return ReadValue<true>(tuple);
-  }
-
-  /// Reads a single value for a column that is not nested in a collection.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
-    return ReadValue<false>(tuple);
-  }
-
-  /// Batched variant of ReadValue(); see the templated ReadValueBatch() below.
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) override {
-    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
-  }
-
-  /// Batched variant of ReadNonRepeatedValue(); see the templated ReadValueBatch() below.
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) override {
-    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
-  }
-
-  /// Returns the dictionary decoder if it has been initialized with a dictionary page,
-  /// otherwise nullptr.
-  virtual DictDecoderBase* GetDictionaryDecoder() override {
-    return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
-  }
-
-  /// Virtual wrappers around the non-virtual *Inline() versions used on hot paths.
-  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
-  virtual bool NeedsValidation() override { return NeedsValidationInline(); }
-
- protected:
-  /// Reads the value for the current def/rep levels into 'tuple' (or sets the slot to
-  /// NULL) and then advances to the next levels. Returns false if execution should stop.
-  template <bool IN_COLLECTION>
-  inline bool ReadValue(Tuple* tuple);
-
-  /// Implementation of the ReadValueBatch() functions specialized for this
-  /// column reader type. This function drives the reading of data pages and
-  /// caching of rep/def levels. Once a data page and cached levels are available,
-  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
-  /// value materialization using the level caches.
-  /// Use RESTRICT so that the compiler knows that it is safe to cache member
-  /// variables in registers or on the stack (otherwise gcc's alias analysis
-  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
-  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
-  /// -fno-strict-alias).
-  template <bool IN_COLLECTION>
-  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-      int* RESTRICT num_values) RESTRICT;
-
-  /// Helper function for ReadValueBatch() above that performs value materialization.
-  /// It assumes a data page with remaining values is available, and that the def/rep
-  /// level caches have been populated. Materializes values into 'tuple_mem' with a
-  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
-  /// values materialized in 'num_values'.
-  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
-  /// handled in this function.
-  /// Use RESTRICT so that the compiler knows that it is safe to cache member
-  /// variables in registers or on the stack (otherwise gcc's alias analysis
-  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
-  /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
-  /// -fno-strict-alias).
-  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
-  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-      int* RESTRICT num_values) RESTRICT;
-
-  /// Same as above, but dispatches to the appropriate templated implementation of
-  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
-  template <bool IN_COLLECTION>
-  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-      int* RESTRICT num_values) RESTRICT;
-
-  /// Fast path for MaterializeValueBatch() that materializes values for a run of
-  /// repeated definition levels. Read up to 'max_values' values into 'tuple_mem',
-  /// returning the number of values materialised in 'num_values'.
-  bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;
-
-  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
-  /// Dispatches to ReadAndConvertSlots() or ReadSlotsNoConversion() depending on
-  /// NeedsConversionInline().
-  bool ReadSlots(
-      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
-
-  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
-  /// conversion is needed.
-  bool ReadAndConvertSlots(
-      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
-
-  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
-  /// conversion is not needed.
-  bool ReadSlotsNoConversion(
-      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
-
-  /// Read 'num_to_read' position values into a batch of tuples starting at 'tuple_mem'.
-  void ReadPositions(
-      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
-
-  /// Resets 'dict_decoder_' with the 'size' bytes of dictionary data at 'values' and
-  /// returns it in '*decoder'. Returns PARQUET_CORRUPT_DICTIONARY if the dictionary
-  /// cannot be decoded. Dereferences 'slot_desc_' on the error path.
-  virtual Status CreateDictionaryDecoder(
-      uint8_t* values, int size, DictDecoderBase** decoder) override {
-    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
-      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-          slot_desc_->type().DebugString(), "could not decode dictionary");
-    }
-    dict_decoder_init_ = true;
-    *decoder = &dict_decoder_;
-    return Status::OK();
-  }
-
-  /// True once CreateDictionaryDecoder() has succeeded and until
-  /// ClearDictionaryDecoder() is called.
-  virtual bool HasDictionaryDecoder() override {
-    return dict_decoder_init_;
-  }
-
-  /// Marks the dictionary decoder as uninitialized; only the flag is reset here.
-  virtual void ClearDictionaryDecoder() override {
-    dict_decoder_init_ = false;
-  }
-
-  /// Prepares this reader for decoding the values of a new data page whose value bytes
-  /// are the 'size' bytes at 'data'.
-  virtual Status InitDataPage(uint8_t* data, int size) override;
-
- private:
-  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
-  /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
-  /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
-  ///
-  /// Force inlining - GCC does not always inline this into hot loops.
-  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
-  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
-
-  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
-  /// This helper is only used on the batched decoding path where we need to reset
-  /// 'pos_current_value_' to 0 based on 'rep_level'.
-  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
-
-  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
-  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
-  /// error decoding the value.
-  template <Encoding::type ENCODING>
-  inline ALWAYS_INLINE bool DecodeValue(
-      uint8_t** data, const uint8_t* data_end, InternalType* RESTRICT val) RESTRICT;
-
-  /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes. Return
-  /// false and set 'parse_error_' if there is an error decoding any value.
-  inline ALWAYS_INLINE bool DecodeValues(
-      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;
-
-  /// Most column readers never require conversion, so we can avoid branches by
-  /// returning constant false. Column readers for types that require conversion
-  /// must specialize this function.
-  inline bool NeedsConversionInline() const {
-    DCHECK(!needs_conversion_);
-    return false;
-  }
-
-  /// Similar to NeedsConversion(), most column readers do not require validation,
-  /// so to avoid branches, we return constant false. In general, types where not
-  /// all possible bit representations of the data type are valid should be
-  /// validated.
-  inline bool NeedsValidationInline() const {
-    return false;
-  }
-
-  /// Converts and writes 'src' into 'slot' based on desc_->type()
-  /// This generic version must never be called (it DCHECKs); readers whose
-  /// NeedsConversionInline() can return true are expected to specialize it.
-  bool ConvertSlot(const InternalType* src, void* slot) {
-    DCHECK(false);
-    return false;
-  }
-
-  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
-  /// is invalid, logs the error and returns false. If the error should stop execution,
-  /// sets 'parent_->parse_status_'.
-  /// This generic version must never be called (it DCHECKs); readers whose
-  /// NeedsValidationInline() can return true are expected to specialize it.
-  bool ValidateValue(InternalType* val) const {
-    DCHECK(false);
-    return false;
-  }
-
-  /// Pull out slow-path Status construction code
-  /// Records a dictionary-decode failure in 'parent_->parse_status_'.
-  void __attribute__((noinline)) SetDictDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-
-  /// Records a plain-encoding decode failure in 'parent_->parse_status_'.
-  void __attribute__((noinline)) SetPlainDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-
-  /// Dictionary decoder for decoding column values.
-  DictDecoder<InternalType> dict_decoder_;
-
-  /// True if dict_decoder_ has been initialized with a dictionary page.
-  bool dict_decoder_init_ = false;
-
-  /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_ = false;
-
-  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
-  /// the max length for VARCHAR columns. Unused otherwise.
-  int fixed_len_size_;
-
-  /// Contains extra data needed for Timestamp decoding.
-  ParquetTimestampDecoder timestamp_decoder_;
-
-  /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null otherwise.
-  uint8_t* conversion_buffer_ = nullptr;
-};
-
-/// Constructs a reader for the column described by 'node'. When MATERIALIZED is false,
-/// 'slot_desc' must be null and no materialization state is set up. Otherwise
-/// 'slot_desc' must be non-null and not of type BOOLEAN, and the fixed length,
-/// conversion flag and timestamp decoder are derived from the slot type.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
-    HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
-  : BaseScalarColumnReader(parent, node, slot_desc),
-    dict_decoder_(parent->scan_node_->mem_tracker()),
-    // Always give 'fixed_len_size_' a defined value, including on the !MATERIALIZED
-    // early return below. -1 means "no fixed length".
-    fixed_len_size_(-1) {
-  if (!MATERIALIZED) {
-    // We're not materializing any values, just counting them. No need (or ability) to
-    // initialize state used to materialize values.
-    DCHECK(slot_desc_ == nullptr);
-    return;
-  }
-
-  DCHECK(slot_desc_ != nullptr);
-  DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-  if (slot_desc_->type().type == TYPE_DECIMAL
-      && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
-    fixed_len_size_ = node.element->type_length;
-  } else if (slot_desc_->type().type == TYPE_VARCHAR) {
-    fixed_len_size_ = slot_desc_->type().len;
-  }
-
-  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
-
-  if (slot_desc_->type().type == TYPE_TIMESTAMP) {
-    // For timestamps the decoder decides whether conversion is needed, overriding the
-    // value computed above.
-    timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
-    dict_decoder_.SetTimestampHelper(timestamp_decoder_);
-    needs_conversion_ = timestamp_decoder_.NeedsConversion();
-  }
-}
-
-/// Prepares the reader for a new data page whose value bytes are the 'size' bytes at
-/// 'data'. Only PLAIN and PLAIN_DICTIONARY pages are accepted. Returns a non-OK status
-/// for unsupported encodings, a dictionary-encoded page without a dictionary, or a
-/// failed allocation of the conversion buffer.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
-    uint8_t* data, int size) {
-  // Data can be empty if the column contains all NULLs
-  DCHECK_GE(size, 0);
-  page_encoding_ = current_page_header_.data_page_header.encoding;
-  const bool dict_encoded = page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY;
-  if (!dict_encoded && page_encoding_ != parquet::Encoding::PLAIN) {
-    return GetUnsupportedDecodingError();
-  }
-
-  // If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
-  // not need to be initialized.
-  if (dict_encoded && slot_desc_ != nullptr) {
-    if (!dict_decoder_init_) return Status("File corrupt. Missing dictionary page.");
-    RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
-  }
-
-  // Allocate a temporary buffer to hold InternalType values if we need to convert
-  // before writing to the final slot. The buffer is allocated once and then reused.
-  if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
-    int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
-    conversion_buffer_ =
-        parent_->perm_pool_->TryAllocateAligned(buffer_size, alignof(InternalType));
-    const bool alloc_failed = conversion_buffer_ == nullptr;
-    if (alloc_failed) {
-      return parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
-          "Failed to allocate conversion buffer in Parquet scanner", buffer_size);
-    }
-  }
-  // TODO: Perform filter selectivity checks here.
-  return Status::OK();
-}
-
-/// Materializes the value for the current def/rep levels into 'tuple' (or marks the slot
-/// NULL when the value is not defined), then advances to the next levels. Returns false
-/// if reading the slot failed, otherwise the result of NextLevels().
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
-    Tuple* tuple) {
-  // NextLevels() should have already been called and def and rep levels should be in
-  // valid range.
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a value";
-
-  if (MATERIALIZED) {
-    if (def_level_ < max_def_level()) {
-      // The value is not defined at this position.
-      tuple->SetNull(null_indicator_offset_);
-    } else {
-      bool slot_ok;
-      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
-        if (NeedsConversionInline()) {
-          slot_ok = ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple);
-        } else {
-          slot_ok = ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
-        }
-      } else {
-        DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-        if (NeedsConversionInline()) {
-          slot_ok = ReadSlot<Encoding::PLAIN, true>(tuple);
-        } else {
-          slot_ok = ReadSlot<Encoding::PLAIN, false>(tuple);
-        }
-      }
-      if (!slot_ok) return false;
-    }
-  }
-  return NextLevels<IN_COLLECTION>();
-}
-
-/// Reads up to 'max_values' values into the tuples starting at 'tuple_mem' (with a stride
-/// of 'tuple_size' bytes), moving on to further data pages as needed, and stores the
-/// number of tuples produced in '*num_values'. Returns false if execution should stop.
-/// Note that '*num_values' is not written on the early 'return false' paths taken when
-/// caching rep/def levels fails.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
-    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-    int* RESTRICT num_values) RESTRICT {
-  // Repetition level is only present if this column is nested in a collection type.
-  if (IN_COLLECTION) {
-    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
-  } else {
-    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-  }
-
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    DCHECK_GE(num_buffered_values_, 0);
-    // Read next page if necessary.
-    if (num_buffered_values_ == 0) {
-      if (!NextPage()) {
-        // No page was produced. Stop if that was due to an error; otherwise re-check
-        // the loop condition (presumably the row group is now at its end).
-        continue_execution = parent_->parse_status_.ok();
-        continue;
-      }
-    }
-
-    // Not materializing anything - skip decoding any levels and rely on the value
-    // count from page metadata to return the correct number of rows.
-    if (!MATERIALIZED && !IN_COLLECTION) {
-      int vals_to_add = min(num_buffered_values_, max_values - val_count);
-      val_count += vals_to_add;
-      num_buffered_values_ -= vals_to_add;
-      DCHECK_GE(num_buffered_values_, 0);
-      continue;
-    }
-    // Fill the rep level cache if needed. We are flattening out the fields of the
-    // nested collection into the top-level tuple returned by the scan, so we don't
-    // care about the nesting structure unless the position slot is being populated.
-    if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
-      parent_->parse_status_.MergeStatus(
-          rep_levels_.CacheNextBatch(num_buffered_values_));
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-    }
-
-    // Output continues right after the tuples already produced by this call.
-    const int remaining_val_capacity = max_values - val_count;
-    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-    if (def_levels_.NextRepeatedRunLength() > 0) {
-      // Fast path to materialize a run of values with the same definition level. This
-      // avoids checking for NULL/not-NULL for every value.
-      int ret_val_count = 0;
-      continue_execution = MaterializeValueBatchRepeatedDefLevel(
-          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      val_count += ret_val_count;
-    } else {
-      // We don't have a repeated run - cache def levels and process value-by-value.
-      if (!def_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(
-            def_levels_.CacheNextBatch(num_buffered_values_));
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      }
-
-      // Read data page and cached levels to materialize values.
-      int ret_val_count = 0;
-      continue_execution = MaterializeValueBatch<IN_COLLECTION>(
-          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      val_count += ret_val_count;
-    }
-    // Debug-build failure injection; see SHOULD_TRIGGER_DEBUG_ACTION.
-    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= ColReaderDebugAction(&val_count);
-    }
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-/// Value-by-value materialization driven by the cached def (and, when positions are
-/// needed, rep) levels. Consumes cached def levels until the cache is empty or
-/// min('max_values', num_buffered_values_) tuples have been produced, writing tuples to
-/// 'tuple_mem' with a stride of 'tuple_size'. Returns false if reading a slot failed; in
-/// that case neither 'num_buffered_values_' nor '*num_values' is updated.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
-    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-    int* RESTRICT num_values) RESTRICT {
-  DCHECK(MATERIALIZED || IN_COLLECTION);
-  DCHECK_GT(num_buffered_values_, 0);
-  DCHECK(def_levels_.CacheHasNext());
-  if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
-  // Remember where we started in the def level cache so that the number of levels
-  // consumed can be computed at the end.
-  const int cache_start_idx = def_levels_.CacheCurrIdx();
-  uint8_t* curr_tuple = tuple_mem;
-  int val_count = 0;
-  DCHECK_LE(def_levels_.CacheRemaining(), num_buffered_values_);
-  max_values = min(max_values, num_buffered_values_);
-  while (def_levels_.CacheHasNext() && val_count < max_values) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-    int def_level = def_levels_.CacheGetNext();
-
-    if (IN_COLLECTION) {
-      if (def_level < def_level_of_immediate_repeated_ancestor()) {
-        // A containing repeated field is empty or NULL. Skip the value but
-        // move to the next repetition level if necessary.
-        if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
-        // No tuple is produced for this level: 'curr_tuple' and 'val_count' stay put.
-        continue;
-      }
-      if (pos_slot_desc_ != nullptr) {
-        ReadPositionBatched(rep_levels_.CacheGetNext(),
-            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
-      }
-    }
-
-    if (MATERIALIZED) {
-      if (def_level >= max_def_level()) {
-        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
-        if (UNLIKELY(!continue_execution)) return false;
-      } else {
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-    curr_tuple += tuple_size;
-    ++val_count;
-  }
-  // Account for every def level consumed, which can exceed 'val_count' when levels were
-  // skipped above.
-  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
-  DCHECK_GE(num_buffered_values_, 0);
-  *num_values = val_count;
-  return true;
-}
-
-// Note that the structure of this function is very similar to MaterializeValueBatch()
-// above, except it is unrolled to operate on multiple values at a time.
-//
-// Handles a run of identical definition levels in one step: depending on the level, the
-// run is skipped (no tuples produced), materialized via ReadSlots(), or turned into
-// NULLs. '*num_values' receives the number of tuples produced and
-// 'num_buffered_values_' is reduced by the number of def levels consumed. Returns false
-// only if ReadSlots() fails, in which case neither of those is updated.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
-    MATERIALIZED>::MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
-    uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-  DCHECK_GT(num_buffered_values_, 0);
-  if (pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
-  int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
-  DCHECK_GT(def_level_repeats, 0);
-  // Peek at the def level. The number of def levels we'll consume depends on several
-  // conditions below.
-  uint8_t def_level = def_levels_.GetRepeatedValue(0);
-  int32_t num_def_levels_to_consume = 0;
-
-  if (def_level < def_level_of_immediate_repeated_ancestor()) {
-    DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
-    // A containing repeated field is empty or NULL. We don't need to return any values
-    // but need to advance any rep levels.
-    if (pos_slot_desc_ != nullptr) {
-      // Never skip more rep levels than are currently cached.
-      num_def_levels_to_consume =
-          min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
-      rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
-    } else {
-      num_def_levels_to_consume = def_level_repeats;
-    }
-    *num_values = 0;
-  } else {
-    // Cannot consume more levels than allowed by buffered input values and output space.
-    num_def_levels_to_consume =
-        min(num_buffered_values_, min(max_values, def_level_repeats));
-    if (pos_slot_desc_ != nullptr) {
-      // Positions need one cached rep level per value.
-      num_def_levels_to_consume =
-          min<uint32_t>(num_def_levels_to_consume, rep_levels_.CacheRemaining());
-      ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
-    }
-    if (MATERIALIZED) {
-      if (def_level >= max_def_level()) {
-        if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
-          return false;
-        }
-      } else {
-        // The whole run is NULL.
-        Tuple::SetNullIndicators(
-            null_indicator_offset_, num_def_levels_to_consume, tuple_size, tuple_mem);
-      }
-    }
-    *num_values = num_def_levels_to_consume;
-  }
-  // We now know how many we actually consumed.
-  def_levels_.GetRepeatedValue(num_def_levels_to_consume);
-  num_buffered_values_ -= num_def_levels_to_consume;
-  DCHECK_GE(num_buffered_values_, 0);
-  return true;
-}
-
-/// Runtime-to-compile-time dispatcher: picks the MaterializeValueBatch() instantiation
-/// that matches the current page encoding and whether values need conversion, and
-/// forwards all arguments to it unchanged.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
-    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
-    int* RESTRICT num_values) RESTRICT {
-  if (page_encoding_ != Encoding::PLAIN_DICTIONARY) {
-    // Anything that is not dictionary-encoded must be plain-encoded.
-    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-    if (!NeedsConversionInline()) {
-      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
-          max_values, tuple_size, tuple_mem, num_values);
-    }
-    return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
-        max_values, tuple_size, tuple_mem, num_values);
-  }
-
-  if (!NeedsConversionInline()) {
-    return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
-        max_values, tuple_size, tuple_mem, num_values);
-  }
-  return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
-      max_values, tuple_size, tuple_mem, num_values);
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
-template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
-    Tuple* RESTRICT tuple) RESTRICT {
-  void* slot = tuple->GetSlot(tuple_offset_);
-  // Use an uninitialized stack allocation for temporary value to avoid running
-  // constructors doing work unnecessarily, e.g. if T == StringValue.
-  alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
-  InternalType* val_ptr =
-      reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
-
-  if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
-  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
-    if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-    // The value is invalid but execution should continue - set the null indicator and
-    // skip conversion.
-    tuple->SetNull(null_indicator_offset_);
-    return true;
-  }
-  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
-  return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
-    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
-  if (NeedsConversionInline()) {
-    return ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem);
-  } else {
-    return ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
-  }
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConvertSlots(
-    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
-  DCHECK(NeedsConversionInline());
-  DCHECK(conversion_buffer_ != nullptr);
-  InternalType* first_val = reinterpret_cast<InternalType*>(conversion_buffer_);
-  // Decode into the conversion buffer before doing the conversion into the output tuples.
-  if (!DecodeValues(sizeof(InternalType), num_to_read, first_val)) return false;
-
-  InternalType* curr_val = first_val;
-  uint8_t* curr_tuple = tuple_mem;
-  for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-    if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      // The value is invalid but execution should continue - set the null indicator and
-      // skip conversion.
-      tuple->SetNull(null_indicator_offset_);
-      continue;
-    }
-    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
-      return false;
-    }
-  }
-  return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlotsNoConversion(
-    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
-  DCHECK(!NeedsConversionInline());
-  // No conversion needed - decode directly into the output slots.
-  InternalType* first_slot = reinterpret_cast<InternalType*>(tuple_mem + tuple_offset_);
-  if (!DecodeValues(tuple_size, num_to_read, first_slot)) return false;
-  if (NeedsValidationInline()) {
-    // Validate the written slots.
-    uint8_t* curr_tuple = tuple_mem;
-    for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
-      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-      InternalType* val = static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
-      if (UNLIKELY(!ValidateValue(val))) {
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-        // The value is invalid but execution should continue - set the null indicator and
-        // skip conversion.
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-  }
-  return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-template <Encoding::type ENCODING>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
-    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
-    InternalType* RESTRICT val) RESTRICT {
-  DCHECK_EQ(page_encoding_, ENCODING);
-  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
-    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
-      SetDictDecodeError();
-      return false;
-    }
-  } else {
-    DCHECK_EQ(ENCODING, Encoding::PLAIN);
-    int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
-        *data, data_end, fixed_len_size_, val);
-    if (UNLIKELY(encoded_len < 0)) {
-      SetPlainDecodeError();
-      return false;
-    }
-    *data += encoded_len;
-  }
-  return true;
-}
-
-template <>
-template <Encoding::type ENCODING>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::DecodeValue(
-    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
-    TimestampValue* RESTRICT val) RESTRICT {
-  DCHECK_EQ(page_encoding_, ENCODING);
-  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
-    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
-      SetDictDecodeError();
-      return false;
-    }
-  } else {
-    DCHECK_EQ(ENCODING, Encoding::PLAIN);
-    int encoded_len =
-        timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
-    if (UNLIKELY(encoded_len < 0)) {
-      SetPlainDecodeError();
-      return false;
-    }
-    *data += encoded_len;
-  }
-  return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
-    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
-    if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
-      SetDictDecodeError();
-      return false;
-    }
-  } else {
-    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-    int64_t encoded_len = ParquetPlainEncoder::DecodeBatch<InternalType, PARQUET_TYPE>(
-        data_, data_end_, fixed_len_size_, count, stride, out_vals);
-    if (UNLIKELY(encoded_len < 0)) {
-      SetPlainDecodeError();
-      return false;
-    }
-    data_ += encoded_len;
-  }
-  return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositionBatched(
-    int16_t rep_level, int64_t* pos) {
-  // Reset position counter if we are at the start of a new parent collection.
-  if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-  *pos = pos_current_value_++;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
-    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
-  const int pos_slot_offset = pos_slot_desc()->tuple_offset();
-  void* first_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetSlot(pos_slot_offset);
-  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(first_slot), tuple_size};
-  for (int64_t i = 0; i < num_to_read; ++i) {
-    ReadPositionBatched(rep_levels_.CacheGetNext(), out.Advance());
-  }
-}
-
-template <>
-inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
-    true>::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
-    const StringValue* src, void* slot) {
-  DCHECK(slot_desc() != nullptr);
-  DCHECK(slot_desc()->type().type == TYPE_CHAR);
-  int char_len = slot_desc()->type().len;
-  int unpadded_len = min(char_len, src->len);
-  char* dst_char = reinterpret_cast<char*>(slot);
-  memcpy(dst_char, src->ptr, unpadded_len);
-  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
-  return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(dst_ts);
-  return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
-  return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsValidationInline() const {
-  return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsValidationInline() const {
-  return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
-    TimestampValue* val) const {
-  if (UNLIKELY(!TimestampValue::IsValidDate(val->date())
-      || !TimestampValue::IsValidTime(val->time()))) {
-    // If both are corrupt, invalid time takes precedence over invalid date, because
-    // invalid date may come from a more or less functional encoder that does not respect
-    // the 1400..9999 limit, while an invalid time is a good indicator of buggy encoder
-    // or memory garbage.
-    TErrorCode::type errorCode = TimestampValue::IsValidTime(val->time())
-        ? TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE
-        : TErrorCode::PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY;
-    ErrorMsg msg(errorCode, filename(), node_.element->name);
-    Status status = parent_->state_->LogOrReturnError(msg);
-    if (!status.ok()) parent_->parse_status_ = status;
-    return false;
-  }
-  return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ValidateValue(
-    TimestampValue* val) const {
-  // The range was already checked during the int64_t->TimestampValue conversion, which
-  // sets the date to invalid if it was out of range.
-  if (UNLIKELY(!val->HasDate())) {
-    ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
-        filename(), node_.element->name);
-    Status status = parent_->state_->LogOrReturnError(msg);
-    if (!status.ok()) parent_->parse_status_ = status;
-    return false;
-  }
-  DCHECK(TimestampValue::IsValidDate(val->date()));
-  DCHECK(TimestampValue::IsValidTime(val->time()));
-  return true;
-}
-
-class BoolColumnReader : public BaseScalarColumnReader {
- public:
-  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc) {
-    if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
-  }
-
-  virtual ~BoolColumnReader() { }
-
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
-    return ReadValue<true>(tuple);
-  }
-
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
-    return ReadValue<false>(tuple);
-  }
-
- protected:
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) override {
-    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
-                  << "have gotten this far.";
-    return Status::OK();
-  }
-
-  virtual bool HasDictionaryDecoder() override {
-    // Decoder should never be created for bools.
-    return false;
-  }
-
-  virtual void ClearDictionaryDecoder() override { }
-
-  virtual Status InitDataPage(uint8_t* data, int size) override;
-
- private:
-  template <bool IN_COLLECTION>
-  inline bool ReadValue(Tuple* tuple);
-
-  /// Decodes the next value into 'value'. Returns false and sets
-  /// 'parent_->parse_status_' if an error is encountered decoding the
-  /// value. Otherwise returns true.
-  template <bool IN_COLLECTION>
-  inline bool DecodeValue(bool* value);
-
-  /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
-  /// batch-oriented interface of BatchedBitReader.
-  static const int UNPACKED_BUFFER_LEN = 128;
-  bool unpacked_values_[UNPACKED_BUFFER_LEN];
-
-  /// The number of valid values in 'unpacked_values_'.
-  int num_unpacked_values_ = 0;
-
-  /// The next value to return from 'unpacked_values_'.
-  int unpacked_value_idx_ = 0;
-
-  /// Bit packed decoder, used if 'encoding_' is PLAIN.
-  BatchedBitReader bool_values_;
-
-  /// RLE decoder, used if 'encoding_' is RLE.
-  RleBatchDecoder<bool> rle_decoder_;
-};
-
-Status BoolColumnReader::InitDataPage(uint8_t* data, int size) {
-  page_encoding_ = current_page_header_.data_page_header.encoding;
-  // Only the relevant decoder is initialized for a given data page.
-  switch (page_encoding_) {
-    case parquet::Encoding::PLAIN:
-      bool_values_.Reset(data, size);
-      break;
-    case parquet::Encoding::RLE:
-      // The first 4 bytes contain the size of the encoded data. This information is
-      // redundant, as this is the last part of the data page, and the number of
-      // remaining bytes is already known.
-      rle_decoder_.Reset(data + 4, size - 4, 1);
-      break;
-    default:
-      return GetUnsupportedDecodingError();
-  }
-  num_unpacked_values_ = 0;
-  unpacked_value_idx_ = 0;
-  return Status::OK();
-}
-
-template <bool IN_COLLECTION>
-bool BoolColumnReader::ReadValue(Tuple* tuple) {
-  DCHECK(slot_desc_ != nullptr);
-  // Def and rep levels should be in valid range.
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a value";
-
-  if (def_level_ >= max_def_level()) {
-    bool* slot = tuple->GetBoolSlot(tuple_offset_);
-    if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
-  } else {
-    // Null value
-    tuple->SetNull(null_indicator_offset_);
-  }
-  return NextLevels<IN_COLLECTION>();
-}
-
-template <bool IN_COLLECTION>
-bool BoolColumnReader::DecodeValue(bool* value) {
-  if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
-    *value = unpacked_values_[unpacked_value_idx_++];
-  } else {
-    // Unpack as many values as we can into the buffer. We expect to read at least one
-    // value.
-    if (page_encoding_ == parquet::Encoding::PLAIN) {
-      num_unpacked_values_ =
-          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-    } else {
-      num_unpacked_values_ =
-          rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-    }
-
-    if (UNLIKELY(num_unpacked_values_ == 0)) {
-      parent_->parse_status_ = Status("Invalid bool column.");
-      return false;
-    }
-    *value = unpacked_values_[0];
-    unpacked_value_idx_ = 1;
-  }
-  return true;
-}
-
-// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
-// path doesn't falsely report that the file is corrupted.
-bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
-#ifndef NDEBUG
-  Status status = parent_->ScannerDebugAction();
-  if (!status.ok()) {
-    if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
-    *val_count = 0;
-    return false;
-  }
-#endif
-  return true;
-}
-
-bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
-    int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  // The below loop requires that NextLevels() was called previously to populate
-  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
-  // row group.
-  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
-
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
-      // A containing repeated field is empty or NULL
-      continue_execution = NextLevels();
-      continue;
-    }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != nullptr) {
-      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
-    }
-    continue_execution = ReadValue(pool, tuple);
-    ++val_count;
-    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= ColReaderDebugAction(&val_count);
-    }
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
-    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  // The below loop requires that NextLevels() was called previously to populate
-  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
-  // row group.
-  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
-
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    continue_execution = ReadNonRepeatedValue(pool, tuple);
-    ++val_count;
-    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= ColReaderDebugAction(&val_count);
-    }
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
-  // NextLevels() should have already been called
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(pos_current_value_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a value";
-  *pos = pos_current_value_++;
-}
-
-// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
-// if this matches those versions and compatibility workarounds need to be used.
-static bool RequiresSkippedDictionaryHeaderCheck(
-    const ParquetFileVersion& v) {
-  if (v.application != "impala") return false;
-  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
-Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
-    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
-  // Ensure metadata is valid before using it to initialize the reader.
-  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
-      parent_->filename(), row_group_idx, col_idx(), schema_element(),
-      parent_->state_));
-  num_buffered_values_ = 0;
-  data_ = nullptr;
-  data_end_ = nullptr;
-  stream_ = nullptr;
-  io_reservation_ = 0;
-  metadata_ = &col_chunk.meta_data;
-  num_values_read_ = 0;
-  def_level_ = HdfsParquetScanner::INVALID_LEVEL;
-  // See ColumnReader constructor.
-  rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
-  pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-
-  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-    RETURN_IF_ERROR(Codec::CreateDecompressor(
-        nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
-  }
-  int64_t col_start = col_chunk.meta_data.data_page_offset;
-  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-    // Already validated in ValidateColumnOffsets()
-    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
-    col_start = col_chunk.meta_data.dictionary_page_offset;
-  }
-  int64_t col_len = col_chunk.meta_data.total_compressed_size;
-  if (col_len <= 0) {
-    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
-        filename(), col_len));
-  }
-  int64_t col_end = col_start + col_len;
-
-  // Already validated in ValidateColumnOffsets()
-  DCHECK_GT(col_end, 0);
-  DCHECK_LT(col_end, file_desc.file_length);
-  const ParquetFileVersion& file_version = parent_->file_version_;
-  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
-    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-    // dictionary page header size in total_compressed_size and total_uncompressed_size
-    // (see IMPALA-694). We pad col_len to compensate.
-    int64_t bytes_remaining = file_desc.file_length - col_end;
-    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
-    col_len += pad;
-  }
-
-  // TODO: this will need to change when we have co-located files and the columns
-  // are different files.
-  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
-    return Status(Substitute("Expected parquet column file path '$0' to match "
-        "filename '$1'", col_chunk.file_path, filename()));
-  }
-
-  const ScanRange* metadata_range = parent_->metadata_range_;
-  int64_t partition_id = parent_->context_->partition_descriptor()->id();
-  const ScanRange* split_range =
-      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
-  // Determine if the column is completely contained within a local split.
-  bool col_range_local = split_range->expected_local()
-      && col_start >= split_range->offset()
-      && col_end <= split_range->offset() + split_range->len();
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, partition_id, split_range->disk_id(),
-      col_range_local,
-      BufferOpts(split_range->try_cache(), file_desc.mtime));
-  ClearDictionaryDecoder();
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::StartScan() {
-  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  ScannerContext* context = parent_->context_;
-  DCHECK_GT(io_reservation_, 0);
-  bool needs_buffers;
-  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
-      scan_range_, &needs_buffers));
-  if (needs_buffers) {
-    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-        context->bp_client(), scan_range_, io_reservation_));
-  }
-  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
-  DCHECK(stream_ != nullptr);
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadPageHeader(bool peek,
-    parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
-  DCHECK(stream_ != nullptr);
-  *eos = false;
-
-  uint8_t* buffer;
-  int64_t buffer_size;
-  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
-  // check for end of stream
-  if (buffer_size == 0) {
-    // The data pages contain fewer values than stated in the column metadata.
-    DCHECK(stream_->eosr());
-    DCHECK_LT(num_values_read_, metadata_->num_values);
-    // TODO for 2.3: node_.element->name isn't necessarily useful
-    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
-        num_values_read_, node_.element->name, filename());
-    RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
-    *eos = true;
-    return Status::OK();
-  }
-
-  // We don't know the actual header size until the thrift object is deserialized.  Loop
-  // until we successfully deserialize the header or exceed the maximum header size.
-  uint32_t header_size;
-  Status status;
-  while (true) {
-    header_size = buffer_size;
-    status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
-    if (status.ok()) break;
-
-    if (buffer_size >= FLAGS_max_page_header_size) {
-      stringstream ss;
-      ss << "ParquetScanner: could not read data page because page header exceeded "
-         << "maximum size of "
-         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
-      status.AddDetail(ss.str());
-      return status;
-    }
-
-    // Didn't read entire header, increase buffer size and try again
-    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
-    status = Status::OK();
-    bool success = stream_->GetBytes(
-        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
-    if (!success) {
-      DCHECK(!status.ok());
-      return status;
-    }
-    DCHECK(status.ok());
-
-    // Even though we increased the allowed buffer size, the number of bytes
-    // read did not change. The header is not limited by the buffer space,
-    // so it must be incomplete in the file.
-    if (buffer_size == new_buffer_size) {
-      DCHECK_NE(new_buffer_size, 0);
-      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
-    }
-    DCHECK_GT(new_buffer_size, buffer_size);
-    buffer_size = new_buffer_size;
-  }
-
-  *next_header_size = header_size;
-
-  // Successfully deserialized current_page_header_
-  if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
-
-  int data_size = next_page_header->compressed_page_size;
-  if (UNLIKELY(data_size < 0)) {
-    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
-        "column '$2'", filename(), data_size, schema_element().name));
-  }
-  int uncompressed_size = next_page_header->uncompressed_page_size;
-  if (UNLIKELY(uncompressed_size < 0)) {
-    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
-        "size $1 for column '$2'", filename(), uncompressed_size,
-        schema_element().name));
-  }
-
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::InitDictionary() {
-  // Peek at the next page header
-  bool eos;
-  parquet::PageHeader next_page_header;
-  uint32_t next_header_size;
-  DCHECK(stream_ != nullptr);
-  DCHECK(!HasDictionaryDecoder());
-
-  RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
-                                 &next_header_size, &eos));
-  if (eos) return Status::OK();
-  // The dictionary must be the first data page, so if the first page
-  // is not a dictionary, then there is no dictionary.
-  if (next_page_header.type != parquet::PageType::DICTIONARY_PAGE) return Status::OK();
-
-  current_page_header_ = next_page_header;
-  Status status;
-  if (!stream_->SkipBytes(next_header_size, &status)) return status;
-
-  int data_size = current_page_header_.compressed_page_size;
-  if (slot_desc_ == nullptr) {
-    // Skip processing the dictionary page if we don't need to decode any values. In
-    // addition to being unnecessary, we are likely unable to successfully decode the
-    // dictionary values because we don't necessarily create the right type of scalar
-    // reader if there's no slot to read into (see CreateReader()).
-    if (!stream_->SkipBytes(data_size, &status)) return status;
-    return Status::OK();
-  }
-
-  if (node_.element->type == parquet::Type::BOOLEAN) {
-    return Status("Unexpected dictionary page. Dictionary page is not"
-       " supported for booleans.");
-  }
-
-  const parquet::DictionaryPageHeader* dict_header = nullptr;
-  if (current_page_header_.__isset.dictionary_page_header) {
-    dict_header = &current_page_header_.dictionary_page_header;
-  } else {
-    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
-      return Status("Dictionary page does not have dictionary header set.");
-    }
-  }
-  if (dict_header != nullptr &&
-      dict_header->encoding != Encoding::PLAIN &&
-      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
-    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
-                  "for dictionary pages.");
-  }
-
-  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-  data_end_ = data_ + data_size;
-
-  // The size of dictionary can be 0, if every value is null. The dictionary still has to
-  // be reset in this case.
-  DictDecoderBase* dict_decoder;
-  if (current_page_header_.uncompressed_page_size == 0) {
-    return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
-  }
-
-  // There are 3 different cases from the aspect of memory management:
-  // 1. If the column type is string, the dictionary will contain pointers to a buffer,
-  //    so the buffer's lifetime must be as long as any row batch that references it.
-  // 2. If the column type is not string, and the dictionary page is compressed, then a
-  //    temporary buffer is needed for the uncompressed values.
-  // 3. If the column type is not string, and the dictionary page is not compressed,
-  //    then no buffer is necessary.
-  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
-  uint8_t* dict_values = nullptr;
-  if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
-    int buffer_size = current_page_header_.uncompressed_page_size;
-    if (slot_desc_->type().IsStringType()) {
-      dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
-    } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
-      dict_values = uncompressed_buffer.buffer(); // case 2
-    }
-    if (UNLIKELY(dict_values == nullptr)) {
-      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
-          buffer_size, "dictionary");
-      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-               parent_->state_, details, buffer_size);
-    }
-  } else {
-    dict_values = data_; // case 3.
-  }
-
-  if (decompressor_.get() != nullptr) {
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
-                    &uncompressed_size, &dict_values));
-    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
-    if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
-               "Expected $1 uncompressed bytes but got $2", filename(),
-               current_page_header_.uncompressed_page_size, uncompressed_size));
-    }
-  } else {
-    if (current_page_header_.uncompressed_page_size != data_size) {
-      return Status(Substitute("Error reading dictionary page in file '$0'. "
-                               "Expected $1 bytes but got $2", filename(),
-                               current_page_header_.uncompressed_page_size, data_size));
-    }
-    if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_, data_size);
-  }
-
-  RETURN_IF_ERROR(CreateDictionaryDecoder(
-      dict_values, current_page_header_.uncompressed_page_size, &dict_decoder));
-  if (dict_header != nullptr &&
-      dict_header->num_values != dict_decoder->num_entries()) {
-    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-                  slot_desc_->type().DebugString(),
-                  Substitute("Expected $0 entries but data contained $1 entries",
-                             dict_header->num_values, dict_decoder->num_entries()));
-  }
-
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::InitDictionaries(
-    const vector<BaseScalarColumnReader*> readers) {
-  for (BaseScalarColumnReader* reader : readers) {
-    RETURN_IF_ERROR(reader->InitDictionary());
-  }
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadDataPage() {
-  // We're about to move to the next data page.  The previous data page is
-  // now complete, free up any memory allocated for it. If the data page 
contained
-  // strings we need to attach it to the returned batch.
-  if (PageContainsTupleData(page_encoding_)) {
-    parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), 
false);
-  } else {
-    data_page_pool_->FreeAll();
-  }
-  // We don't hold any pointers to earlier pages in the stream - we can safely 
free
-  // any I/O or boundary buffer.
-  stream_->ReleaseCompletedResources(false);
-
-  // Read the next data page, skipping page types we don't care about.
-  // We break out of this loop on the non-error case (a data page was found or 
we read all
-  // the pages).
-  while (true) {
-    DCHECK_EQ(num_buffered_values_, 0);
-    if (num_values_read_ == metadata_->num_values) {
-      // No more pages to read
-      // TODO: should we check for stream_->eosr()?
-      break;
-    } else if (num_values_read_ > metadata_->num_values) {
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, 
filename());
-      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    bool eos;
-    uint32_t header_size;
-    RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
-                                   &header_size, &eos));
-    if (eos) return Status::OK();
-
-    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
-      // Any dictionary is already initialized, as InitDictionary has already
-      // been called. There are two possibilities:
-      // 1. The parquet file has two dictionary pages
-      // OR
-      // 2. The parquet file does not have the dictionary as the first data 
page.
-      // Both are errors in the parquet file.
-      if (HasDictionaryDecoder()) {
-        return Status(Substitute("Corrupt Parquet file '$0': multiple 
dictionary pages "
-            "for column '$1'", filename(), schema_element().name));
-      } else {
-        return Status(Substitute("Corrupt Parquet file: '$0': dictionary page 
for "
-            "column '$1' is not the first page", filename(), 
schema_element().name));
-      }
-    }
-
-    Status status;
-    int data_size = current_page_header_.compressed_page_size;
-    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
-      // We can safely skip non-data pages
-      if (!stream_->SkipBytes(data_size, &status)) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      continue;
-    }
-
-    // Read Data Page
-    // TODO: when we start using page statistics, we will need to ignore 
certain corrupt
-    // statistics. See IMPALA-2208 and PARQUET-251.
-    if (!stream_->ReadBytes(data_size, &data_, &status)) {
-      DCHECK(!status.ok());
-      return status;
-    }
-    data_end_ = data_ + data_size;
-    int num_values = current_page_header_.data_page_header.num_values;
-    if (num_values < 0) {
-      return Status(Substitute("Error reading data page in Parquet file '$0'. "
-          "Invalid number of values in metadata: $1", filename(), num_values));
-    }
-    num_buffered_values_ = num_values;
-    num_values_read_ += num_buffered_values_;
-
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-    if (decompressor_.get() != nullptr) {
-      SCOPED_TIMER(parent_->decompress_timer_);
-      uint8_t* decompressed_buffer;
-      RETURN_IF_ERROR(AllocateUncompressedDataPage(
-            uncompressed_size, "decompressed data", &decompressed_buffer));
-      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
-          current_page_header_.compressed_page_size, data_, &uncompressed_size,
-          &decompressed_buffer));
-      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
-                << " to " << uncompressed_size;
-      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error decompressing data page in file '$0'. "
-            "Expected $1 uncompressed bytes but got $2", filename(),
-            current_page_header_.uncompressed_page_size, uncompressed_size));
-      }
-      data_ = decompressed_buffer;
-      data_size = current_page_header_.uncompressed_page_size;
-      data_end_ = data_ + data_size;
-    } else {
-      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
-      if (current_page_header_.compressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error reading data page in file '$0'. "
-            "Expected $1 bytes but got $2", filename(),
-            current_page_header_.compressed_page_size, uncompressed_size));
-      }
-      if 
(PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
-        // In this case returned batches will have pointers into the data page 
itself.
-        // We don't transfer disk I/O buffers out of the scanner so we need to 
copy
-        // the page data so that it can be attached to output batches.
-        uint8_t* copy_buffer;
-        RETURN_IF_ERROR(AllocateUncompressedDataPage(
-              uncompressed_size, "uncompressed variable-length data", 
&copy_buffer));
-        memcpy(copy_buffer, data_, uncompressed_size);
-        data_ = copy_buffer;
-        data_end_ = data_ + uncompressed_size;
-      }
-    }
-
-    // Initialize the repetition level data
-    RETURN_IF_ERROR(rep_levels_.Init(filename(),
-        current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_rep_level(), &data_,
-        &data_size));
-
-    // Initialize the definition level data
-    RETURN_IF_ERROR(def_levels_.Init(filename(),
-        current_page_header_.data_page_header.definition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_def_level(), &data_,
-        &data_size));
-
-    // Data can be empty if the column contains all NULLs
-    RETURN_IF_ERROR(InitDataPage(data_, data_size));
-    break;
-  }
-
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
-    const char* err_ctx, uint8_t** buffer) {
-  *buffer = data_page_pool_->TryAllocate(size);
-  if (*buffer == nullptr) {
-    string details =
-        Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, 
err_ctx);
-    return data_page_pool_->mem_tracker()->MemLimitExceeded(
-        parent_->state_, details, size);
-  }
-  return Status::OK();
-}
-
-template <bool ADVANCE_REP_LEVEL>
-bool BaseScalarColumnReader::NextLevels() {
-  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << 
slot_desc()->DebugString();
-
-  if (UNLIKELY(num_buffered_values_ == 0)) {
-    if (!NextPage()) return parent_->parse_status_.ok();
-  }
-  --num_buffered_values_;
-  DCHECK_GE(num_buffered_values_, 0);
-
-  // Definition level is not present if column and any containing structs are 
required.
-  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
-  // The compiler can optimize these two conditions into a single branch by 
treating
-  // def_level_ as unsigned.
-  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
-    SetLevelDecodeError("def", def_level_, max_def_level());
-    return false;
-  }
-
-  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
-    // Repetition level is only present if this column is nested in any 
collection type.
-    rep_level_ = rep_levels_.ReadLevel();
-    if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
-      SetLevelDecodeError("rep", rep_level_, max_rep_level());
-      return false;
-    }
-    // Reset position counter if we are at the start of a new parent 
collection.
-    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
-  }
-
-  return parent_->parse_status_.ok();
-}
-
-Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
-  return Status(Substitute(
-      "File '$0' is corrupt: unexpected encoding: $1 for data page of column 
'$2'.",
-      filename(), PrintThriftEnum(page_encoding_), schema_element().name));
-}
-
-bool BaseScalarColumnReader::NextPage() {
-  parent_->assemble_rows_timer_.Stop();
-  parent_->parse_status_ = ReadDataPage();
-  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-  if (num_buffered_values_ == 0) {
-    rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
-    def_level_ = HdfsParquetScanner::ROW_GROUP_END;
-    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-    return false;
-  }
-  parent_->assemble_rows_timer_.Start();
-  return true;
-}
-
-void BaseScalarColumnReader::SetLevelDecodeError(const char* level_name,
-    int decoded_level, int max_level) {
-  if (decoded_level < 0) {
-    DCHECK_EQ(decoded_level, HdfsParquetScanner::INVALID_LEVEL);
-    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file 
'$0': "
-        "could not read all $1 levels for column '$2'", filename(),
-        level_name, schema_element().name)));
-  } else {
-    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file 
'$0': "
-        "invalid $1 level $2 > max $1 level $3 for column '$4'", filename(),
-        level_name, decoded_level, max_level, schema_element().name)));
-  }
-}
-
-bool CollectionColumnReader::NextLevels() {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-  for (int c = 0; c < children_.size(); ++c) {
-    do {
-      // TODO(skye): verify somewhere that all column readers are at end
-      if (!children_[c]->NextLevels()) return false;
-    } while (children_[c]->rep_level() > new_collection_rep_level());
-  }
-  UpdateDerivedState();
-  return true;
-}
-
-bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a 
value";
-
-  if (tuple_offset_ == -1) {
-    return CollectionColumnReader::NextLevels();
-  } else if (def_level_ >= max_def_level()) {
-    return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
-  } else {
-    // Null value
-    tuple->SetNull(null_indicator_offset_);
-    return CollectionColumnReader::NextLevels();
-  }
-}
-
-bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) 
{
-  return CollectionColumnReader::ReadValue(pool, tuple);
-}
-
-bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-
-  // Recursively read the collection into a new CollectionValue.
-  *slot = CollectionValue();
-  CollectionValueBuilder builder(
-      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
-  bool continue_execution = parent_->AssembleCollection(
-      children_, new_collection_rep_level(), &builder);
-  if (!continue_execution) return false;
-
-  // AssembleCollection() advances child readers, so we don't need to call 
NextLevels()
-  UpdateDerivedState();
-  return true;
-}
-
-void CollectionColumnReader::UpdateDerivedState() {
-  // We don't need to cap our def_level_ at max_def_level(). We always check 
def_level_
-  // >= max_def_level() to check if the collection is defined.
-  // TODO(skye): consider capping def_level_ at max_def_level()
-  def_level_ = children_[0]->def_level();
-  rep_level_ = children_[0]->rep_level();
-
-  // All children should have been advanced to the beginning of the next 
collection
-  for (int i = 0; i < children_.size(); ++i) {
-    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
-    if (def_level_ < max_def_level()) {
-      // Collection not defined
-      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
-    } else {
-      // Collection is defined
-      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
-    }
-  }
-
-  if (RowGroupAtEnd()) {
-    // No more values
-    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-  } else if (rep_level_ <= max_rep_level() - 2) {
-    // Reset position counter if we are at the start of a new parent 
collection (i.e.,
-    // the current collection is the first item in a new parent collection).
-    pos_current_value_ = 0;
-  }
-}
-
-/// Returns a column reader for decimal types based on its size and parquet 
type.
-static ParquetColumnReader* CreateDecimalColumnReader(const SchemaNode& node,
-    const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
-  switch (node.element->type) {
-    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
-      switch (slot_desc->type().GetByteSize()) {
-      case 4:
-        return new ScalarColumnReader<Decimal4Value, 
parquet::Type::FIXED_LEN_BYTE_ARRAY,
-            true>(parent, node, slot_desc);
-      case 8:
-        return new ScalarColumnReader<Decimal8Value, 
parquet::Type::FIXED_LEN_BYTE_ARRAY,
-            true>(parent, node, slot_desc);
-      case 16:
-        return new ScalarColumnReader<Decimal16Value, 
parquet::Type::FIXED_LEN_BYTE_ARRAY,
-            true>(parent, node, slot_desc);
-      }
-      break;
-    case parquet::Type::BYTE_ARRAY:
-      switch (slot_desc->type().GetByteSize()) {
-      case 4:
-        return new ScalarColumnReader<Decimal4Value, 
parquet::Type::BYTE_ARRAY, true>(
-            parent, node, slot_desc);
-      case 8:
-        return new ScalarColumnReader<Decimal8Value, 
parquet::Type::BYTE_ARRAY, true>(
-            parent, node, slot_desc);
-      case 16:
-        return new ScalarColumnReader<Decimal16Value, 
parquet::Type::BYTE_ARRAY, true>(
-            parent, node, slot_desc);
-      }
-      break;
-    case parquet::Type::INT32:
-      DCHECK_EQ(sizeof(Decimal4Value::StorageType), 
slot_desc->type().GetByteSize());
-      return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
-          parent, node, slot_desc);
-    case parquet::Type::INT64:
-      DCHECK_EQ(sizeof(Decimal8Value::StorageType), 
slot_desc->type().GetByteSize());
-      return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
-          parent, node, slot_desc);
-    default:
-      DCHECK(false) << "Invalid decimal primitive type";
-  }
-  DCHECK(false) << "Invalid decimal type";
-  return nullptr;
-}
-
-ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
-    bool is_collection_field, const SlotDescriptor* slot_desc,
-    HdfsParquetScanner* parent) {
-  if (is_collection_field) {
-    // Create collection reader (note this handles both NULL and non-NULL 
'slot_desc')
-    return new CollectionColumnReader(parent, node, slot_desc);
-  } else if (slot_desc != nullptr) {
-    // Create the appropriate ScalarColumnReader type to read values into 
'slot_desc'
-    switch (slot_desc->type().type) {
-      case TYPE_BOOLEAN:
-        return new BoolColumnReader(parent, node, slot_desc);
-      case TYPE_TINYINT:
-        return new ScalarColumnReader<int8_t, parquet::Type::INT32, 
true>(parent, node,
-            slot_desc);
-      case TYPE_SMALLINT:
-        return new ScalarColumnReader<int16_t, parquet::Type::INT32, 
true>(parent, node,
-            slot_desc);
-      case TYPE_INT:
-        return new ScalarColumnReader<int32_t, parquet::Type::INT32, 
true>(parent, node,
-            slot_desc);
-      case TYPE_BIGINT:
-        switch (node.element->type) {
-          case parquet::Type::INT32:
-            return new ScalarColumnReader<int64_t, parquet::Type::INT32, 
true>(parent,
-                node, slot_desc);
-          default:
-            return new ScalarColumnReader<int64_t, parquet::Type::INT64, 
true>(parent,
-                node, slot_desc);
-        }
-      case TYPE_FLOAT:
-        return new ScalarColumnReader<float, parquet::Type::FLOAT, 
true>(parent, node,
-            slot_desc);
-      case TYPE_DOUBLE:
-        switch (node.element->type) {
-          case parquet::Type::INT32:
-            return new ScalarColumnReader<double , parquet::Type::INT32, 
true>(parent,
-                node, slot_desc);
-          case parquet::Type::FLOAT:
-            return new ScalarColumnReader<double, parquet::Type::FLOAT, 
true>(parent,
-                node, slot_desc);
-          default:
-            return new ScalarColumnReader<double, parquet::Type::DOUBLE, 
true>(parent,
-                node, slot_desc);
-        }
-      case TYPE_TIMESTAMP:
-        return CreateTimestampColumnReader(node, slot_desc, parent);
-      case TYPE_STRING:
-      case TYPE_VARCHAR:
-      case TYPE_CHAR:
-        return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, 
true>(
-            parent, node, slot_desc);
-      case TYPE_DECIMAL:
-        return CreateDecimalColumnReader(node, slot_desc, parent);
-      default:
-        DCHECK(false) << slot_desc->type().DebugString();
-        return nullptr;
-    }
-  } else {
-    // Special case for counting scalar values (e.g. count(*), no materialized 
columns in
-    // the file, only materializing a position slot). We won't actually read 
any values,
-    // only the rep and def levels, so it doesn't matter what kind of reader 
we make.
-    return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, 
node,
-        slot_desc);
-  }
-}
-
-ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
-    const SchemaNode& node, const SlotDescriptor* slot_desc,
-    HdfsParquetScanner* parent) {
-  if (node.element->type == parquet::Type::INT96) {
-    return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
-        parent, node, slot_desc);
-  }
-  else if (node.element->type == parquet::Type::INT64) {
-    return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
-        parent, node, slot_desc);
-  }
-  DCHECK(false) << slot_desc->type().DebugString();
-  return nullptr;
-}
-
-}

Reply via email to