http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
new file mode 100644
index 0000000..9cfcdb8
--- /dev/null
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -0,0 +1,1684 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+
+#include <algorithm>
+#include <queue>
+
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "codegen/codegen-anyval.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/parquet/parquet-collection-column-reader.h"
+#include "exec/parquet/parquet-column-readers.h"
+#include "exec/parquet/parquet-column-stats.h"
+#include "exec/scanner-context.inline.h"
+#include "rpc/thrift-util.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
+#include "util/dict-encoding.h"
+
+#include "common/names.h"
+
+DECLARE_bool(convert_legacy_hive_parquet_utc_timestamps);
+
+using std::move;
+using std::sort;
+using namespace impala;
+using namespace impala::io;
+
+// Max entries in the dictionary before switching to PLAIN encoding. If a 
dictionary
+// has fewer entries, then the entire column is dictionary encoded. This 
threshold
+// is guaranteed to be true for Impala versions 2.9 or below.
+// THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
+const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
+
+const char* HdfsParquetScanner::LLVM_CLASS_NAME = 
"class.impala::HdfsParquetScanner";
+
+static const string PARQUET_MEM_LIMIT_EXCEEDED =
+    "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
+
+namespace impala {
+
+static const string IDEAL_RESERVATION_COUNTER_NAME = 
"ParquetRowGroupIdealReservation";
+static const string ACTUAL_RESERVATION_COUNTER_NAME = 
"ParquetRowGroupActualReservation";
+
+Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
+  // Add Parquet-specific counters.
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, 
TUnit::BYTES);
+  ADD_SUMMARY_STATS_COUNTER(
+      scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, 
TUnit::BYTES);
+
+  for (HdfsFileDesc* file : files) {
+    // If the file size is less than 12 bytes, it is an invalid Parquet file.
+    if (file->file_length < 12) {
+      return Status(Substitute("Parquet file $0 has an invalid file length: 
$1",
+          file->filename, file->file_length));
+    }
+  }
+  return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
+}
+
+HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, 
RuntimeState* state)
+  : HdfsScanner(scan_node, state),
+    row_group_idx_(-1),
+    row_group_rows_read_(0),
+    advance_row_group_(true),
+    min_max_tuple_(nullptr),
+    row_batches_produced_(0),
+    scratch_batch_(new ScratchTupleBatch(
+        *scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())),
+    metadata_range_(nullptr),
+    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+    process_footer_timer_stats_(nullptr),
+    num_cols_counter_(nullptr),
+    num_stats_filtered_row_groups_counter_(nullptr),
+    num_row_groups_counter_(nullptr),
+    num_scanners_with_no_reads_counter_(nullptr),
+    num_dict_filtered_row_groups_counter_(nullptr),
+    coll_items_read_counter_(0),
+    codegend_process_scratch_batch_fn_(nullptr) {
+  assemble_rows_timer_.Stop();
+}
+
+Status HdfsParquetScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  metadata_range_ = stream_->scan_range();
+  num_cols_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
+  num_stats_filtered_row_groups_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
+          TUnit::UNIT);
+  num_row_groups_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
+  num_scanners_with_no_reads_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", 
TUnit::UNIT);
+  num_dict_filtered_row_groups_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", 
TUnit::UNIT);
+  process_footer_timer_stats_ =
+      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), 
"FooterProcessingTime");
+
+  codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
+      scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
+  if (codegend_process_scratch_batch_fn_ == nullptr) {
+    scan_node_->IncNumScannersCodegenDisabled();
+  } else {
+    scan_node_->IncNumScannersCodegenEnabled();
+  }
+
+  perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+
+  // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  if (min_max_tuple_desc != nullptr) {
+    int64_t tuple_size = min_max_tuple_desc->byte_size();
+    uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+    if (buffer == nullptr) {
+      string details = Substitute("Could not allocate buffer of $0 bytes for 
Parquet "
+          "statistics tuple for file '$1'.", tuple_size, filename());
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, 
tuple_size);
+    }
+    min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
+  }
+
+  // Clone the min/max statistics conjuncts.
+  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
+      expr_perm_pool_.get(), context_->expr_results_pool(),
+      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+
+  for (int i = 0; i < context->filter_ctxs().size(); ++i) {
+    const FilterContext* ctx = &context->filter_ctxs()[i];
+    DCHECK(ctx->filter != nullptr);
+    filter_ctxs_.push_back(ctx);
+  }
+  filter_stats_.resize(filter_ctxs_.size());
+
+  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
+
+  // Each scan node can process multiple splits. Each split processes the 
footer once.
+  // We use a timer to measure the time taken to ProcessFooter() per split and 
add
+  // this time to the averaged timer.
+  MonotonicStopWatch single_footer_process_timer;
+  single_footer_process_timer.Start();
+  // First process the file metadata in the footer.
+  Status footer_status = ProcessFooter();
+  single_footer_process_timer.Stop();
+
+  
process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
+
+  // Release I/O buffers immediately to make sure they are cleaned up
+  // in case we return a non-OK status anywhere below.
+  stream_ = nullptr;
+  context_->ReleaseCompletedResources(true);
+  context_->ClearStreams();
+  RETURN_IF_ERROR(footer_status);
+
+  // Parse the file schema into an internal representation for schema 
resolution.
+  schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
+      state_->query_options().parquet_fallback_schema_resolution,
+      state_->query_options().parquet_array_resolution));
+  RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename()));
+
+  // We've processed the metadata and there are columns that need to be 
materialized.
+  RETURN_IF_ERROR(CreateColumnReaders(
+      *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
+  COUNTER_SET(num_cols_counter_,
+      static_cast<int64_t>(CountScalarColumns(column_readers_)));
+  // Set top-level template tuple.
+  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+
+  RETURN_IF_ERROR(InitDictFilterStructures());
+  return Status::OK();
+}
+
+void HdfsParquetScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
+  if (row_batch != nullptr) {
+    FlushRowGroupResources(row_batch);
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), 
false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
+    }
+  } else {
+    template_tuple_pool_->FreeAll();
+    dictionary_pool_->FreeAll();
+    context_->ReleaseCompletedResources(true);
+    for (ParquetColumnReader* col_reader : column_readers_) 
col_reader->Close(nullptr);
+    // The scratch batch may still contain tuple data. We can get into this 
case if
+    // Open() fails or if the query is cancelled.
+    scratch_batch_->ReleaseResources(nullptr);
+  }
+  if (perm_pool_ != nullptr) {
+    perm_pool_->FreeAll();
+    perm_pool_.reset();
+  }
+
+  // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
+
+  // Collect compression types for reporting completed ranges.
+  vector<THdfsCompression::type> compression_types;
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
+  while (!readers.empty()) {
+    ParquetColumnReader* reader = readers.top();
+    readers.pop();
+    if (reader->IsCollectionReader()) {
+      CollectionColumnReader* coll_reader = 
static_cast<CollectionColumnReader*>(reader);
+      for (ParquetColumnReader* r: *coll_reader->children()) readers.push(r);
+      continue;
+    }
+    BaseScalarColumnReader* scalar_reader = 
static_cast<BaseScalarColumnReader*>(reader);
+    compression_types.push_back(scalar_reader->codec());
+  }
+  assemble_rows_timer_.Stop();
+  assemble_rows_timer_.ReleaseCounter();
+
+  // If this was a metadata only read (i.e. count(*)), there are no columns.
+  if (compression_types.empty()) {
+    compression_types.push_back(THdfsCompression::NONE);
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types, 
true);
+  } else {
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
+  }
+  if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
+
+  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+
+  for (int i = 0; i < filter_ctxs_.size(); ++i) {
+    const FilterStats* stats = filter_ctxs_[i]->stats;
+    const LocalFilterStats& local = filter_stats_[i];
+    stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
+        local.considered, local.rejected);
+  }
+
+  CloseInternal();
+}
+
+// Get the start of the column.
+static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) {
+  if (column.__isset.dictionary_page_offset) {
+    DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+    return column.dictionary_page_offset;
+  }
+  return column.data_page_offset;
+}
+
+// Get the file offset of the middle of the row group.
+static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
+  int64_t start_offset = GetColumnStartOffset(row_group.columns[0].meta_data);
+
+  const parquet::ColumnMetaData& last_column =
+      row_group.columns[row_group.columns.size() - 1].meta_data;
+  int64_t end_offset =
+      GetColumnStartOffset(last_column) + last_column.total_compressed_size;
+
+  return start_offset + (end_offset - start_offset) / 2;
+}
+
+// Returns true if 'row_group' overlaps with 'split_range'.
+static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
+    const ScanRange* split_range) {
+  int64_t row_group_start = 
GetColumnStartOffset(row_group.columns[0].meta_data);
+
+  const parquet::ColumnMetaData& last_column =
+      row_group.columns[row_group.columns.size() - 1].meta_data;
+  int64_t row_group_end =
+      GetColumnStartOffset(last_column) + last_column.total_compressed_size;
+
+  int64_t split_start = split_range->offset();
+  int64_t split_end = split_start + split_range->len();
+
+  return (split_start >= row_group_start && split_start < row_group_end) ||
+      (split_end > row_group_start && split_end <= row_group_end) ||
+      (split_start <= row_group_start && split_end >= row_group_end);
+}
+
+int HdfsParquetScanner::CountScalarColumns(
+    const vector<ParquetColumnReader*>& column_readers) {
+  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
+  int num_columns = 0;
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
+  while (!readers.empty()) {
+    ParquetColumnReader* col_reader = readers.top();
+    readers.pop();
+    if (col_reader->IsCollectionReader()) {
+      CollectionColumnReader* collection_reader =
+          static_cast<CollectionColumnReader*>(col_reader);
+      for (ParquetColumnReader* r: *collection_reader->children()) 
readers.push(r);
+      continue;
+    }
+    ++num_columns;
+  }
+  return num_columns;
+}
+
+Status HdfsParquetScanner::ProcessSplit() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+  do {
+    if (FilterContext::CheckForAlwaysFalse(FilterStats::SPLITS_KEY,
+        context_->filter_ctxs())) {
+      eos_ = true;
+      break;
+    }
+    unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
+        state_->batch_size(), scan_node_->mem_tracker());
+    Status status = GetNextInternal(batch.get());
+    // Always add batch to the queue because it may contain data referenced by 
previously
+    // appended batches.
+    scan_node->AddMaterializedRowBatch(move(batch));
+    RETURN_IF_ERROR(status);
+    ++row_batches_produced_;
+    if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) 
== 0) {
+      CheckFiltersEffectiveness();
+    }
+  } while (!eos_ && !scan_node_->ReachedLimit());
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
+  if (scan_node_->optimize_parquet_count_star()) {
+    // Populate the single slot with the Parquet num rows statistic.
+    int64_t tuple_buf_size;
+    uint8_t* tuple_buf;
+    // We try to allocate a smaller row batch here because in most cases the 
number row
+    // groups in a file is much lower than the default row batch capacity.
+    int capacity = min(
+        static_cast<int>(file_metadata_.row_groups.size()), 
row_batch->capacity());
+    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
+        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
+        &capacity, &tuple_buf_size, &tuple_buf));
+    while (!row_batch->AtCapacity()) {
+      RETURN_IF_ERROR(NextRowGroup());
+      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
+      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+      InitTuple(template_tuple_, dst_tuple);
+      int64_t* dst_slot =
+          
dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
+      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
+      row_group_rows_read_ += *dst_slot;
+      dst_row->SetTuple(0, dst_tuple);
+      row_batch->CommitLastRow();
+      tuple_buf += scan_node_->tuple_desc()->byte_size();
+    }
+    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
+    return Status::OK();
+  } else if (scan_node_->IsZeroSlotTableScan()) {
+    // There are no materialized slots and we are not optimizing count(*), e.g.
+    // "select 1 from alltypes". We can serve this query from just the file 
metadata.
+    // We don't need to read the column data.
+    if (row_group_rows_read_ == file_metadata_.num_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(row_batch, num_to_commit);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    row_group_rows_read_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), row_group_rows_read_);
+    return Status::OK();
+  }
+
+  // Transfer remaining tuples from the scratch batch.
+  if (!scratch_batch_->AtEnd()) {
+    assemble_rows_timer_.Start();
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) return Status::OK();
+  }
+
+  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+    // Transfer resources and clear streams if there is any leftover from the 
previous
+    // row group. We will create new streams for the next row group.
+    FlushRowGroupResources(row_batch);
+    if (!advance_row_group_) {
+      Status status =
+          ValidateEndOfRowGroup(column_readers_, row_group_idx_, 
row_group_rows_read_);
+      if (!status.ok()) 
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+    }
+    RETURN_IF_ERROR(NextRowGroup());
+    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+    if (row_group_idx_ == file_metadata_.row_groups.size()) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
+  }
+
+  // Apply any runtime filters to static tuples containing the partition keys 
for this
+  // partition. If any filter fails, we return immediately and stop processing 
this
+  // scan range.
+  if 
(!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+    eos_ = true;
+    DCHECK(parse_status_.ok());
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  Status status = AssembleRows(column_readers_, row_batch, 
&advance_row_group_);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  if (!parse_status_.ok()) {
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  }
+
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::EvaluateStatsConjuncts(
+    const parquet::FileMetaData& file_metadata, const parquet::RowGroup& 
row_group,
+    bool* skip_row_group) {
+  *skip_row_group = false;
+
+  if (!state_->query_options().parquet_read_statistics) return Status::OK();
+
+  const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+  if (!min_max_tuple_desc) return Status::OK();
+
+  int64_t tuple_size = min_max_tuple_desc->byte_size();
+
+  DCHECK(min_max_tuple_ != nullptr);
+  min_max_tuple_->Init(tuple_size);
+
+  DCHECK_EQ(min_max_tuple_desc->slots().size(), 
min_max_conjunct_evals_.size());
+  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
+    SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
+    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
+
+    // Resolve column path to determine col idx.
+    SchemaNode* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(),
+        &node, &pos_field, &missing_field));
+
+    if (missing_field) {
+      // We are selecting a column that is not in the file. We would set its 
slot to NULL
+      // during the scan, so any predicate would evaluate to false. Return 
early. NULL
+      // comparisons cannot happen here, since predicates with NULL literals 
are filtered
+      // in the frontend.
+      *skip_row_group = true;
+      break;
+    }
+
+    if (pos_field) {
+      // The planner should not send predicates with 'pos' for stats filtering 
to the BE.
+      // In case there is a bug, we return an error, which will abort the 
query.
+      stringstream err;
+      err << "Statistics not supported for pos fields: " << 
slot_desc->DebugString();
+      DCHECK(false) << err.str();
+      return Status(err.str());
+    }
+
+    int col_idx = node->col_idx;
+    DCHECK_LT(col_idx, row_group.columns.size());
+
+    const vector<parquet::ColumnOrder>& col_orders = 
file_metadata.column_orders;
+    const parquet::ColumnOrder* col_order = nullptr;
+    if (col_idx < col_orders.size()) col_order = &col_orders[col_idx];
+
+    const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
+    const ColumnType& col_type = slot_desc->type();
+
+    DCHECK(node->element != nullptr);
+
+    ColumnStatsReader stat_reader(col_chunk, col_type, col_order,  
*node->element);
+    if (col_type.IsTimestampType()) {
+      stat_reader.SetTimestampDecoder(CreateTimestampDecoder(*node->element));
+    }
+
+    int64_t null_count = 0;
+    bool null_count_result = stat_reader.ReadNullCountStat(&null_count);
+    if (null_count_result && null_count == col_chunk.meta_data.num_values) {
+      *skip_row_group = true;
+      break;
+    }
+
+    const string& fn_name = eval->root().function_name();
+    ColumnStatsReader::StatsField stats_field;
+    if (fn_name == "lt" || fn_name == "le") {
+      // We need to get min stats.
+      stats_field = ColumnStatsReader::StatsField::MIN;
+    } else if (fn_name == "gt" || fn_name == "ge") {
+      // We need to get max stats.
+      stats_field = ColumnStatsReader::StatsField::MAX;
+    } else {
+      DCHECK(false) << "Unsupported function name for statistics evaluation: " 
<< fn_name;
+      continue;
+    }
+
+    void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
+    bool stats_read = stat_reader.ReadFromThrift(stats_field, slot);
+
+    if (stats_read) {
+      TupleRow row;
+      row.SetTuple(0, min_max_tuple_);
+      if (!ExecNode::EvalPredicate(eval, &row)) {
+        *skip_row_group = true;
+        break;
+      }
+    }
+  }
+
+  // Free any expr result allocations accumulated during conjunct evaluation.
+  context_->expr_results_pool()->Clear();
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::NextRowGroup() {
+  const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+      metadata_range_->meta_data())->original_split;
+  int64_t split_offset = split_range->offset();
+  int64_t split_length = split_range->len();
+
+  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
+      context_->partition_descriptor()->id(), filename());
+
+  bool start_with_first_row_group = row_group_idx_ == -1;
+  bool misaligned_row_group_skipped = false;
+
+  advance_row_group_ = false;
+  row_group_rows_read_ = 0;
+
+  // Loop until we have found a non-empty row group, and successfully 
initialized and
+  // seeded the column readers. Return a non-OK status from within loop only 
if the error
+  // is non-recoverable, otherwise log the error and continue with the next 
row group.
+  while (true) {
+    // Reset the parse status for the next row group.
+    parse_status_ = Status::OK();
+    // Make sure that we don't have leftover resources from the file metadata 
scan range
+    // or previous row groups.
+    DCHECK_EQ(0, context_->NumStreams());
+
+    ++row_group_idx_;
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+      if (start_with_first_row_group && misaligned_row_group_skipped) {
+        // We started with the first row group and skipped all the row groups 
because
+        // they were misaligned. The execution flow won't reach this point if 
there is at
+        // least one non-empty row group which this scanner can process.
+        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+      }
+      break;
+    }
+    const parquet::RowGroup& row_group = 
file_metadata_.row_groups[row_group_idx_];
+    // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 
'select *'
+    // behave consistently for corrupt files that have 
'file_metadata_.num_rows == 0'
+    // but some data in row groups.
+    if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
+
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
+        file_desc->filename, file_desc->file_length, row_group));
+
+    // A row group is processed by the scanner whose split overlaps with the 
row
+    // group's mid point.
+    int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
+    if (!(row_group_mid_pos >= split_offset &&
+        row_group_mid_pos < split_offset + split_length)) {
+      // The mid-point does not fall within the split, this row group will be 
handled by a
+      // different scanner.
+      // If the row group overlaps with the split, we found a misaligned row 
group.
+      misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, 
split_range);
+      continue;
+    }
+
+    COUNTER_ADD(num_row_groups_counter_, 1);
+
+    // Evaluate row group statistics.
+    bool skip_row_group_on_stats;
+    RETURN_IF_ERROR(
+        EvaluateStatsConjuncts(file_metadata_, row_group, 
&skip_row_group_on_stats));
+    if (skip_row_group_on_stats) {
+      COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
+      continue;
+    }
+
+    InitCollectionColumns();
+    RETURN_IF_ERROR(InitScalarColumns());
+
+    // Start scanning dictionary filtering column readers, so we can read the 
dictionary
+    // pages in EvalDictionaryFilters().
+    
RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
+
+    // StartScans() may have allocated resources to scan columns. If we skip 
this row
+    // group below, we must call ReleaseSkippedRowGroupResources() before 
continuing.
+
+    // If there is a dictionary-encoded column where every value is eliminated
+    // by a conjunct, the row group can be eliminated. This initializes 
dictionaries
+    // for all columns visited.
+    bool skip_row_group_on_dict_filters;
+    Status status = EvalDictionaryFilters(row_group, 
&skip_row_group_on_dict_filters);
+    if (!status.ok()) {
+      // Either return an error or skip this row group if it is ok to ignore 
errors
+      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
+      continue;
+    }
+    if (skip_row_group_on_dict_filters) {
+      COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+      ReleaseSkippedRowGroupResources();
+      continue;
+    }
+
+    // At this point, the row group has passed any filtering criteria
+    // Start scanning non-dictionary filtering column readers and initialize 
their
+    // dictionaries.
+    
RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
+    status = 
BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
+    if (!status.ok()) {
+      // Either return an error or skip this row group if it is ok to ignore 
errors
+      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
+      continue;
+    }
+    DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
+    break;
+  }
+  DCHECK(parse_status_.ok());
+  return Status::OK();
+}
+
+void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
+  DCHECK(row_batch != nullptr);
+  row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
+  scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* col_reader : column_readers_) 
col_reader->Close(row_batch);
+  context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+  dictionary_pool_->FreeAll();
+  scratch_batch_->ReleaseResources(nullptr);
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* col_reader : column_readers_) 
col_reader->Close(nullptr);
+  context_->ClearStreams();
+}
+
+bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
+  const SlotDescriptor* slot_desc = col_reader->slot_desc();
+  // Some queries do not need the column to be materialized, so slot_desc is 
NULL.
+  // For example, a count(*) with no predicates only needs to count records
+  // rather than materializing the values.
+  if (!slot_desc) return false;
+  // Does this column reader have any dictionary filter conjuncts?
+  auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
+  if (dict_filter_it == dict_filter_map_.end()) return false;
+
+  // Certain datatypes (chars, timestamps) do not have the appropriate value 
in the
+  // file format and must be converted before return. This is true for the
+  // dictionary values, so skip these datatypes for now.
+  // TODO: The values should be converted during dictionary construction and 
stored
+  // in converted form in the dictionary.
+  if (col_reader->NeedsConversion()) return false;
+
+  // Certain datatypes (timestamps) need to validate the value, as certain bit
+  // combinations are not valid. The dictionary values are not validated, so
+  // skip these datatypes for now.
+  // TODO: This should be pushed into dictionary construction.
+  if (col_reader->NeedsValidation()) return false;
+
+  return true;
+}
+
+void HdfsParquetScanner::PartitionReaders(
+    const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) {
+  for (auto* reader : readers) {
+    if (reader->IsCollectionReader()) {
+      CollectionColumnReader* col_reader = 
static_cast<CollectionColumnReader*>(reader);
+      collection_readers_.push_back(col_reader);
+      PartitionReaders(*col_reader->children(), can_eval_dict_filters);
+    } else {
+      BaseScalarColumnReader* scalar_reader =
+          static_cast<BaseScalarColumnReader*>(reader);
+      scalar_readers_.push_back(scalar_reader);
+      if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
+        dict_filterable_readers_.push_back(scalar_reader);
+      } else {
+        non_dict_filterable_readers_.push_back(scalar_reader);
+      }
+    }
+  }
+}
+
+Status HdfsParquetScanner::InitDictFilterStructures() {
+  bool can_eval_dict_filters =
+      state_->query_options().parquet_dictionary_filtering && 
!dict_filter_map_.empty();
+
+  // Separate column readers into scalar and collection readers.
+  PartitionReaders(column_readers_, can_eval_dict_filters);
+
+  // Allocate tuple buffers for all tuple descriptors that are associated with 
conjuncts
+  // that can be dictionary filtered.
+  for (auto* col_reader : dict_filterable_readers_) {
+    const SlotDescriptor* slot_desc = col_reader->slot_desc();
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
+    auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+    if (tuple_it != dict_filter_tuple_map_.end()) continue;
+    int tuple_size = tuple_desc->byte_size();
+    if (tuple_size > 0) {
+      uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+      if (buffer == nullptr) {
+        string details = Substitute(
+            PARQUET_MEM_LIMIT_EXCEEDED, "InitDictFilterStructures", tuple_size,
+            "Dictionary Filtering Tuple");
+        return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, 
tuple_size);
+      }
+      dict_filter_tuple_map_[tuple_desc] = reinterpret_cast<Tuple*>(buffer);
+    }
+  }
+  return Status::OK();
+}
+
+bool HdfsParquetScanner::IsDictionaryEncoded(
+    const parquet::ColumnMetaData& col_metadata) {
+  // The Parquet spec allows for column chunks to have mixed encodings
+  // where some data pages are dictionary-encoded and others are plain
+  // encoded. For example, a Parquet file writer might start writing
+  // a column chunk as dictionary encoded, but it will switch to plain
+  // encoding if the dictionary grows too large.
+  //
+  // In order for dictionary filters to skip the entire row group,
+  // the conjuncts must be evaluated on column chunks that are entirely
+  // encoded with the dictionary encoding. There are two checks
+  // available to verify this:
+  // 1. The encoding_stats field on the column chunk metadata provides
+  //    information about the number of data pages written in each
+  //    format. This allows for a specific check of whether all the
+  //    data pages are dictionary encoded.
+  // 2. The encodings field on the column chunk metadata lists the
+  //    encodings used. If this list contains the dictionary encoding
+  //    and does not include unexpected encodings (i.e. encodings not
+  //    associated with definition/repetition levels), then it is entirely
+  //    dictionary encoded.
+
+  if (col_metadata.__isset.encoding_stats) {
+    // Condition #1 above
+    for (const parquet::PageEncodingStats& enc_stat : 
col_metadata.encoding_stats) {
+      if (enc_stat.page_type == parquet::PageType::DATA_PAGE &&
+          enc_stat.encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+          enc_stat.count > 0) {
+        return false;
+      }
+    }
+  } else {
+    // Condition #2 above
+    bool has_dict_encoding = false;
+    bool has_nondict_encoding = false;
+    for (const parquet::Encoding::type& encoding : col_metadata.encodings) {
+      if (encoding == parquet::Encoding::PLAIN_DICTIONARY) has_dict_encoding = 
true;
+
+      // RLE and BIT_PACKED are used for repetition/definition levels
+      if (encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+          encoding != parquet::Encoding::RLE &&
+          encoding != parquet::Encoding::BIT_PACKED) {
+        has_nondict_encoding = true;
+        break;
+      }
+    }
+    // Not entirely dictionary encoded if:
+    // 1. No dictionary encoding listed
+    // OR
+    // 2. Some non-dictionary encoding is listed
+    if (!has_dict_encoding || has_nondict_encoding) return false;
+  }
+
+  return true;
+}
+
+Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& 
row_group,
+    bool* row_group_eliminated) {
+  *row_group_eliminated = false;
+  // Check if there's anything to do here.
+  if (dict_filterable_readers_.empty()) return Status::OK();
+
+  // Legacy impala files (< 2.9) require special handling, because they do not 
encode
+  // information about whether the column is 100% dictionary encoded.
+  bool is_legacy_impala = false;
+  if (file_version_.application == "impala" && file_version_.VersionLt(2,9,0)) 
{
+    is_legacy_impala = true;
+  }
+
+  // Keeps track of column readers that need to be initialized. For example, 
if a
+  // column cannot be filtered, then defer its dictionary initialization once 
we know
+  // the row group cannot be filtered.
+  vector<BaseScalarColumnReader*> deferred_dict_init_list;
+  // Keeps track of the initialized tuple associated with a TupleDescriptor.
+  unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
+  for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
+    const parquet::ColumnMetaData& col_metadata =
+        row_group.columns[scalar_reader->col_idx()].meta_data;
+
+    // Legacy impala files cannot be eliminated here, because the only way to
+    // determine whether the column is 100% dictionary encoded requires reading
+    // the dictionary.
+    if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) {
+      // We cannot guarantee that this reader is 100% dictionary encoded,
+      // so dictionary filters cannot be used. Defer initializing its 
dictionary
+      // until after the other filters have been evaluated.
+      deferred_dict_init_list.push_back(scalar_reader);
+      continue;
+    }
+
+    RETURN_IF_ERROR(scalar_reader->InitDictionary());
+    DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder();
+    if (!dictionary) continue;
+
+    // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until
+    // it reaches the maximum number of dictionary entries. If the dictionary
+    // has fewer entries, then it is 100% dictionary encoded.
+    if (is_legacy_impala &&
+        dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
+
+    const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+    DCHECK(slot_desc != nullptr);
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
+    auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
+    DCHECK(dict_filter_it != dict_filter_map_.end());
+    const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals =
+        dict_filter_it->second;
+    Tuple* dict_filter_tuple = nullptr;
+    auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
+    if (dict_filter_tuple_it == tuple_map.end()) {
+      auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+      DCHECK(tuple_it != dict_filter_tuple_map_.end());
+      dict_filter_tuple = tuple_it->second;
+      dict_filter_tuple->Init(tuple_desc->byte_size());
+      tuple_map[tuple_desc] = dict_filter_tuple;
+    } else {
+      dict_filter_tuple = dict_filter_tuple_it->second;
+    }
+
+    DCHECK(dict_filter_tuple != nullptr);
+    void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
+    bool column_has_match = false;
+    for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
+      if (dict_idx % 1024 == 0) {
+        // Don't let expr result allocations accumulate too much for large 
dictionaries or
+        // many row groups.
+        context_->expr_results_pool()->Clear();
+      }
+      dictionary->GetValue(dict_idx, slot);
+
+      // We can only eliminate this row group if no value from the dictionary 
matches.
+      // If any dictionary value passes the conjuncts, then move on to the 
next column.
+      TupleRow row;
+      row.SetTuple(0, dict_filter_tuple);
+      if (ExecNode::EvalConjuncts(dict_filter_conjunct_evals.data(),
+              dict_filter_conjunct_evals.size(), &row)) {
+        column_has_match = true;
+        break;
+      }
+    }
+    // Free all expr result allocations now that we're done with the filter.
+    context_->expr_results_pool()->Clear();
+
+    if (!column_has_match) {
+      // The column contains no value that matches the conjunct. The row group
+      // can be eliminated.
+      *row_group_eliminated = true;
+      return Status::OK();
+    }
+  }
+
+  // Any columns that were not 100% dictionary encoded need to initialize
+  // their dictionaries here.
+  
RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
+
+  return Status::OK();
+}
+
+/// High-level steps of this function:
+/// 1. Allocate 'scratch' memory for tuples able to hold a full batch
+/// 2. Populate the slots of all scratch tuples one column reader at a time,
+///    using the ColumnReader::Read*ValueBatch() functions.
+/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and
+///    set the surviving tuples in the output batch. Transfer the ownership of
+///    scratch memory to the output batch once the scratch memory is exhausted.
+/// 4. Repeat steps above until we are done with the row group or an error
+///    occurred.
+/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
+/// difficult to maintain a maximum memory footprint without throwing away at 
least
+/// some work. This point needs further experimentation and thought.
+Status HdfsParquetScanner::AssembleRows(
+    const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
+    bool* skip_row_group) {
+  DCHECK(!column_readers.empty());
+  DCHECK(row_batch != nullptr);
+  DCHECK_EQ(*skip_row_group, false);
+  DCHECK(scratch_batch_ != nullptr);
+
+  int64_t num_rows_read = 0;
+  while (!column_readers[0]->RowGroupAtEnd()) {
+    // Start a new scratch batch.
+    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+    InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, 
scratch_batch_->capacity);
+
+    // Materialize the top-level slots into the scratch batch column-by-column.
+    int last_num_tuples = -1;
+    for (int c = 0; c < column_readers.size(); ++c) {
+      ParquetColumnReader* col_reader = column_readers[c];
+      bool continue_execution;
+      if (col_reader->max_rep_level() > 0) {
+        continue_execution = 
col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
+            scratch_batch_->capacity, tuple_byte_size_, 
scratch_batch_->tuple_mem,
+            &scratch_batch_->num_tuples);
+      } else {
+        continue_execution = col_reader->ReadNonRepeatedValueBatch(
+            &scratch_batch_->aux_mem_pool, scratch_batch_->capacity, 
tuple_byte_size_,
+            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
+      }
+      // Check that all column readers populated the same number of values.
+      bool num_tuples_mismatch = c != 0 && last_num_tuples != 
scratch_batch_->num_tuples;
+      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
+        // Skipping this row group. Free up all the resources with this row 
group.
+        FlushRowGroupResources(row_batch);
+        scratch_batch_->num_tuples = 0;
+        DCHECK(scratch_batch_->AtEnd());
+        *skip_row_group = true;
+        if (num_tuples_mismatch && continue_execution) {
+          Status err(Substitute("Corrupt Parquet file '$0': column '$1' "
+              "had $2 remaining values but expected $3", filename(),
+              col_reader->schema_element().name, last_num_tuples,
+              scratch_batch_->num_tuples));
+          parse_status_.MergeStatus(err);
+        }
+        return Status::OK();
+      }
+      last_num_tuples = scratch_batch_->num_tuples;
+    }
+    num_rows_read += scratch_batch_->num_tuples;
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) break;
+  }
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), 
coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
+  DCHECK(dst_batch != nullptr);
+  dst_batch->CommitRows(num_rows);
+
+  if (context_->cancelled()) return Status::CancelledInternal("Parquet 
scanner");
+  // TODO: It's a really bad idea to propagate UDF error via the global 
RuntimeState.
+  // Store UDF error in thread local storage or make UDF return status so it 
can merge
+  // with parse_status_.
+  RETURN_IF_ERROR(state_->GetQueryStatus());
+  // Clear expr result allocations for this thread to avoid accumulating too 
much
+  // memory from evaluating the scanner conjuncts.
+  context_->expr_results_pool()->Clear();
+  return Status::OK();
+}
+
+int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
+  // This function must not be called when the output batch is already full. 
As long as
+  // we always call CommitRows() after TransferScratchTuples(), the output 
batch can
+  // never be empty.
+  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
+  DCHECK_EQ(scan_node_->tuple_idx(), 0);
+  DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
+  if (scratch_batch_->tuple_byte_size == 0) {
+    Tuple** output_row =
+        reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
+    // We are materializing a collection with empty tuples. Add a NULL tuple 
to the
+    // output batch per remaining scratch tuple and return. No need to evaluate
+    // filters/conjuncts.
+    DCHECK(filter_ctxs_.empty());
+    DCHECK(conjunct_evals_->empty());
+    int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
+        scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
+    memset(output_row, 0, num_tuples * sizeof(Tuple*));
+    scratch_batch_->tuple_idx += num_tuples;
+    // No data is required to back the empty tuples, so we should not attach 
any data to
+    // these batches.
+    DCHECK_EQ(0, scratch_batch_->total_allocated_bytes());
+    return num_tuples;
+  }
+
+  int num_rows_to_commit;
+  if (codegend_process_scratch_batch_fn_ != nullptr) {
+    num_rows_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
+  } else {
+    num_rows_to_commit = ProcessScratchBatch(dst_batch);
+  }
+  scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
+  return num_rows_to_commit;
+}
+
+Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
+    const vector<ScalarExpr*>& conjuncts, llvm::Function** 
process_scratch_batch_fn) {
+  DCHECK(node->runtime_state()->ShouldCodegen());
+  *process_scratch_batch_fn = nullptr;
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  DCHECK(codegen != nullptr);
+
+  llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, 
true);
+  DCHECK(fn != nullptr);
+
+  llvm::Function* eval_conjuncts_fn;
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, 
&eval_conjuncts_fn));
+  DCHECK(eval_conjuncts_fn != nullptr);
+
+  int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, 
"EvalConjuncts");
+  DCHECK_REPLACE_COUNT(replaced, 1);
+
+  llvm::Function* eval_runtime_filters_fn;
+  RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
+      codegen, node->filter_exprs(), &eval_runtime_filters_fn));
+  DCHECK(eval_runtime_filters_fn != nullptr);
+
+  replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, 
"EvalRuntimeFilters");
+  DCHECK_REPLACE_COUNT(replaced, 1);
+
+  fn->setName("ProcessScratchBatch");
+  *process_scratch_batch_fn = codegen->FinalizeFunction(fn);
+  if (*process_scratch_batch_fn == nullptr) {
+    return Status("Failed to finalize process_scratch_batch_fn.");
+  }
+  return Status::OK();
+}
+
+bool HdfsParquetScanner::AssembleCollection(
+    const vector<ParquetColumnReader*>& column_readers, int 
new_collection_rep_level,
+    CollectionValueBuilder* coll_value_builder) {
+  DCHECK(!column_readers.empty());
+  DCHECK_GE(new_collection_rep_level, 0);
+  DCHECK(coll_value_builder != nullptr);
+
+  const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
+  Tuple* template_tuple = template_tuple_map_[tuple_desc];
+  const vector<ScalarExprEvaluator*> evals =
+      conjunct_evals_map_[tuple_desc->id()];
+
+  int64_t rows_read = 0;
+  bool continue_execution = !scan_node_->ReachedLimit() && 
!context_->cancelled();
+  // Note that this will be set to true at the end of the row group or the end 
of the
+  // current collection (if applicable).
+  bool end_of_collection = column_readers[0]->rep_level() == -1;
+  // We only initialize end_of_collection to true here if we're at the end of 
the row
+  // group (otherwise it would always be true because we're on the "edge" of 
two
+  // collections), and only ProcessSplit() should call AssembleRows() at the 
end of the
+  // row group.
+  if (coll_value_builder != nullptr) DCHECK(!end_of_collection);
+
+  while (!end_of_collection && continue_execution) {
+    MemPool* pool;
+    Tuple* tuple;
+    TupleRow* row = nullptr;
+
+    int64_t num_rows;
+    // We're assembling item tuples into an CollectionValue
+    parse_status_ =
+        GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, 
&num_rows);
+    if (UNLIKELY(!parse_status_.ok())) {
+      continue_execution = false;
+      break;
+    }
+    // 'num_rows' can be very high if we're writing to a large 
CollectionValue. Limit
+    // the number of rows we read at one time so we don't spend too long in the
+    // 'num_rows' loop below before checking for cancellation or limit reached.
+    num_rows = min(
+        num_rows, 
static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
+
+    int num_to_commit = 0;
+    int row_idx = 0;
+    for (row_idx = 0; row_idx < num_rows && !end_of_collection; ++row_idx) {
+      DCHECK(continue_execution);
+      // A tuple is produced iff the collection that contains its values is 
not empty and
+      // non-NULL. (Empty or NULL collections produce no output values, 
whereas NULL is
+      // output for the fields of NULL structs.)
+      bool materialize_tuple = column_readers[0]->def_level() >=
+          column_readers[0]->def_level_of_immediate_repeated_ancestor();
+      InitTuple(tuple_desc, template_tuple, tuple);
+      continue_execution =
+          ReadCollectionItem(column_readers, materialize_tuple, pool, tuple);
+      if (UNLIKELY(!continue_execution)) break;
+      end_of_collection = column_readers[0]->rep_level() <= 
new_collection_rep_level;
+
+      if (materialize_tuple) {
+        if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) {
+          tuple = next_tuple(tuple_desc->byte_size(), tuple);
+          ++num_to_commit;
+        }
+      }
+    }
+
+    rows_read += row_idx;
+    coll_value_builder->CommitTuples(num_to_commit);
+    continue_execution &= !scan_node_->ReachedLimit() && 
!context_->cancelled();
+  }
+  coll_items_read_counter_ += rows_read;
+  if (end_of_collection) {
+    // All column readers should report the start of the same collection.
+    for (int c = 1; c < column_readers.size(); ++c) {
+      FILE_CHECK_EQ(column_readers[c]->rep_level(), 
column_readers[0]->rep_level());
+    }
+  }
+  return continue_execution;
+}
+
+inline bool HdfsParquetScanner::ReadCollectionItem(
+    const vector<ParquetColumnReader*>& column_readers,
+    bool materialize_tuple, MemPool* pool, Tuple* tuple) const {
+  DCHECK(!column_readers.empty());
+  bool continue_execution = true;
+  int size = column_readers.size();
+  for (int c = 0; c < size; ++c) {
+    ParquetColumnReader* col_reader = column_readers[c];
+    if (materialize_tuple) {
+      // All column readers for this tuple should a value to materialize.
+      FILE_CHECK_GE(col_reader->def_level(),
+                    col_reader->def_level_of_immediate_repeated_ancestor());
+      // Fill in position slot if applicable
+      const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc();
+      if (pos_slot_desc != nullptr) {
+        col_reader->ReadPositionNonBatched(
+            tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
+      }
+      continue_execution = col_reader->ReadValue(pool, tuple);
+    } else {
+      // A containing repeated field is empty or NULL
+      FILE_CHECK_LT(col_reader->def_level(),
+                    col_reader->def_level_of_immediate_repeated_ancestor());
+      continue_execution = col_reader->NextLevels();
+    }
+    if (UNLIKELY(!continue_execution)) break;
+  }
+  return continue_execution;
+}
+
+Status HdfsParquetScanner::ProcessFooter() {
+  const int64_t file_len = stream_->file_desc()->file_length;
+  const int64_t scan_range_len = stream_->scan_range()->len();
+
+  // We're processing the scan range issued in IssueInitialRanges(). The scan 
range should
+  // be the last FOOTER_BYTES of the file. !success means the file is shorter 
than we
+  // expect. Note we can't detect if the file is larger than we expect without 
attempting
+  // to read past the end of the scan range, but in this case we'll fail below 
trying to
+  // parse the footer.
+  DCHECK_LE(scan_range_len, FOOTER_SIZE);
+  uint8_t* buffer;
+  bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_);
+  if (!success) {
+    DCHECK(!parse_status_.ok());
+    if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) {
+      VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: "
+                 << "metadata states file size to be "
+                 << PrettyPrinter::Print(file_len, TUnit::BYTES)
+                 << ", but could only read "
+                 << PrettyPrinter::Print(stream_->total_bytes_returned(), 
TUnit::BYTES);
+      return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(),
+          scan_node_->hdfs_table()->fully_qualified_name());
+    }
+    return parse_status_;
+  }
+  DCHECK(stream_->eosr());
+
+  // Number of bytes in buffer after the fixed size footer is accounted for.
+  int remaining_bytes_buffered = scan_range_len - sizeof(int32_t) -
+      sizeof(PARQUET_VERSION_NUMBER);
+
+  // Make sure footer has enough bytes to contain the required information.
+  if (remaining_bytes_buffered < 0) {
+    return Status(Substitute("File '$0' is invalid. Missing metadata.", 
filename()));
+  }
+
+  // Validate magic file bytes are correct.
+  uint8_t* magic_number_ptr = buffer + scan_range_len - 
sizeof(PARQUET_VERSION_NUMBER);
+  if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
+             sizeof(PARQUET_VERSION_NUMBER)) != 0) {
+    return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, filename(),
+        string(reinterpret_cast<char*>(magic_number_ptr), 
sizeof(PARQUET_VERSION_NUMBER)),
+        scan_node_->hdfs_table()->fully_qualified_name());
+  }
+
+  // The size of the metadata is encoded as a 4 byte little endian value before
+  // the magic number
+  uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
+  uint32_t metadata_size = *reinterpret_cast<uint32_t*>(metadata_size_ptr);
+  uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;
+  // The start of the metadata is:
+  // file_len - 4-byte footer length field - 4-byte version number field - 
metadata size
+  int64_t metadata_start = file_len - sizeof(int32_t) - 
sizeof(PARQUET_VERSION_NUMBER) -
+      metadata_size;
+
+  // If the metadata was too big, we need to read it into a contiguous buffer 
before
+  // deserializing it.
+  ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
+
+  DCHECK(metadata_range_ != nullptr);
+  if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
+    // In this case, the metadata is bigger than our guess meaning there are
+    // not enough bytes in the footer range from IssueInitialRanges().
+    // We'll just issue more ranges to the IoMgr that is the actual footer.
+    int64_t partition_id = context_->partition_descriptor()->id();
+    const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, 
filename());
+    DCHECK_EQ(file_desc, stream_->file_desc());
+    if (metadata_start < 0) {
+      return Status(Substitute("File '$0' is invalid. Invalid metadata size in 
file "
+          "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size, 
file_len));
+    }
+
+    if (!metadata_buffer.TryAllocate(metadata_size)) {
+      string details = Substitute("Could not allocate buffer of $0 bytes for 
Parquet "
+          "metadata for file '$1'.", metadata_size, filename());
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, 
metadata_size);
+    }
+    metadata_ptr = metadata_buffer.buffer();
+
+    // Read the footer into the metadata buffer.
+    ScanRange* metadata_range = scan_node_->AllocateScanRange(
+        metadata_range_->fs(), filename(), metadata_size, metadata_start, 
partition_id,
+        metadata_range_->disk_id(), metadata_range_->expected_local(),
+        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+
+    unique_ptr<BufferDescriptor> io_buffer;
+    bool needs_buffers;
+    RETURN_IF_ERROR(
+        scan_node_->reader_context()->StartScanRange(metadata_range, 
&needs_buffers));
+    DCHECK(!needs_buffers) << "Already provided a buffer";
+    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
+    DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
+    DCHECK_EQ(io_buffer->len(), metadata_size);
+    DCHECK(io_buffer->eosr());
+    metadata_range->ReturnBuffer(move(io_buffer));
+  }
+
+  // Deserialize file footer
+  // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce 
this.
+  Status status =
+      DeserializeThriftMsg(metadata_ptr, &metadata_size, true, 
&file_metadata_);
+  if (!status.ok()) {
+    return Status(Substitute("File '$0' of length $1 bytes has invalid file 
metadata "
+        "at file offset $2, Error = $3.", filename(), file_len, metadata_start,
+        status.GetDetail()));
+  }
+
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, 
filename()));
+
+  // IMPALA-3943: Do not throw an error for empty files for backwards 
compatibility.
+  if (file_metadata_.num_rows == 0) {
+    // Warn if the num_rows is inconsistent with the row group metadata.
+    if (!file_metadata_.row_groups.empty()) {
+      bool has_non_empty_row_group = false;
+      for (const parquet::RowGroup& row_group : file_metadata_.row_groups) {
+        if (row_group.num_rows > 0) {
+          has_non_empty_row_group = true;
+          break;
+        }
+      }
+      // Warn if there is at least one non-empty row group.
+      if (has_non_empty_row_group) {
+        ErrorMsg msg(TErrorCode::PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE, 
filename());
+        state_->LogError(msg);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Parse out the created by application version string
+  if (file_metadata_.__isset.created_by) {
+    file_version_ = ParquetFileVersion(file_metadata_.created_by);
+  }
+  if (file_metadata_.row_groups.empty()) {
+    return Status(
+        Substitute("Invalid file. This file: $0 has no row groups", 
filename()));
+  }
+  if (file_metadata_.num_rows < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 
in "
+        "file metadata", filename(), file_metadata_.num_rows));
+  }
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& 
tuple_desc,
+    const ParquetSchemaResolver& schema_resolver,
+    vector<ParquetColumnReader*>* column_readers) {
+  DCHECK(column_readers != nullptr);
+  DCHECK(column_readers->empty());
+
+  if (scan_node_->optimize_parquet_count_star()) {
+    // Column readers are not needed because we are not reading from any 
columns if this
+    // optimization is enabled.
+    return Status::OK();
+  }
+
+  // Each tuple can have at most one position slot. We'll process this slot 
desc last.
+  SlotDescriptor* pos_slot_desc = nullptr;
+
+  for (SlotDescriptor* slot_desc: tuple_desc.slots()) {
+    // Skip partition columns
+    if (&tuple_desc == scan_node_->tuple_desc() &&
+        slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
+
+    SchemaNode* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    RETURN_IF_ERROR(schema_resolver.ResolvePath(
+        slot_desc->col_path(), &node, &pos_field, &missing_field));
+
+    if (missing_field) {
+      // In this case, we are selecting a column that is not in the file.
+      // Update the template tuple to put a NULL in this slot.
+      Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
+      if (*template_tuple == nullptr) {
+        *template_tuple =
+            Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
+      }
+      (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+
+    if (pos_field) {
+      DCHECK(pos_slot_desc == nullptr)
+          << "There should only be one position slot per tuple";
+      pos_slot_desc = slot_desc;
+      continue;
+    }
+
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(), 
*node->element,
+        slot_desc, state_));
+
+    ParquetColumnReader* col_reader = ParquetColumnReader::Create(
+        *node, slot_desc->type().IsCollectionType(), slot_desc, this);
+    column_readers->push_back(col_reader);
+
+    if (col_reader->IsCollectionReader()) {
+      // Recursively populate col_reader's children
+      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
+      const TupleDescriptor* item_tuple_desc = 
slot_desc->collection_item_descriptor();
+      CollectionColumnReader* collection_reader =
+          static_cast<CollectionColumnReader*>(col_reader);
+      RETURN_IF_ERROR(CreateColumnReaders(
+          *item_tuple_desc, schema_resolver, collection_reader->children()));
+    }
+  }
+
+  if (column_readers->empty()) {
+    // This is either a count(*) over a collection type (count(*) over the 
table is
+    // handled in ProcessFooter()), or no materialized columns appear in this 
file
+    // (e.g. due to schema evolution, or if there's only a position slot). 
Create a single
+    // column reader that we will use to count the number of tuples we should 
output. We
+    // will not read any values from this reader.
+    ParquetColumnReader* reader;
+    RETURN_IF_ERROR(CreateCountingReader(
+        tuple_desc.tuple_path(), schema_resolver, &reader));
+    column_readers->push_back(reader);
+  }
+
+  if (pos_slot_desc != nullptr) {
+    // 'tuple_desc' has a position slot. Use an existing column reader to 
populate it.
+    DCHECK(!column_readers->empty());
+    (*column_readers)[0]->set_pos_slot_desc(pos_slot_desc);
+  }
+
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
+    const ParquetSchemaResolver& schema_resolver, ParquetColumnReader** 
reader) {
+  SchemaNode* parent_node;
+  bool pos_field;
+  bool missing_field;
+  RETURN_IF_ERROR(schema_resolver.ResolvePath(
+      parent_path, &parent_node, &pos_field, &missing_field));
+
+  if (missing_field) {
+    // TODO: can we do anything else here?
+    return Status(Substitute("Could not find '$0' in file '$1'.",
+        PrintPath(*scan_node_->hdfs_table(), parent_path), filename()));
+  }
+  DCHECK(!pos_field);
+  DCHECK(parent_path.empty() || parent_node->is_repeated());
+
+  if (!parent_node->children.empty()) {
+    // Find a non-struct (i.e. collection or scalar) child of 'parent_node', 
which we will
+    // use to create the item reader
+    const SchemaNode* target_node = &parent_node->children[0];
+    while (!target_node->children.empty() && !target_node->is_repeated()) {
+      target_node = &target_node->children[0];
+    }
+
+    *reader = ParquetColumnReader::Create(
+        *target_node, target_node->is_repeated(), nullptr, this);
+    if (target_node->is_repeated()) {
+      // Find the closest scalar descendant of 'target_node' via breadth-first 
search, and
+      // create scalar reader to drive 'reader'. We find the closest (i.e. 
least-nested)
+      // descendant as a heuristic for picking a descendant with fewer values, 
so it's
+      // faster to scan.
+      // TODO: use different heuristic than least-nested? Fewest values?
+      const SchemaNode* node = nullptr;
+      queue<const SchemaNode*> nodes;
+      nodes.push(target_node);
+      while (!nodes.empty()) {
+        node = nodes.front();
+        nodes.pop();
+        if (node->children.size() > 0) {
+          for (const SchemaNode& child: node->children) nodes.push(&child);
+        } else {
+          // node is the least-nested scalar descendant of 'target_node'
+          break;
+        }
+      }
+      DCHECK(node->children.empty()) << node->DebugString();
+      CollectionColumnReader* parent_reader =
+          static_cast<CollectionColumnReader*>(*reader);
+      parent_reader->children()->push_back(
+          ParquetColumnReader::Create(*node, false, nullptr, this));
+    }
+  } else {
+    // Special case for a repeated scalar node. The repeated node represents 
both the
+    // parent collection and the child item.
+    *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this);
+  }
+
+  return Status::OK();
+}
+
+void HdfsParquetScanner::InitCollectionColumns() {
+  for (CollectionColumnReader* col_reader: collection_readers_) {
+    col_reader->Reset();
+  }
+}
+
+Status HdfsParquetScanner::InitScalarColumns() {
+  int64_t partition_id = context_->partition_descriptor()->id();
+  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, 
filename());
+  DCHECK(file_desc != nullptr);
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+
+  // Used to validate that the number of values in each reader in 
column_readers_ at the
+  // same SchemaElement is the same.
+  unordered_map<const parquet::SchemaElement*, int> num_values_map;
+  for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
+    const parquet::ColumnChunk& col_chunk = 
row_group.columns[scalar_reader->col_idx()];
+    auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
+    int num_values = -1;
+    if (num_values_it != num_values_map.end()) {
+      num_values = num_values_it->second;
+    } else {
+      num_values_map[&scalar_reader->schema_element()] = 
col_chunk.meta_data.num_values;
+    }
+    if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
+      // TODO: improve this error message by saying which columns are 
different,
+      // and also specify column in other error messages as appropriate
+      return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, 
scalar_reader->col_idx(),
+          col_chunk.meta_data.num_values, num_values, filename());
+    }
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, 
row_group_idx_));
+  }
+  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::DivideReservationBetweenColumns(
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  const int64_t min_buffer_size = io_mgr->min_buffer_size();
+  const int64_t max_buffer_size = io_mgr->max_buffer_size();
+  // The HdfsScanNode reservation calculation in the planner ensures that we 
have
+  // reservation for at least one buffer per column.
+  if (context_->total_reservation() < min_buffer_size * column_readers.size()) 
{
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Not enough reservation in Parquet scanner for file '$0'. 
Need at "
+                   "least $1 bytes per column for $2 columns but had $3 bytes",
+            filename(), min_buffer_size, column_readers.size(),
+            context_->total_reservation()));
+  }
+
+  vector<int64_t> col_range_lengths(column_readers.size());
+  for (int i = 0; i < column_readers.size(); ++i) {
+    col_range_lengths[i] = column_readers[i]->scan_range()->len();
+  }
+
+  // The scanner-wide stream was used only to read the file footer.  Each 
column has added
+  // its own stream. We can use the total reservation now that 'stream_''s 
resources have
+  // been released. We may benefit from increasing reservation further, so 
let's compute
+  // the ideal reservation to scan all the columns.
+  int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
+  if (ideal_reservation > context_->total_reservation()) {
+    context_->TryIncreaseReservation(ideal_reservation);
+  }
+  
scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(context_->total_reservation());
+  
scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)->
+      UpdateCounter(ideal_reservation);
+
+  vector<pair<int, int64_t>> tmp_reservations = 
DivideReservationBetweenColumnsHelper(
+      min_buffer_size, max_buffer_size, col_range_lengths, 
context_->total_reservation());
+  for (auto& tmp_reservation : tmp_reservations) {
+    
column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
+  }
+  return Status::OK();
+}
+
+int64_t HdfsParquetScanner::ComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  int64_t ideal_reservation = 0;
+  for (int64_t len : col_range_lengths) {
+    ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);
+  }
+  return ideal_reservation;
+}
+
+vector<pair<int, int64_t>> 
HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+    int64_t min_buffer_size, int64_t max_buffer_size,
+    const vector<int64_t>& col_range_lengths, int64_t 
reservation_to_distribute) {
+  // Pair of (column index, reservation allocated).
+  vector<pair<int, int64_t>> tmp_reservations;
+  for (int i = 0; i < col_range_lengths.size(); ++i) 
tmp_reservations.emplace_back(i, 0);
+
+  // Sort in descending order of length, breaking ties by index so that larger 
columns
+  // get allocated reservation first. It is common to have dramatically 
different column
+  // sizes in a single file because of different value sizes and 
compressibility. E.g.
+  // consider a large STRING "comment" field versus a highly compressible
+  // dictionary-encoded column with only a few distinct values. We want to 
give max-sized
+  // buffers to large columns first to maximize the size of I/Os that we do 
while reading
+  // this row group.
+  sort(tmp_reservations.begin(), tmp_reservations.end(),
+      [&col_range_lengths](
+          const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
+        int64_t left_len = col_range_lengths[left.first];
+        int64_t right_len = col_range_lengths[right.first];
+        return left_len != right_len ? left_len > right_len : left.first < 
right.first;
+      });
+
+  // Set aside the minimum reservation per column.
+  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+  // Allocate reservations to columns by repeatedly allocating either a 
max-sized buffer
+  // or a large enough buffer to fit the remaining data for each column. Do 
this
+  // round-robin up to the ideal number of I/O buffers.
+  for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
+    for (auto& tmp_reservation : tmp_reservations) {
+      // Add back the reservation we set aside above.
+      if (i == 0) reservation_to_distribute += min_buffer_size;
+
+      int64_t bytes_left_in_range =
+          col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
+      int64_t bytes_to_add;
+      if (bytes_left_in_range >= max_buffer_size) {
+        if (reservation_to_distribute >= max_buffer_size) {
+          bytes_to_add = max_buffer_size;
+        } else if (i == 0) {
+          DCHECK_EQ(0, tmp_reservation.second);
+          // Ensure this range gets at least one buffer on the first iteration.
+          bytes_to_add = 
BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+        } else {
+          DCHECK_GT(tmp_reservation.second, 0);
+          // We need to read more than the max buffer size, but can't allocate 
a
+          // max-sized buffer. Stop adding buffers to this column: we prefer 
to use
+          // the existing max-sized buffers without small buffers mixed in so 
that
+          // we will alway do max-sized I/Os, which make efficient use of I/O 
devices.
+          bytes_to_add = 0;
+        }
+      } else if (bytes_left_in_range > 0 &&
+          reservation_to_distribute >= min_buffer_size) {
+        // Choose a buffer size that will fit the rest of the bytes left in 
the range.
+        bytes_to_add =
+            max(min_buffer_size, 
BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+        // But don't add more reservation than is available.
+        bytes_to_add =
+            min(bytes_to_add, 
BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+      } else {
+        bytes_to_add = 0;
+      }
+      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << 
bytes_to_add;
+      reservation_to_distribute -= bytes_to_add;
+      tmp_reservation.second += bytes_to_add;
+      DCHECK_GE(reservation_to_distribute, 0);
+      DCHECK_GT(tmp_reservation.second, 0);
+    }
+  }
+  return tmp_reservations;
+}
+
+Status HdfsParquetScanner::InitDictionaries(
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  for (BaseScalarColumnReader* scalar_reader : column_readers) {
+    RETURN_IF_ERROR(scalar_reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::ValidateEndOfRowGroup(
+    const vector<ParquetColumnReader*>& column_readers, int row_group_idx,
+    int64_t rows_read) {
+  DCHECK(!column_readers.empty());
+  DCHECK(parse_status_.ok()) << "Don't overwrite parse_status_"
+      << parse_status_.GetDetail();
+
+  if (column_readers[0]->max_rep_level() == 0) {
+    // These column readers materialize table-level values (vs. collection 
values). Test
+    // if the expected number of rows from the file metadata matches the 
actual number of
+    // rows read from the file.
+    int64_t expected_rows_in_group = 
file_metadata_.row_groups[row_group_idx].num_rows;
+    if (rows_read != expected_rows_in_group) {
+      return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(), 
row_group_idx,
+          expected_rows_in_group, rows_read);
+    }
+  }
+
+  // Validate scalar column readers' state
+  int num_values_read = -1;
+  for (int c = 0; c < column_readers.size(); ++c) {
+    if (column_readers[c]->IsCollectionReader()) continue;
+    BaseScalarColumnReader* reader =
+        static_cast<BaseScalarColumnReader*>(column_readers[c]);
+    // All readers should have exhausted the final data page. This could fail 
if one
+    // column has more values than stated in the metadata, meaning the final 
data page
+    // will still have unread values.
+    if (reader->num_buffered_values_ != 0) {
+      return Status(Substitute("Corrupt Parquet metadata in file '$0': 
metadata reports "
+          "'$1' more values in data page than actually present", filename(),
+          reader->num_buffered_values_));
+    }
+    // Sanity check that the num_values_read_ value is the same for all 
readers. All
+    // readers should have been advanced in lockstep (the above check is more 
likely to
+    // fail if this not the case though, since num_values_read_ is only 
updated at the end
+    // of a data page).
+    if (num_values_read == -1) num_values_read = reader->num_values_read_;
+    DCHECK_EQ(reader->num_values_read_, num_values_read);
+    // ReadDataPage() uses metadata_->num_values to determine when the 
column's done
+    DCHECK(reader->num_values_read_ == reader->metadata_->num_values ||
+        !state_->abort_on_error());
+  }
+  return Status::OK();
+}
+
+ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
+    const parquet::SchemaElement& element) {
+  bool timestamp_conversion_needed_for_int96_timestamps =
+      FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+      file_version_.application == "parquet-mr";
+
+  return ParquetTimestampDecoder(element, &state_->local_time_zone(),
+      timestamp_conversion_needed_for_int96_timestamps);
+}
+}

Reply via email to