http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
new file mode 100644
index 0000000..9cfcdb8
--- /dev/null
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -0,0 +1,1684 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+
+#include <algorithm>
+#include <queue>
+
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "codegen/codegen-anyval.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/parquet/parquet-collection-column-reader.h"
+#include "exec/parquet/parquet-column-readers.h"
+#include "exec/parquet/parquet-column-stats.h"
+#include "exec/scanner-context.inline.h"
+#include "rpc/thrift-util.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
+#include "util/dict-encoding.h"
+
+#include "common/names.h"
+
+DECLARE_bool(convert_legacy_hive_parquet_utc_timestamps);
+
+using std::move;
+using std::sort;
+using namespace impala;
+using namespace impala::io;
+
+// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
+// has fewer entries, then the entire column is dictionary encoded. This threshold
+// is guaranteed to be true for Impala versions 2.9 or below.
+// THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
+const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
+
+// Name of the HdfsParquetScanner class, prefixed with "class.".
+// NOTE(review): presumably the name used to look the type up in LLVM IR for
+// codegen -- confirm against the codegen code.
+const char* HdfsParquetScanner::LLVM_CLASS_NAME =
"class.impala::HdfsParquetScanner";
+
+// Substitute() template for allocation-failure details: $0 is the member function
+// name, $1 the number of bytes requested and $2 a description of what was being
+// allocated.
+static const string PARQUET_MEM_LIMIT_EXCEEDED =
+ "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
+
+namespace impala {
+
+// Names of the summary-stats counters (unit: bytes) that IssueInitialRanges() adds
+// to the scan node's runtime profile.
+static const string IDEAL_RESERVATION_COUNTER_NAME =
"ParquetRowGroupIdealReservation";
+static const string ACTUAL_RESERVATION_COUNTER_NAME =
"ParquetRowGroupActualReservation";
+
+// Adds the Parquet-specific reservation counters to the scan node's runtime profile,
+// rejects any file that is too short to be a Parquet file, and then issues the footer
+// ranges for 'files'. 'files' must not be empty.
+Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files) {
+  DCHECK(!files.empty());
+  // Parquet-specific summary counters, both measured in bytes.
+  ADD_SUMMARY_STATS_COUNTER(scan_node->runtime_profile(),
+      IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+  ADD_SUMMARY_STATS_COUNTER(scan_node->runtime_profile(),
+      ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+
+  // A file shorter than 12 bytes is an invalid Parquet file.
+  for (const HdfsFileDesc* desc : files) {
+    if (desc->file_length >= 12) continue;
+    return Status(Substitute("Parquet file $0 has an invalid file length: $1",
+        desc->filename, desc->file_length));
+  }
+  return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
+}
+
+// Builds a scanner for 'scan_node'. The profile counter pointers are left null here
+// and are assigned in Open(). The scratch batch and the dictionary pool are both
+// constructed with the scan node's memory tracker.
+HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node,
RuntimeState* state)
+ : HdfsScanner(scan_node, state),
+ row_group_idx_(-1),
+ row_group_rows_read_(0),
+ advance_row_group_(true),
+ min_max_tuple_(nullptr),
+ row_batches_produced_(0),
+ scratch_batch_(new ScratchTupleBatch(
+ *scan_node->row_desc(), state_->batch_size(),
scan_node->mem_tracker())),
+ metadata_range_(nullptr),
+ dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+ assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+ process_footer_timer_stats_(nullptr),
+ num_cols_counter_(nullptr),
+ num_stats_filtered_row_groups_counter_(nullptr),
+ num_row_groups_counter_(nullptr),
+ num_scanners_with_no_reads_counter_(nullptr),
+ num_dict_filtered_row_groups_counter_(nullptr),
+ coll_items_read_counter_(0),
+ codegend_process_scratch_batch_fn_(nullptr) {
+ // Stop the timer straight away; GetNextInternal() calls Start()/Stop() around the
+ // work it wants measured.
+ assemble_rows_timer_.Stop();
+}
+
+// Prepares the scanner for one split: registers the profile counters, picks up the
+// codegen'd scratch-batch function if the scan node has one, allocates the min/max
+// statistics tuple, clones the min/max conjunct evaluators, processes the file
+// footer, resolves the file schema, creates the column readers and sets up the
+// dictionary filtering structures. Returns the first non-OK status encountered.
+Status HdfsParquetScanner::Open(ScannerContext* context) {
+ RETURN_IF_ERROR(HdfsScanner::Open(context));
+ // Remember the initial scan range; NextRowGroup() reads the original split from
+ // its metadata.
+ metadata_range_ = stream_->scan_range();
+ num_cols_counter_ =
+ ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
+ num_stats_filtered_row_groups_counter_ =
+ ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
+ TUnit::UNIT);
+ num_row_groups_counter_ =
+ ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
+ num_scanners_with_no_reads_counter_ =
+ ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads",
TUnit::UNIT);
+ num_dict_filtered_row_groups_counter_ =
+ ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups",
TUnit::UNIT);
+ process_footer_timer_stats_ =
+ ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(),
"FooterProcessingTime");
+
+ // A null function pointer means no codegen'd function is available for Parquet;
+ // record on the scan node which of the two cases this scanner is in.
+ codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
+ scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
+ if (codegend_process_scratch_batch_fn_ == nullptr) {
+ scan_node_->IncNumScannersCodegenDisabled();
+ } else {
+ scan_node_->IncNumScannersCodegenEnabled();
+ }
+
+ perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+
+ // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
+ const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+ if (min_max_tuple_desc != nullptr) {
+ int64_t tuple_size = min_max_tuple_desc->byte_size();
+ uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+ if (buffer == nullptr) {
+ string details = Substitute("Could not allocate buffer of $0 bytes for
Parquet "
+ "statistics tuple for file '$1'.", tuple_size, filename());
+ return scan_node_->mem_tracker()->MemLimitExceeded(state_, details,
tuple_size);
+ }
+ min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
+ }
+
+ // Clone the min/max statistics conjuncts.
+ RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
+ expr_perm_pool_.get(), context_->expr_results_pool(),
+ scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
+
+ // Keep a pointer to every filter context of this scanner context, plus one local
+ // stats entry per filter (same index in 'filter_stats_').
+ for (int i = 0; i < context->filter_ctxs().size(); ++i) {
+ const FilterContext* ctx = &context->filter_ctxs()[i];
+ DCHECK(ctx->filter != nullptr);
+ filter_ctxs_.push_back(ctx);
+ }
+ filter_stats_.resize(filter_ctxs_.size());
+
+ DCHECK(parse_status_.ok()) << "Invalid parse_status_" <<
parse_status_.GetDetail();
+
+ // Each scan node can process multiple splits. Each split processes the
footer once.
+ // We use a timer to measure the time taken to ProcessFooter() per split and
add
+ // this time to the averaged timer.
+ MonotonicStopWatch single_footer_process_timer;
+ single_footer_process_timer.Start();
+ // First process the file metadata in the footer.
+ Status footer_status = ProcessFooter();
+ single_footer_process_timer.Stop();
+
+
process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
+
+ // Release I/O buffers immediately to make sure they are cleaned up
+ // in case we return a non-OK status anywhere below.
+ stream_ = nullptr;
+ context_->ReleaseCompletedResources(true);
+ context_->ClearStreams();
+ // Report a footer error only now, after the resources above have been released.
+ RETURN_IF_ERROR(footer_status);
+
+ // Parse the file schema into an internal representation for schema
resolution.
+ schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
+ state_->query_options().parquet_fallback_schema_resolution,
+ state_->query_options().parquet_array_resolution));
+ RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename()));
+
+ // We've processed the metadata and there are columns that need to be
materialized.
+ RETURN_IF_ERROR(CreateColumnReaders(
+ *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
+ COUNTER_SET(num_cols_counter_,
+ static_cast<int64_t>(CountScalarColumns(column_readers_)));
+ // Set top-level template tuple.
+ template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+
+ RETURN_IF_ERROR(InitDictFilterStructures());
+ return Status::OK();
+}
+
+// Tears the scanner down. With a non-null 'row_batch', the remaining row group
+// resources and the template tuple pool are handed over to it, and the batch is
+// queued on the scan node if that node has a row batch queue. With a null
+// 'row_batch', all of those resources are freed here instead. Afterwards the
+// completed range is reported together with the codecs of all scalar columns, and
+// the locally gathered filter statistics are added to the per-filter counters.
+void HdfsParquetScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
+  if (row_batch == nullptr) {
+    template_tuple_pool_->FreeAll();
+    dictionary_pool_->FreeAll();
+    context_->ReleaseCompletedResources(true);
+    for (ParquetColumnReader* reader : column_readers_) reader->Close(nullptr);
+    // Tuple data may still be sitting in the scratch batch, e.g. if Open() failed or
+    // the query was cancelled.
+    scratch_batch_->ReleaseResources(nullptr);
+  } else {
+    FlushRowGroupResources(row_batch);
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
+    }
+  }
+  if (perm_pool_ != nullptr) {
+    perm_pool_->FreeAll();
+    perm_pool_.reset();
+  }
+
+  // By now every pool must be empty: its memory was either transferred or freed.
+  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
+
+  // Walk the reader tree and gather the codec of every scalar reader so that the
+  // completed range can be reported with its compression types.
+  vector<THdfsCompression::type> codecs;
+  stack<ParquetColumnReader*> pending;
+  for (ParquetColumnReader* top_level : column_readers_) pending.push(top_level);
+  while (!pending.empty()) {
+    ParquetColumnReader* current = pending.top();
+    pending.pop();
+    if (!current->IsCollectionReader()) {
+      codecs.push_back(static_cast<BaseScalarColumnReader*>(current)->codec());
+      continue;
+    }
+    CollectionColumnReader* collection = static_cast<CollectionColumnReader*>(current);
+    for (ParquetColumnReader* child : *collection->children()) pending.push(child);
+  }
+  assemble_rows_timer_.Stop();
+  assemble_rows_timer_.ReleaseCounter();
+
+  if (codecs.empty()) {
+    // A metadata-only read (i.e. count(*)) has no columns.
+    codecs.push_back(THdfsCompression::NONE);
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, codecs, true);
+  } else {
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, codecs);
+  }
+  // reset() on an empty pointer is a no-op, so no null check is needed.
+  schema_resolver_.reset();
+
+  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
+
+  // 'filter_stats_' is indexed in parallel with 'filter_ctxs_'.
+  for (int idx = 0; idx < filter_ctxs_.size(); ++idx) {
+    const LocalFilterStats& local_stats = filter_stats_[idx];
+    const FilterStats* shared_stats = filter_ctxs_[idx]->stats;
+    shared_stats->IncrCounters(FilterStats::ROWS_KEY, local_stats.total_possible,
+        local_stats.considered, local_stats.rejected);
+  }
+
+  CloseInternal();
+}
+
+// Returns the file offset at which 'column' starts: the dictionary page offset when
+// the metadata has one set, otherwise the offset of the data pages.
+static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) {
+  if (!column.__isset.dictionary_page_offset) return column.data_page_offset;
+  // A dictionary page is expected to come before the data pages.
+  DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+  return column.dictionary_page_offset;
+}
+
+// Returns the file offset halfway between the start of the first column of
+// 'row_group' and the end of its last column.
+static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
+  const parquet::ColumnMetaData& first_col = row_group.columns.front().meta_data;
+  const parquet::ColumnMetaData& last_col = row_group.columns.back().meta_data;
+
+  const int64_t begin = GetColumnStartOffset(first_col);
+  const int64_t end =
+      GetColumnStartOffset(last_col) + last_col.total_compressed_size;
+
+  return begin + (end - begin) / 2;
+}
+
+// Returns true if the byte range occupied by 'row_group' overlaps 'split_range'.
+static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
+    const ScanRange* split_range) {
+  const parquet::ColumnMetaData& first_col = row_group.columns.front().meta_data;
+  const parquet::ColumnMetaData& last_col = row_group.columns.back().meta_data;
+  const int64_t rg_begin = GetColumnStartOffset(first_col);
+  const int64_t rg_end =
+      GetColumnStartOffset(last_col) + last_col.total_compressed_size;
+
+  const int64_t split_begin = split_range->offset();
+  const int64_t split_end = split_begin + split_range->len();
+
+  // The split starts inside the row group.
+  if (rg_begin <= split_begin && split_begin < rg_end) return true;
+  // The split ends inside the row group.
+  if (rg_begin < split_end && split_end <= rg_end) return true;
+  // The split covers the whole row group.
+  return split_begin <= rg_begin && rg_end <= split_end;
+}
+
+// Returns the number of scalar (non-collection) readers reachable from
+// 'column_readers', descending into the children of collection readers.
+// 'column_readers' may only be empty when the scan node optimizes count(*).
+int HdfsParquetScanner::CountScalarColumns(
+    const vector<ParquetColumnReader*>& column_readers) {
+  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
+  int num_columns = 0;
+  stack<ParquetColumnReader*> readers;
+  // Seed the traversal from the argument, not from the 'column_readers_' member, so
+  // the result always describes the vector the caller passed in.
+  for (ParquetColumnReader* r : column_readers) readers.push(r);
+  while (!readers.empty()) {
+    ParquetColumnReader* col_reader = readers.top();
+    readers.pop();
+    if (col_reader->IsCollectionReader()) {
+      // Collection readers are not counted themselves; only their scalar descendants.
+      CollectionColumnReader* collection_reader =
+          static_cast<CollectionColumnReader*>(col_reader);
+      for (ParquetColumnReader* r : *collection_reader->children()) readers.push(r);
+      continue;
+    }
+    ++num_columns;
+  }
+  return num_columns;
+}
+
+// Produces row batches for the current split and appends each one to the scan node's
+// row batch queue until the scanner reaches end-of-stream, the scan node reaches its
+// limit, or a runtime filter is found to be always false. Requires a scan node with a
+// row batch queue. Returns the first error from GetNextInternal().
+Status HdfsParquetScanner::ProcessSplit() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* queue_node = static_cast<HdfsScanNode*>(scan_node_);
+  while (true) {
+    if (FilterContext::CheckForAlwaysFalse(
+            FilterStats::SPLITS_KEY, context_->filter_ctxs())) {
+      eos_ = true;
+      break;
+    }
+    unique_ptr<RowBatch> out_batch = make_unique<RowBatch>(
+        scan_node_->row_desc(), state_->batch_size(), scan_node_->mem_tracker());
+    Status batch_status = GetNextInternal(out_batch.get());
+    // Queue the batch even on error: it may hold data that previously appended
+    // batches reference.
+    queue_node->AddMaterializedRowBatch(move(out_batch));
+    RETURN_IF_ERROR(batch_status);
+    ++row_batches_produced_;
+    const bool filter_check_due =
+        (row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0;
+    if (filter_check_due) CheckFiltersEffectiveness();
+    if (eos_ || scan_node_->ReachedLimit()) break;
+  }
+  return Status::OK();
+}
+
+// Fills 'row_batch' with the next rows of the file and sets 'eos_' once there is
+// nothing left to return. There are three modes:
+//  - count(*) optimization: emits one row per row group whose single slot holds the
+//    row group's 'num_rows' from the file metadata;
+//  - zero-slot scan: emits template tuples based only on the row count in the file
+//    metadata, without reading column data;
+//  - regular scan: drains the scratch batch, advances to the next row group when
+//    needed and assembles rows from the column readers.
+// Returns a non-OK status on unrecoverable errors. A non-OK 'parse_status_' is
+// passed to RuntimeState::LogOrReturnError() and then reset.
+Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
+  if (scan_node_->optimize_parquet_count_star()) {
+    // Populate the single slot with the Parquet num rows statistic.
+    int64_t tuple_buf_size;
+    uint8_t* tuple_buf;
+    // We try to allocate a smaller row batch here because in most cases the number of
+    // row groups in a file is much lower than the default row batch capacity.
+    int capacity = min(
+        static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
+    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
+        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
+        &capacity, &tuple_buf_size, &tuple_buf));
+    while (!row_batch->AtCapacity()) {
+      RETURN_IF_ERROR(NextRowGroup());
+      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
+      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+      InitTuple(template_tuple_, dst_tuple);
+      int64_t* dst_slot =
+          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
+      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
+      row_group_rows_read_ += *dst_slot;
+      dst_row->SetTuple(0, dst_tuple);
+      row_batch->CommitLastRow();
+      tuple_buf += scan_node_->tuple_desc()->byte_size();
+    }
+    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
+    return Status::OK();
+  } else if (scan_node_->IsZeroSlotTableScan()) {
+    // There are no materialized slots and we are not optimizing count(*), e.g.
+    // "select 1 from alltypes". We can serve this query from just the file metadata.
+    // We don't need to read the column data.
+    if (row_group_rows_read_ == file_metadata_.num_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(row_batch, num_to_commit);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    row_group_rows_read_ += max_tuples;
+    // Add only the rows handled by this call. 'row_group_rows_read_' is a running
+    // total across calls on this path, so adding it here would count the rows of
+    // earlier batches again.
+    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+    return Status::OK();
+  }
+
+  // Transfer remaining tuples from the scratch batch.
+  if (!scratch_batch_->AtEnd()) {
+    assemble_rows_timer_.Start();
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) return Status::OK();
+  }
+
+  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+    // Transfer resources and clear streams if there is any leftover from the previous
+    // row group. We will create new streams for the next row group.
+    FlushRowGroupResources(row_batch);
+    if (!advance_row_group_) {
+      // The previous row group was read to its end; check that it ended consistently.
+      Status status =
+          ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
+      if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+    }
+    RETURN_IF_ERROR(NextRowGroup());
+    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+    if (row_group_idx_ == file_metadata_.row_groups.size()) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
+  }
+
+  // Apply any runtime filters to static tuples containing the partition keys for this
+  // partition. If any filter fails, we return immediately and stop processing this
+  // scan range.
+  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+          FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+    eos_ = true;
+    DCHECK(parse_status_.ok());
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  Status status = AssembleRows(column_readers_, row_batch, &advance_row_group_);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  if (!parse_status_.ok()) {
+    // Either fail the query or log the error and carry on with a clean parse status.
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  }
+
+  return Status::OK();
+}
+
+// Evaluates the min/max conjuncts against the column chunk statistics of
+// 'row_group' and sets '*skip_row_group' to true if the row group can be skipped.
+// '*skip_row_group' is always set to false first, and stays false when the
+// 'parquet_read_statistics' query option is off or the scan node has no min/max
+// tuple descriptor. Returns an error if a column path cannot be resolved or if a
+// conjunct refers to a 'pos' field.
+Status HdfsParquetScanner::EvaluateStatsConjuncts(
+ const parquet::FileMetaData& file_metadata, const parquet::RowGroup&
row_group,
+ bool* skip_row_group) {
+ *skip_row_group = false;
+
+ if (!state_->query_options().parquet_read_statistics) return Status::OK();
+
+ const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
+ if (!min_max_tuple_desc) return Status::OK();
+
+ int64_t tuple_size = min_max_tuple_desc->byte_size();
+
+ DCHECK(min_max_tuple_ != nullptr);
+ min_max_tuple_->Init(tuple_size);
+
+ // Slot i of the min/max tuple belongs to conjunct evaluator i.
+ DCHECK_EQ(min_max_tuple_desc->slots().size(),
min_max_conjunct_evals_.size());
+ for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
+ SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
+ ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
+
+ // Resolve column path to determine col idx.
+ SchemaNode* node = nullptr;
+ bool pos_field;
+ bool missing_field;
+ RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(),
+ &node, &pos_field, &missing_field));
+
+ if (missing_field) {
+ // We are selecting a column that is not in the file. We would set its
slot to NULL
+ // during the scan, so any predicate would evaluate to false. Return
early. NULL
+ // comparisons cannot happen here, since predicates with NULL literals
are filtered
+ // in the frontend.
+ *skip_row_group = true;
+ break;
+ }
+
+ if (pos_field) {
+ // The planner should not send predicates with 'pos' for stats filtering
to the BE.
+ // In case there is a bug, we return an error, which will abort the
query.
+ stringstream err;
+ err << "Statistics not supported for pos fields: " <<
slot_desc->DebugString();
+ DCHECK(false) << err.str();
+ return Status(err.str());
+ }
+
+ int col_idx = node->col_idx;
+ DCHECK_LT(col_idx, row_group.columns.size());
+
+ // 'col_order' stays null when the file metadata has no column order entry for
+ // this column index.
+ const vector<parquet::ColumnOrder>& col_orders =
file_metadata.column_orders;
+ const parquet::ColumnOrder* col_order = nullptr;
+ if (col_idx < col_orders.size()) col_order = &col_orders[col_idx];
+
+ const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
+ const ColumnType& col_type = slot_desc->type();
+
+ DCHECK(node->element != nullptr);
+
+ ColumnStatsReader stat_reader(col_chunk, col_type, col_order,
*node->element);
+ if (col_type.IsTimestampType()) {
+ stat_reader.SetTimestampDecoder(CreateTimestampDecoder(*node->element));
+ }
+
+ // If the null count could be read and equals the number of values in the chunk,
+ // skip the row group.
+ int64_t null_count = 0;
+ bool null_count_result = stat_reader.ReadNullCountStat(&null_count);
+ if (null_count_result && null_count == col_chunk.meta_data.num_values) {
+ *skip_row_group = true;
+ break;
+ }
+
+ // The root function of the conjunct decides which statistic is compared.
+ const string& fn_name = eval->root().function_name();
+ ColumnStatsReader::StatsField stats_field;
+ if (fn_name == "lt" || fn_name == "le") {
+ // We need to get min stats.
+ stats_field = ColumnStatsReader::StatsField::MIN;
+ } else if (fn_name == "gt" || fn_name == "ge") {
+ // We need to get max stats.
+ stats_field = ColumnStatsReader::StatsField::MAX;
+ } else {
+ DCHECK(false) << "Unsupported function name for statistics evaluation: "
<< fn_name;
+ continue;
+ }
+
+ void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
+ bool stats_read = stat_reader.ReadFromThrift(stats_field, slot);
+
+ // Only evaluate the conjunct when the statistic was read into the slot; without
+ // it this conjunct cannot rule the row group out.
+ if (stats_read) {
+ TupleRow row;
+ row.SetTuple(0, min_max_tuple_);
+ if (!ExecNode::EvalPredicate(eval, &row)) {
+ *skip_row_group = true;
+ break;
+ }
+ }
+ }
+
+ // Free any expr result allocations accumulated during conjunct evaluation.
+ context_->expr_results_pool()->Clear();
+ return Status::OK();
+}
+
+// Advances 'row_group_idx_' to the next row group that this scanner should process
+// and initializes and starts its column readers. Row groups are skipped when they
+// are empty, when their mid-point lies outside this scanner's split, or when
+// statistics or dictionary filters eliminate them. When the function returns with
+// 'row_group_idx_' >= file_metadata_.row_groups.size(), there are no more row
+// groups. Resets 'advance_row_group_' and 'row_group_rows_read_'. Returns a non-OK
+// status only for errors that LogOrReturnError() does not allow to be skipped.
+Status HdfsParquetScanner::NextRowGroup() {
+ const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+ metadata_range_->meta_data())->original_split;
+ int64_t split_offset = split_range->offset();
+ int64_t split_length = split_range->len();
+
+ HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
+ context_->partition_descriptor()->id(), filename());
+
+ // Both flags only feed the 'NumScannersWithNoReads' counter below.
+ bool start_with_first_row_group = row_group_idx_ == -1;
+ bool misaligned_row_group_skipped = false;
+
+ advance_row_group_ = false;
+ row_group_rows_read_ = 0;
+
+ // Loop until we have found a non-empty row group, and successfully
initialized and
+ // seeded the column readers. Return a non-OK status from within loop only
if the error
+ // is non-recoverable, otherwise log the error and continue with the next
row group.
+ while (true) {
+ // Reset the parse status for the next row group.
+ parse_status_ = Status::OK();
+ // Make sure that we don't have leftover resources from the file metadata
scan range
+ // or previous row groups.
+ DCHECK_EQ(0, context_->NumStreams());
+
+ ++row_group_idx_;
+ if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+ if (start_with_first_row_group && misaligned_row_group_skipped) {
+ // We started with the first row group and skipped all the row groups
because
+ // they were misaligned. The execution flow won't reach this point if
there is at
+ // least one non-empty row group which this scanner can process.
+ COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+ }
+ break;
+ }
+ const parquet::RowGroup& row_group =
file_metadata_.row_groups[row_group_idx_];
+ // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and
'select *'
+ // behave consistently for corrupt files that have
'file_metadata_.num_rows == 0'
+ // but some data in row groups.
+ if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
+
+ RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
+ file_desc->filename, file_desc->file_length, row_group));
+
+ // A row group is processed by the scanner whose split overlaps with the
row
+ // group's mid point.
+ int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
+ if (!(row_group_mid_pos >= split_offset &&
+ row_group_mid_pos < split_offset + split_length)) {
+ // The mid-point does not fall within the split, this row group will be
handled by a
+ // different scanner.
+ // If the row group overlaps with the split, we found a misaligned row
group.
+ misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group,
split_range);
+ continue;
+ }
+
+ // Counts every row group whose mid-point is in this split, including the ones
+ // that the filters below end up skipping.
+ COUNTER_ADD(num_row_groups_counter_, 1);
+
+ // Evaluate row group statistics.
+ bool skip_row_group_on_stats;
+ RETURN_IF_ERROR(
+ EvaluateStatsConjuncts(file_metadata_, row_group,
&skip_row_group_on_stats));
+ if (skip_row_group_on_stats) {
+ COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
+ continue;
+ }
+
+ InitCollectionColumns();
+ RETURN_IF_ERROR(InitScalarColumns());
+
+ // Start scanning dictionary filtering column readers, so we can read the
dictionary
+ // pages in EvalDictionaryFilters().
+
RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
+
+ // StartScans() may have allocated resources to scan columns. If we skip
this row
+ // group below, we must call ReleaseSkippedRowGroupResources() before
continuing.
+
+ // If there is a dictionary-encoded column where every value is eliminated
+ // by a conjunct, the row group can be eliminated. This initializes
dictionaries
+ // for all columns visited.
+ bool skip_row_group_on_dict_filters;
+ Status status = EvalDictionaryFilters(row_group,
&skip_row_group_on_dict_filters);
+ if (!status.ok()) {
+ // Either return an error or skip this row group if it is ok to ignore
errors
+ RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
+ continue;
+ }
+ if (skip_row_group_on_dict_filters) {
+ COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+ ReleaseSkippedRowGroupResources();
+ continue;
+ }
+
+ // At this point, the row group has passed any filtering criteria
+ // Start scanning non-dictionary filtering column readers and initialize
their
+ // dictionaries.
+
RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
+ status =
BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
+ if (!status.ok()) {
+ // Either return an error or skip this row group if it is ok to ignore
errors
+ RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
+ continue;
+ }
+ DCHECK(parse_status_.ok()) << "Invalid parse_status_" <<
parse_status_.GetDetail();
+ break;
+ }
+ DCHECK(parse_status_.ok());
+ return Status::OK();
+}
+
+// Hands the resources of the current row group over to 'row_batch', which must not be
+// null: the dictionary pool's data and the scratch batch's resources go to the
+// batch's tuple data pool, and every top-level column reader is closed with the
+// batch. Finally clears the context's streams.
+void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
+  DCHECK(row_batch != nullptr);
+  auto* dest_pool = row_batch->tuple_data_pool();
+  dest_pool->AcquireData(dictionary_pool_.get(), false);
+  scratch_batch_->ReleaseResources(dest_pool);
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* reader : column_readers_) reader->Close(row_batch);
+  context_->ClearStreams();
+}
+
+// Frees the resources of a row group that is being skipped. Unlike
+// FlushRowGroupResources(), nothing is transferred to a row batch: the dictionary
+// pool is freed and the scratch batch and column readers are released with a null
+// destination.
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+  dictionary_pool_->FreeAll();
+  scratch_batch_->ReleaseResources(nullptr);
+  context_->ReleaseCompletedResources(true);
+  for (size_t idx = 0; idx < column_readers_.size(); ++idx) {
+    column_readers_[idx]->Close(nullptr);
+  }
+  context_->ClearStreams();
+}
+
+// Returns true if dictionary filtering can be applied to 'col_reader': the reader
+// has a slot descriptor, dictionary filter conjuncts are registered for that slot,
+// and the column's values need neither conversion nor validation.
+bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
+  // The slot descriptor is NULL when the column does not have to be materialized,
+  // e.g. a count(*) without predicates only counts records.
+  const SlotDescriptor* slot = col_reader->slot_desc();
+  if (slot == nullptr) return false;
+
+  // Without dictionary filter conjuncts on the slot there is nothing to evaluate.
+  const bool has_dict_conjuncts =
+      dict_filter_map_.find(slot->id()) != dict_filter_map_.end();
+  if (!has_dict_conjuncts) return false;
+
+  // Some datatypes (chars, timestamps) are stored in a form that must be converted
+  // before being returned, and some (timestamps) contain bit combinations that must
+  // be validated. Dictionary values are neither converted nor validated, so such
+  // columns are skipped for now.
+  // TODO: Convert and validate the values during dictionary construction and store
+  // them in converted form in the dictionary.
+  return !col_reader->NeedsConversion() && !col_reader->NeedsValidation();
+}
+
+// Sorts 'readers' and, recursively, the children of collection readers into the
+// scanner's reader lists. Collection readers go to 'collection_readers_'. Scalar
+// readers go to 'scalar_readers_' and additionally to 'dict_filterable_readers_'
+// when 'can_eval_dict_filters' is set and IsDictFilterable() accepts them, or to
+// 'non_dict_filterable_readers_' otherwise.
+void HdfsParquetScanner::PartitionReaders(
+    const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) {
+  for (ParquetColumnReader* reader : readers) {
+    if (!reader->IsCollectionReader()) {
+      BaseScalarColumnReader* scalar = static_cast<BaseScalarColumnReader*>(reader);
+      scalar_readers_.push_back(scalar);
+      const bool filterable = can_eval_dict_filters && IsDictFilterable(scalar);
+      if (filterable) {
+        dict_filterable_readers_.push_back(scalar);
+      } else {
+        non_dict_filterable_readers_.push_back(scalar);
+      }
+      continue;
+    }
+    // Register the collection reader itself, then descend into its children.
+    CollectionColumnReader* collection = static_cast<CollectionColumnReader*>(reader);
+    collection_readers_.push_back(collection);
+    PartitionReaders(*collection->children(), can_eval_dict_filters);
+  }
+}
+
+// Partitions the column readers and allocates, from 'perm_pool_', one tuple buffer
+// per tuple descriptor that owns a dictionary-filterable slot. The buffers are
+// recorded in 'dict_filter_tuple_map_'. Returns a memory-limit-exceeded status if an
+// allocation fails.
+Status HdfsParquetScanner::InitDictFilterStructures() {
+  // Dictionary filters are evaluated only if the query option enables them and
+  // there is at least one dictionary filter conjunct.
+  const bool dict_filtering_enabled =
+      state_->query_options().parquet_dictionary_filtering;
+  const bool can_eval_dict_filters =
+      dict_filtering_enabled && !dict_filter_map_.empty();
+
+  // Separate column readers into scalar and collection readers.
+  PartitionReaders(column_readers_, can_eval_dict_filters);
+
+  for (BaseScalarColumnReader* reader : dict_filterable_readers_) {
+    const TupleDescriptor* tuple_desc = reader->slot_desc()->parent();
+    // Several slots can share a tuple descriptor; allocate only once per descriptor.
+    if (dict_filter_tuple_map_.find(tuple_desc) != dict_filter_tuple_map_.end()) {
+      continue;
+    }
+    const int tuple_size = tuple_desc->byte_size();
+    if (tuple_size <= 0) continue;
+
+    uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+    if (buffer != nullptr) {
+      dict_filter_tuple_map_[tuple_desc] = reinterpret_cast<Tuple*>(buffer);
+      continue;
+    }
+    string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED,
+        "InitDictFilterStructures", tuple_size, "Dictionary Filtering Tuple");
+    return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+  }
+  return Status::OK();
+}
+
+// Returns true if the column chunk described by 'col_metadata' is entirely
+// dictionary encoded.
+//
+// The Parquet spec allows mixed encodings within a column chunk: a writer may start
+// out dictionary encoding the data pages and fall back to plain encoding once the
+// dictionary grows too large. Dictionary filters may only eliminate a row group if
+// every data page of the chunk is dictionary encoded. Two checks are available:
+//  1. 'encoding_stats', when set, holds the number of pages written per page type
+//     and encoding, so data pages with another encoding can be detected directly.
+//  2. Otherwise 'encodings' lists the encodings used in the chunk. The chunk is
+//     entirely dictionary encoded if the list contains the dictionary encoding and
+//     nothing besides the encodings used for definition/repetition levels.
+bool HdfsParquetScanner::IsDictionaryEncoded(
+    const parquet::ColumnMetaData& col_metadata) {
+  if (col_metadata.__isset.encoding_stats) {
+    // Check #1: no data page may use an encoding other than PLAIN_DICTIONARY.
+    for (const parquet::PageEncodingStats& stat : col_metadata.encoding_stats) {
+      const bool is_data_page = stat.page_type == parquet::PageType::DATA_PAGE;
+      const bool is_dict_encoded =
+          stat.encoding == parquet::Encoding::PLAIN_DICTIONARY;
+      if (is_data_page && !is_dict_encoded && stat.count > 0) return false;
+    }
+    return true;
+  }
+
+  // Check #2: the dictionary encoding must be listed, and no encoding other than
+  // the ones used for levels may be listed next to it.
+  bool saw_dict_encoding = false;
+  for (const parquet::Encoding::type& encoding : col_metadata.encodings) {
+    if (encoding == parquet::Encoding::PLAIN_DICTIONARY) {
+      saw_dict_encoding = true;
+    } else if (encoding == parquet::Encoding::RLE
+        || encoding == parquet::Encoding::BIT_PACKED) {
+      // RLE and BIT_PACKED are used for repetition/definition levels.
+    } else {
+      // Some pages use a non-dictionary encoding.
+      return false;
+    }
+  }
+  return saw_dict_encoding;
+}
+
+// Evaluates the dictionary filter conjuncts against the dictionary entries of each
+// reader in 'dict_filterable_readers_'. Sets '*row_group_eliminated' to true as soon
+// as one column is found whose dictionary contains no entry passing its conjuncts.
+Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
+    bool* row_group_eliminated) {
+  *row_group_eliminated = false;
+  // Nothing to evaluate without dictionary-filterable readers.
+  if (dict_filterable_readers_.empty()) return Status::OK();
+
+  // Legacy impala files (< 2.9) require special handling, because they do not encode
+  // information about whether the column is 100% dictionary encoded.
+  const bool legacy_impala_file =
+      file_version_.application == "impala" && file_version_.VersionLt(2,9,0);
+
+  // Readers whose dictionary initialization is postponed until it is known that the
+  // row group cannot be eliminated.
+  vector<BaseScalarColumnReader*> readers_needing_dict_init;
+  // Tuples that were already initialized during this call, keyed by descriptor.
+  unordered_map<const TupleDescriptor*, Tuple*> initialized_tuples;
+  for (BaseScalarColumnReader* reader : dict_filterable_readers_) {
+    const parquet::ColumnMetaData& chunk_metadata =
+        row_group.columns[reader->col_idx()].meta_data;
+
+    // For legacy impala files the metadata check is skipped, because the only way to
+    // determine whether the column is 100% dictionary encoded requires reading the
+    // dictionary.
+    if (!legacy_impala_file && !IsDictionaryEncoded(chunk_metadata)) {
+      // Not guaranteed to be 100% dictionary encoded: the filter cannot be applied.
+      readers_needing_dict_init.push_back(reader);
+      continue;
+    }
+
+    RETURN_IF_ERROR(reader->InitDictionary());
+    DictDecoderBase* dict_decoder = reader->GetDictionaryDecoder();
+    if (dict_decoder == nullptr) continue;
+
+    // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until
+    // the dictionary reaches the maximum number of entries. If the dictionary
+    // has fewer entries, then the column is 100% dictionary encoded.
+    if (legacy_impala_file
+        && dict_decoder->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) {
+      continue;
+    }
+
+    const SlotDescriptor* slot_desc = reader->slot_desc();
+    DCHECK(slot_desc != nullptr);
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
+    auto conjuncts_it = dict_filter_map_.find(slot_desc->id());
+    DCHECK(conjuncts_it != dict_filter_map_.end());
+    const vector<ScalarExprEvaluator*>& conjunct_evals = conjuncts_it->second;
+
+    // Reuse the tuple if it was already initialized for this descriptor; otherwise
+    // fetch it from 'dict_filter_tuple_map_' and initialize it once.
+    Tuple* filter_tuple = nullptr;
+    auto cached_it = initialized_tuples.find(tuple_desc);
+    if (cached_it != initialized_tuples.end()) {
+      filter_tuple = cached_it->second;
+    } else {
+      auto preallocated_it = dict_filter_tuple_map_.find(tuple_desc);
+      DCHECK(preallocated_it != dict_filter_tuple_map_.end());
+      filter_tuple = preallocated_it->second;
+      filter_tuple->Init(tuple_desc->byte_size());
+      initialized_tuples[tuple_desc] = filter_tuple;
+    }
+    DCHECK(filter_tuple != nullptr);
+
+    void* slot = filter_tuple->GetSlot(slot_desc->tuple_offset());
+    TupleRow row;
+    row.SetTuple(0, filter_tuple);
+    bool any_entry_passes = false;
+    for (int entry_idx = 0; entry_idx < dict_decoder->num_entries(); ++entry_idx) {
+      // Don't let expr result allocations accumulate too much for large dictionaries
+      // or many row groups.
+      if (entry_idx % 1024 == 0) context_->expr_results_pool()->Clear();
+      dict_decoder->GetValue(entry_idx, slot);
+
+      // The row group can only be eliminated if no dictionary value matches. Once a
+      // value passes the conjuncts, this column is done.
+      any_entry_passes = ExecNode::EvalConjuncts(
+          conjunct_evals.data(), conjunct_evals.size(), &row);
+      if (any_entry_passes) break;
+    }
+    // Free all expr result allocations now that we're done with the filter.
+    context_->expr_results_pool()->Clear();
+
+    if (!any_entry_passes) {
+      // No value in this column can satisfy the conjuncts: eliminate the row group.
+      *row_group_eliminated = true;
+      return Status::OK();
+    }
+  }
+
+  // Any columns that were not 100% dictionary encoded need to initialize
+  // their dictionaries here.
+  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(readers_needing_dict_init));
+
+  return Status::OK();
+}
+
+/// High-level steps of this function:
+/// 1. Allocate 'scratch' memory for tuples able to hold a full batch
+/// 2. Populate the slots of all scratch tuples one column reader at a time,
+///    using the ColumnReader::Read*ValueBatch() functions.
+/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and
+///    set the surviving tuples in the output batch. Transfer the ownership of
+///    scratch memory to the output batch once the scratch memory is exhausted.
+/// 4. Repeat steps above until we are done with the row group or an error
+///    occurred.
+/// If a column reader fails or the readers disagree on the number of values, the row
+/// group is abandoned: '*skip_row_group' is set to true and OK is returned. A
+/// mismatch in the number of values is additionally merged into 'parse_status_'.
+/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
+/// difficult to maintain a maximum memory footprint without throwing away at least
+/// some work. This point needs further experimentation and thought.
+Status HdfsParquetScanner::AssembleRows(
+    const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
+    bool* skip_row_group) {
+  DCHECK(!column_readers.empty());
+  DCHECK(row_batch != nullptr);
+  DCHECK_EQ(*skip_row_group, false);
+  DCHECK(scratch_batch_ != nullptr);
+
+  int64_t num_rows_read = 0;
+  while (!column_readers[0]->RowGroupAtEnd()) {
+    // Start a new scratch batch.
+    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+    InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+
+    // Materialize the top-level slots into the scratch batch column-by-column.
+    int last_num_tuples = -1;
+    for (int c = 0; c < column_readers.size(); ++c) {
+      ParquetColumnReader* col_reader = column_readers[c];
+      bool continue_execution;
+      if (col_reader->max_rep_level() > 0) {
+        continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
+            scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
+            &scratch_batch_->num_tuples);
+      } else {
+        continue_execution = col_reader->ReadNonRepeatedValueBatch(
+            &scratch_batch_->aux_mem_pool, scratch_batch_->capacity, tuple_byte_size_,
+            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
+      }
+      // Check that all column readers populated the same number of values.
+      bool num_tuples_mismatch = c != 0 && last_num_tuples != scratch_batch_->num_tuples;
+      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
+        // Remember how many values this reader produced. The scratch batch is reset
+        // below, so the count must be captured first to be reported correctly.
+        const int num_tuples_read = scratch_batch_->num_tuples;
+        // Skipping this row group. Free up all the resources with this row group.
+        FlushRowGroupResources(row_batch);
+        scratch_batch_->num_tuples = 0;
+        DCHECK(scratch_batch_->AtEnd());
+        *skip_row_group = true;
+        if (num_tuples_mismatch && continue_execution) {
+          // 'col_reader' produced 'num_tuples_read' values, whereas the readers before
+          // it produced 'last_num_tuples'.
+          Status err(Substitute("Corrupt Parquet file '$0': column '$1' "
+              "had $2 remaining values but expected $3", filename(),
+              col_reader->schema_element().name, num_tuples_read, last_num_tuples));
+          parse_status_.MergeStatus(err);
+        }
+        return Status::OK();
+      }
+      last_num_tuples = scratch_batch_->num_tuples;
+    }
+    num_rows_read += scratch_batch_->num_tuples;
+    int num_row_to_commit = TransferScratchTuples(row_batch);
+    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    if (row_batch->AtCapacity()) break;
+  }
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
+  return Status::OK();
+}
+
+// Commits 'num_rows' rows to 'dst_batch', then reports cancellation or a failed query
+// status, and finally releases the expr result allocations of this scanner context.
+Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
+  DCHECK(dst_batch != nullptr);
+  dst_batch->CommitRows(num_rows);
+
+  if (context_->cancelled()) {
+    return Status::CancelledInternal("Parquet scanner");
+  }
+  // TODO: It's a really bad idea to propagate UDF error via the global RuntimeState.
+  // Store UDF error in thread local storage or make UDF return status so it can merge
+  // with parse_status_.
+  RETURN_IF_ERROR(state_->GetQueryStatus());
+  // Evaluating the scanner conjuncts allocates expr results; clear them here so that
+  // they do not accumulate too much memory.
+  context_->expr_results_pool()->Clear();
+  return Status::OK();
+}
+
+// Moves tuples from 'scratch_batch_' into 'dst_batch' and returns the number of rows
+// that the caller should commit. Must not be called when 'dst_batch' is already full.
+int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
+  // This function must not be called when the output batch is already full. As long as
+  // we always call CommitRows() after TransferScratchTuples(), the output batch can
+  // never be empty.
+  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
+  DCHECK_EQ(scan_node_->tuple_idx(), 0);
+  DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
+
+  if (scratch_batch_->tuple_byte_size != 0) {
+    // Regular path: run the codegen'd function if available, else the interpreted one.
+    const int num_rows_to_commit = codegend_process_scratch_batch_fn_ != nullptr ?
+        codegend_process_scratch_batch_fn_(this, dst_batch) :
+        ProcessScratchBatch(dst_batch);
+    scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
+    return num_rows_to_commit;
+  }
+
+  // We are materializing a collection with empty tuples. Add a NULL tuple to the
+  // output batch per remaining scratch tuple and return. No need to evaluate
+  // filters/conjuncts.
+  Tuple** output_row =
+      reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
+  DCHECK(filter_ctxs_.empty());
+  DCHECK(conjunct_evals_->empty());
+  int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
+      scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
+  memset(output_row, 0, num_tuples * sizeof(Tuple*));
+  scratch_batch_->tuple_idx += num_tuples;
+  // No data is required to back the empty tuples, so we should not attach any data to
+  // these batches.
+  DCHECK_EQ(0, scratch_batch_->total_allocated_bytes());
+  return num_tuples;
+}
+
+// Produces a finalized ProcessScratchBatch() function in which the calls to
+// EvalConjuncts() and EvalRuntimeFilters() are replaced by codegen'd versions. On
+// success the function is returned in '*process_scratch_batch_fn'; the output is
+// nullptr if an error occurs before finalization or if finalization fails.
+Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
+    const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
+  DCHECK(node->runtime_state()->ShouldCodegen());
+  *process_scratch_batch_fn = nullptr;
+  LlvmCodeGen* codegen = node->runtime_state()->codegen();
+  DCHECK(codegen != nullptr);
+
+  llvm::Function* scratch_batch_fn =
+      codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
+  DCHECK(scratch_batch_fn != nullptr);
+
+  // Swap in the codegen'd conjunct evaluation.
+  llvm::Function* conjuncts_fn;
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &conjuncts_fn));
+  DCHECK(conjuncts_fn != nullptr);
+  int num_replaced =
+      codegen->ReplaceCallSites(scratch_batch_fn, conjuncts_fn, "EvalConjuncts");
+  DCHECK_REPLACE_COUNT(num_replaced, 1);
+
+  // Swap in the codegen'd runtime filter evaluation.
+  llvm::Function* runtime_filters_fn;
+  RETURN_IF_ERROR(
+      CodegenEvalRuntimeFilters(codegen, node->filter_exprs(), &runtime_filters_fn));
+  DCHECK(runtime_filters_fn != nullptr);
+  num_replaced = codegen->ReplaceCallSites(
+      scratch_batch_fn, runtime_filters_fn, "EvalRuntimeFilters");
+  DCHECK_REPLACE_COUNT(num_replaced, 1);
+
+  scratch_batch_fn->setName("ProcessScratchBatch");
+  *process_scratch_batch_fn = codegen->FinalizeFunction(scratch_batch_fn);
+  if (*process_scratch_batch_fn != nullptr) return Status::OK();
+  return Status("Failed to finalize process_scratch_batch_fn.");
+}
+
+// Reads the items of one collection from 'column_readers' into 'coll_value_builder',
+// keeping only the item tuples that pass the conjuncts registered for the item tuple
+// descriptor. Reading stops when a reader reports a repetition level that is
+// <= 'new_collection_rep_level', or when execution should stop (limit reached,
+// cancellation, or an error, in which case 'parse_status_' may be set).
+// Returns false if execution should not continue.
+bool HdfsParquetScanner::AssembleCollection(
+    const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level,
+    CollectionValueBuilder* coll_value_builder) {
+  DCHECK(!column_readers.empty());
+  DCHECK_GE(new_collection_rep_level, 0);
+  DCHECK(coll_value_builder != nullptr);
+
+  const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
+  Tuple* template_tuple = template_tuple_map_[tuple_desc];
+  // Bind by reference: the evaluators are only read here, so copying the vector on
+  // every collection would be wasted work.
+  const vector<ScalarExprEvaluator*>& evals = conjunct_evals_map_[tuple_desc->id()];
+
+  int64_t rows_read = 0;
+  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
+  // Note that this will be set to true at the end of the row group or the end of the
+  // current collection (if applicable).
+  bool end_of_collection = column_readers[0]->rep_level() == -1;
+  // We only initialize end_of_collection to true here if we're at the end of the row
+  // group (otherwise it would always be true because we're on the "edge" of two
+  // collections), and only ProcessSplit() should call AssembleRows() at the end of the
+  // row group.
+  DCHECK(!end_of_collection);
+
+  while (!end_of_collection && continue_execution) {
+    MemPool* pool;
+    Tuple* tuple;
+    TupleRow* row = nullptr;
+
+    int64_t num_rows;
+    // We're assembling item tuples into an CollectionValue
+    parse_status_ =
+        GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, &num_rows);
+    if (UNLIKELY(!parse_status_.ok())) {
+      continue_execution = false;
+      break;
+    }
+    // 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
+    // the number of rows we read at one time so we don't spend too long in the
+    // 'num_rows' loop below before checking for cancellation or limit reached.
+    num_rows = min(
+        num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
+
+    int num_to_commit = 0;
+    int row_idx = 0;
+    for (row_idx = 0; row_idx < num_rows && !end_of_collection; ++row_idx) {
+      DCHECK(continue_execution);
+      // A tuple is produced iff the collection that contains its values is not empty
+      // and non-NULL. (Empty or NULL collections produce no output values, whereas
+      // NULL is output for the fields of NULL structs.)
+      bool materialize_tuple = column_readers[0]->def_level() >=
+          column_readers[0]->def_level_of_immediate_repeated_ancestor();
+      InitTuple(tuple_desc, template_tuple, tuple);
+      continue_execution =
+          ReadCollectionItem(column_readers, materialize_tuple, pool, tuple);
+      if (UNLIKELY(!continue_execution)) break;
+      end_of_collection = column_readers[0]->rep_level() <= new_collection_rep_level;
+
+      if (materialize_tuple) {
+        // Only advance to the next tuple slot if this item passes the conjuncts;
+        // otherwise the same slot is overwritten by the next item.
+        if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) {
+          tuple = next_tuple(tuple_desc->byte_size(), tuple);
+          ++num_to_commit;
+        }
+      }
+    }
+
+    rows_read += row_idx;
+    coll_value_builder->CommitTuples(num_to_commit);
+    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
+  }
+  coll_items_read_counter_ += rows_read;
+  if (end_of_collection) {
+    // All column readers should report the start of the same collection.
+    for (int c = 1; c < column_readers.size(); ++c) {
+      FILE_CHECK_EQ(column_readers[c]->rep_level(), column_readers[0]->rep_level());
+    }
+  }
+  return continue_execution;
+}
+
+// Advances every reader in 'column_readers' by one collection item. When
+// 'materialize_tuple' is true, the position slot (if any) and the value of each reader
+// are written into 'tuple'; otherwise only the levels are advanced. Returns false as
+// soon as one reader indicates that execution should stop.
+inline bool HdfsParquetScanner::ReadCollectionItem(
+    const vector<ParquetColumnReader*>& column_readers,
+    bool materialize_tuple, MemPool* pool, Tuple* tuple) const {
+  DCHECK(!column_readers.empty());
+  for (ParquetColumnReader* reader : column_readers) {
+    bool keep_going;
+    if (!materialize_tuple) {
+      // A containing repeated field is empty or NULL
+      FILE_CHECK_LT(reader->def_level(),
+          reader->def_level_of_immediate_repeated_ancestor());
+      keep_going = reader->NextLevels();
+    } else {
+      // All column readers for this tuple should have a value to materialize.
+      FILE_CHECK_GE(reader->def_level(),
+          reader->def_level_of_immediate_repeated_ancestor());
+      // Fill in position slot if applicable
+      const SlotDescriptor* pos_slot_desc = reader->pos_slot_desc();
+      if (pos_slot_desc != nullptr) {
+        reader->ReadPositionNonBatched(
+            tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
+      }
+      keep_going = reader->ReadValue(pool, tuple);
+    }
+    if (UNLIKELY(!keep_going)) return false;
+  }
+  return true;
+}
+
+// Reads the footer scan range from 'stream_', validates the trailing magic number and
+// deserializes the file metadata into 'file_metadata_'. If the metadata does not fit
+// in the bytes that were already read, an additional scan range covering exactly the
+// metadata is issued. Also validates the file version, row count and row groups, and
+// sets 'file_version_' when the file records its writer. Returns a non-OK status for
+// truncated, stale or invalid files.
+Status HdfsParquetScanner::ProcessFooter() {
+  const int64_t file_len = stream_->file_desc()->file_length;
+  const int64_t scan_range_len = stream_->scan_range()->len();
+
+  // We're processing the scan range issued in IssueInitialRanges(). The scan range
+  // should be the last FOOTER_BYTES of the file. !success means the file is shorter
+  // than we expect. Note we can't detect if the file is larger than we expect without
+  // attempting to read past the end of the scan range, but in this case we'll fail
+  // below trying to parse the footer.
+  DCHECK_LE(scan_range_len, FOOTER_SIZE);
+  uint8_t* buffer;
+  bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_);
+  if (!success) {
+    DCHECK(!parse_status_.ok());
+    if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) {
+      VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: "
+                 << "metadata states file size to be "
+                 << PrettyPrinter::Print(file_len, TUnit::BYTES)
+                 << ", but could only read "
+                 << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES);
+      return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(),
+          scan_node_->hdfs_table()->fully_qualified_name());
+    }
+    return parse_status_;
+  }
+  DCHECK(stream_->eosr());
+
+  // Size of the fixed trailer: 4-byte metadata length followed by the magic number.
+  // Kept signed so that the subtractions below cannot wrap around.
+  const int64_t fixed_footer_size = static_cast<int64_t>(sizeof(int32_t))
+      + static_cast<int64_t>(sizeof(PARQUET_VERSION_NUMBER));
+
+  // Number of bytes in buffer after the fixed size footer is accounted for.
+  const int64_t remaining_bytes_buffered = scan_range_len - fixed_footer_size;
+
+  // Make sure footer has enough bytes to contain the required information.
+  if (remaining_bytes_buffered < 0) {
+    return Status(Substitute("File '$0' is invalid. Missing metadata.", filename()));
+  }
+
+  // Validate magic file bytes are correct.
+  uint8_t* magic_number_ptr = buffer + scan_range_len - sizeof(PARQUET_VERSION_NUMBER);
+  if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
+             sizeof(PARQUET_VERSION_NUMBER)) != 0) {
+    return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, filename(),
+        string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)),
+        scan_node_->hdfs_table()->fully_qualified_name());
+  }
+
+  // The size of the metadata is encoded as a 4 byte little endian value before
+  // the magic number. The field has no alignment guarantee within the buffer, so copy
+  // it out instead of dereferencing a casted pointer.
+  uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
+  uint32_t metadata_size;
+  memcpy(&metadata_size, metadata_size_ptr, sizeof(metadata_size));
+  // The start of the metadata is:
+  // file_len - 4-byte footer length field - 4-byte version number field - metadata size
+  const int64_t metadata_start = file_len - fixed_footer_size - metadata_size;
+
+  // If the metadata was too big, we need to read it into a contiguous buffer before
+  // deserializing it.
+  ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
+  uint8_t* metadata_ptr = nullptr;
+
+  DCHECK(metadata_range_ != nullptr);
+  if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
+    // In this case, the metadata is bigger than our guess meaning there are
+    // not enough bytes in the footer range from IssueInitialRanges().
+    // We'll just issue more ranges to the IoMgr that is the actual footer.
+    int64_t partition_id = context_->partition_descriptor()->id();
+    const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
+    DCHECK_EQ(file_desc, stream_->file_desc());
+    if (metadata_start < 0) {
+      return Status(Substitute("File '$0' is invalid. Invalid metadata size in file "
+          "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size, file_len));
+    }
+
+    if (!metadata_buffer.TryAllocate(metadata_size)) {
+      string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
+          "metadata for file '$1'.", metadata_size, filename());
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, metadata_size);
+    }
+    metadata_ptr = metadata_buffer.buffer();
+
+    // Read the footer into the metadata buffer.
+    ScanRange* metadata_range = scan_node_->AllocateScanRange(
+        metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
+        metadata_range_->disk_id(), metadata_range_->expected_local(),
+        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+
+    unique_ptr<BufferDescriptor> io_buffer;
+    bool needs_buffers;
+    RETURN_IF_ERROR(
+        scan_node_->reader_context()->StartScanRange(metadata_range, &needs_buffers));
+    DCHECK(!needs_buffers) << "Already provided a buffer";
+    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
+    DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
+    DCHECK_EQ(io_buffer->len(), metadata_size);
+    DCHECK(io_buffer->eosr());
+    metadata_range->ReturnBuffer(move(io_buffer));
+  } else {
+    // The whole metadata is already in 'buffer', right before the length field. The
+    // pointer is only formed here, where it is known to stay inside the buffer.
+    metadata_ptr = metadata_size_ptr - metadata_size;
+  }
+
+  // Deserialize file footer
+  // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this.
+  Status status =
+      DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_);
+  if (!status.ok()) {
+    return Status(Substitute("File '$0' of length $1 bytes has invalid file metadata "
+        "at file offset $2, Error = $3.", filename(), file_len, metadata_start,
+        status.GetDetail()));
+  }
+
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
+
+  // IMPALA-3943: Do not throw an error for empty files for backwards compatibility.
+  if (file_metadata_.num_rows == 0) {
+    // Warn if the num_rows is inconsistent with the row group metadata.
+    if (!file_metadata_.row_groups.empty()) {
+      bool has_non_empty_row_group = false;
+      for (const parquet::RowGroup& row_group : file_metadata_.row_groups) {
+        if (row_group.num_rows > 0) {
+          has_non_empty_row_group = true;
+          break;
+        }
+      }
+      // Warn if there is at least one non-empty row group.
+      if (has_non_empty_row_group) {
+        ErrorMsg msg(TErrorCode::PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE, filename());
+        state_->LogError(msg);
+      }
+    }
+    return Status::OK();
+  }
+
+  // Parse out the created by application version string
+  if (file_metadata_.__isset.created_by) {
+    file_version_ = ParquetFileVersion(file_metadata_.created_by);
+  }
+  if (file_metadata_.row_groups.empty()) {
+    return Status(
+        Substitute("Invalid file. This file: $0 has no row groups", filename()));
+  }
+  if (file_metadata_.num_rows < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in "
+        "file metadata", filename(), file_metadata_.num_rows));
+  }
+  return Status::OK();
+}
+
+// Populates 'column_readers' with one reader per slot of 'tuple_desc' that resolves to
+// a field of the file schema, recursing into collection slots. Slots that are missing
+// from the file are set to NULL in the template tuple instead. If no reader was
+// created, a single counting reader is added. A position slot, if present, is
+// attached to the first reader.
+Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc,
+    const ParquetSchemaResolver& schema_resolver,
+    vector<ParquetColumnReader*>* column_readers) {
+  DCHECK(column_readers != nullptr);
+  DCHECK(column_readers->empty());
+
+  // Column readers are not needed because we are not reading from any columns if this
+  // optimization is enabled.
+  if (scan_node_->optimize_parquet_count_star()) return Status::OK();
+
+  // Each tuple can have at most one position slot. We'll process this slot desc last.
+  SlotDescriptor* pos_slot_desc = nullptr;
+
+  for (SlotDescriptor* slot_desc : tuple_desc.slots()) {
+    // Skip partition columns
+    const bool is_partition_col = &tuple_desc == scan_node_->tuple_desc()
+        && slot_desc->col_pos() < scan_node_->num_partition_keys();
+    if (is_partition_col) continue;
+
+    SchemaNode* node = nullptr;
+    bool pos_field;
+    bool missing_field;
+    RETURN_IF_ERROR(schema_resolver.ResolvePath(
+        slot_desc->col_path(), &node, &pos_field, &missing_field));
+
+    if (missing_field) {
+      // In this case, we are selecting a column that is not in the file.
+      // Update the template tuple to put a NULL in this slot, creating the template
+      // tuple first if this tuple descriptor does not have one yet.
+      Tuple*& template_tuple = template_tuple_map_[&tuple_desc];
+      if (template_tuple == nullptr) {
+        template_tuple =
+            Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
+      }
+      template_tuple->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+
+    if (pos_field) {
+      DCHECK(pos_slot_desc == nullptr)
+          << "There should only be one position slot per tuple";
+      pos_slot_desc = slot_desc;
+      continue;
+    }
+
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(), *node->element,
+        slot_desc, state_));
+
+    ParquetColumnReader* new_reader = ParquetColumnReader::Create(
+        *node, slot_desc->type().IsCollectionType(), slot_desc, this);
+    column_readers->push_back(new_reader);
+
+    if (!new_reader->IsCollectionReader()) continue;
+
+    // Recursively populate the collection reader's children
+    DCHECK(slot_desc->collection_item_descriptor() != nullptr);
+    const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
+    CollectionColumnReader* collection_reader =
+        static_cast<CollectionColumnReader*>(new_reader);
+    RETURN_IF_ERROR(CreateColumnReaders(
+        *item_tuple_desc, schema_resolver, collection_reader->children()));
+  }
+
+  if (column_readers->empty()) {
+    // This is either a count(*) over a collection type (count(*) over the table is
+    // handled in ProcessFooter()), or no materialized columns appear in this file
+    // (e.g. due to schema evolution, or if there's only a position slot). Create a
+    // single column reader that we will use to count the number of tuples we should
+    // output. We will not read any values from this reader.
+    ParquetColumnReader* counting_reader;
+    RETURN_IF_ERROR(CreateCountingReader(
+        tuple_desc.tuple_path(), schema_resolver, &counting_reader));
+    column_readers->push_back(counting_reader);
+  }
+
+  if (pos_slot_desc != nullptr) {
+    // 'tuple_desc' has a position slot. Use an existing column reader to populate it.
+    DCHECK(!column_readers->empty());
+    column_readers->front()->set_pos_slot_desc(pos_slot_desc);
+  }
+
+  return Status::OK();
+}
+
+// Creates in '*reader' a column reader below 'parent_path' that has no slot to
+// materialize into (it is created with a nullptr slot descriptor). Returns an error
+// if 'parent_path' cannot be found in the file schema.
+Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
+    const ParquetSchemaResolver& schema_resolver, ParquetColumnReader** reader) {
+  SchemaNode* parent_node;
+  bool pos_field;
+  bool missing_field;
+  RETURN_IF_ERROR(schema_resolver.ResolvePath(
+      parent_path, &parent_node, &pos_field, &missing_field));
+
+  if (missing_field) {
+    // TODO: can we do anything else here?
+    return Status(Substitute("Could not find '$0' in file '$1'.",
+        PrintPath(*scan_node_->hdfs_table(), parent_path), filename()));
+  }
+  DCHECK(!pos_field);
+  DCHECK(parent_path.empty() || parent_node->is_repeated());
+
+  if (parent_node->children.empty()) {
+    // Special case for a repeated scalar node. The repeated node represents both the
+    // parent collection and the child item.
+    *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this);
+    return Status::OK();
+  }
+
+  // Find a non-struct (i.e. collection or scalar) child of 'parent_node', which we
+  // will use to create the item reader: descend through first children until reaching
+  // a node that is repeated or has no children.
+  const SchemaNode* target_node = &parent_node->children[0];
+  while (!target_node->children.empty() && !target_node->is_repeated()) {
+    target_node = &target_node->children[0];
+  }
+
+  *reader = ParquetColumnReader::Create(
+      *target_node, target_node->is_repeated(), nullptr, this);
+  if (!target_node->is_repeated()) return Status::OK();
+
+  // Find the closest scalar descendant of 'target_node' via breadth-first search, and
+  // create scalar reader to drive 'reader'. We find the closest (i.e. least-nested)
+  // descendant as a heuristic for picking a descendant with fewer values, so it's
+  // faster to scan.
+  // TODO: use different heuristic than least-nested? Fewest values?
+  const SchemaNode* scalar_node = nullptr;
+  queue<const SchemaNode*> pending;
+  pending.push(target_node);
+  while (!pending.empty()) {
+    scalar_node = pending.front();
+    pending.pop();
+    // The first childless node that is dequeued is the least-nested scalar
+    // descendant of 'target_node'.
+    if (scalar_node->children.size() == 0) break;
+    for (const SchemaNode& child : scalar_node->children) pending.push(&child);
+  }
+  DCHECK(scalar_node->children.empty()) << scalar_node->DebugString();
+  CollectionColumnReader* parent_reader =
+      static_cast<CollectionColumnReader*>(*reader);
+  parent_reader->children()->push_back(
+      ParquetColumnReader::Create(*scalar_node, false, nullptr, this));
+
+  return Status::OK();
+}
+
+// Calls Reset() on every reader in 'collection_readers_'.
+void HdfsParquetScanner::InitCollectionColumns() {
+  for (CollectionColumnReader* reader : collection_readers_) reader->Reset();
+}
+
+// Resets every reader in 'scalar_readers_' to the column chunk it reads in the row
+// group at 'row_group_idx_', then divides the reservation between the readers.
+// Returns an error if two readers of the same schema element report different numbers
+// of values.
+Status HdfsParquetScanner::InitScalarColumns() {
+  int64_t partition_id = context_->partition_descriptor()->id();
+  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
+  DCHECK(file_desc != nullptr);
+  const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+
+  // Used to validate that the number of values in each reader in column_readers_ at
+  // the same SchemaElement is the same. The counts are 64-bit in the file metadata, so
+  // they are stored as int64_t to avoid truncating large values.
+  unordered_map<const parquet::SchemaElement*, int64_t> num_values_map;
+  for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
+    const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
+    const int64_t chunk_num_values = col_chunk.meta_data.num_values;
+    // Record the count for the first reader of a schema element. emplace() leaves an
+    // existing entry untouched, in which case the recorded count must match.
+    auto insert_result =
+        num_values_map.emplace(&scalar_reader->schema_element(), chunk_num_values);
+    const int64_t expected_num_values = insert_result.first->second;
+    if (!insert_result.second && chunk_num_values != expected_num_values) {
+      // TODO: improve this error message by saying which columns are different,
+      // and also specify column in other error messages as appropriate
+      return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
+          chunk_num_values, expected_num_values, filename());
+    }
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
+  }
+  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
+  return Status::OK();
+}
+
+// Assigns an I/O reservation to each reader in 'column_readers' out of the scanner's
+// total reservation, after trying to grow the total to the ideal amount. Returns an
+// internal error if the total is below one minimum-sized buffer per column.
+Status HdfsParquetScanner::DivideReservationBetweenColumns(
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  const int64_t min_buffer_size = io_mgr->min_buffer_size();
+  const int64_t max_buffer_size = io_mgr->max_buffer_size();
+  // The HdfsScanNode reservation calculation in the planner ensures that we have
+  // reservation for at least one buffer per column.
+  const auto min_reservation_needed = min_buffer_size * column_readers.size();
+  if (context_->total_reservation() < min_reservation_needed) {
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
+                   "least $1 bytes per column for $2 columns but had $3 bytes",
+            filename(), min_buffer_size, column_readers.size(),
+            context_->total_reservation()));
+  }
+
+  // Length of the scan range of each column, in reader order.
+  vector<int64_t> col_range_lengths;
+  col_range_lengths.reserve(column_readers.size());
+  for (BaseScalarColumnReader* reader : column_readers) {
+    col_range_lengths.push_back(reader->scan_range()->len());
+  }
+
+  // The scanner-wide stream was used only to read the file footer. Each column has
+  // added its own stream. We can use the total reservation now that 'stream_''s
+  // resources have been released. We may benefit from increasing reservation further,
+  // so let's compute the ideal reservation to scan all the columns.
+  const int64_t ideal_reservation = ComputeIdealReservation(col_range_lengths);
+  if (ideal_reservation > context_->total_reservation()) {
+    context_->TryIncreaseReservation(ideal_reservation);
+  }
+  // Record both the reservation actually held and the ideal one in the profile.
+  RuntimeProfile* profile = scan_node_->runtime_profile();
+  profile->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)
+      ->UpdateCounter(context_->total_reservation());
+  profile->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)
+      ->UpdateCounter(ideal_reservation);
+
+  // Each entry is a pair of (column index, reservation allocated).
+  vector<pair<int, int64_t>> reservations = DivideReservationBetweenColumnsHelper(
+      min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
+  for (auto& reservation : reservations) {
+    column_readers[reservation.first]->set_io_reservation(reservation.second);
+  }
+  return Status::OK();
+}
+
+// Returns the sum, over every entry of 'col_range_lengths', of the value that
+// DiskIoMgr::ComputeIdealBufferReservation() reports for that range length.
+int64_t HdfsParquetScanner::ComputeIdealReservation(
+    const vector<int64_t>& col_range_lengths) {
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  int64_t total = 0;
+  for (auto it = col_range_lengths.begin(); it != col_range_lengths.end(); ++it) {
+    const int64_t range_len = *it;
+    total += io_mgr->ComputeIdealBufferReservation(range_len);
+  }
+  return total;
+}
+
+// Splits 'reservation_to_distribute' bytes of I/O reservation between the columns
+// whose scan range lengths are listed in 'col_range_lengths'. Returns one
+// (column index, bytes of reservation) pair per column, ordered by descending range
+// length with ties broken by ascending column index.
+//
+// The DCHECKs below expect every column to hold a non-zero reservation after each
+// step, which relies on the caller passing in at least 'min_buffer_size' bytes per
+// column and on every range being non-empty.
+vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+    int64_t min_buffer_size, int64_t max_buffer_size,
+    const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
+  // Each entry is (column index, reservation allocated so far).
+  vector<pair<int, int64_t>> allocations;
+  for (int col_idx = 0; col_idx < col_range_lengths.size(); ++col_idx) {
+    allocations.emplace_back(col_idx, 0);
+  }
+
+  // Visit the longest ranges first (ties: lowest column index first) so that big
+  // columns are offered reservation before small ones. Column sizes within one file
+  // often differ dramatically because of value size and compressibility, e.g. a large
+  // STRING "comment" field versus a dictionary-encoded column with a handful of
+  // distinct values. Handing max-sized buffers to the large columns first maximizes
+  // the size of the I/Os issued while reading this row group.
+  auto longer_range_first = [&col_range_lengths](
+      const pair<int, int64_t>& lhs, const pair<int, int64_t>& rhs) {
+    const int64_t lhs_len = col_range_lengths[lhs.first];
+    const int64_t rhs_len = col_range_lengths[rhs.first];
+    if (lhs_len != rhs_len) return lhs_len > rhs_len;
+    return lhs.first < rhs.first;
+  };
+  sort(allocations.begin(), allocations.end(), longer_range_first);
+
+  // Hold back one minimum-sized buffer per column. Each column's share is released
+  // again just before that column is visited in the first pass.
+  reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+  // Round-robin over the columns, up to the ideal number of I/O buffers per range.
+  // Each visit grants either one max-sized buffer or one buffer just large enough
+  // for the data the column still has to read.
+  for (int pass = 0; pass < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++pass) {
+    const bool first_pass = (pass == 0);
+    for (pair<int, int64_t>& allocation : allocations) {
+      int64_t& allocated = allocation.second;
+      // Release the minimum that was held back for this column.
+      if (first_pass) reservation_to_distribute += min_buffer_size;
+
+      const int64_t bytes_left_in_range =
+          col_range_lengths[allocation.first] - allocated;
+      // Stays 0 whenever no further buffer should be given to this column.
+      int64_t bytes_to_add = 0;
+      if (bytes_left_in_range >= max_buffer_size) {
+        if (reservation_to_distribute >= max_buffer_size) {
+          bytes_to_add = max_buffer_size;
+        } else if (first_pass) {
+          DCHECK_EQ(0, allocated);
+          // The range must receive at least one buffer in the first pass, so hand
+          // over whatever power-of-two amount is still available.
+          bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+        } else {
+          DCHECK_GT(allocated, 0);
+          // More than a max-sized buffer of data remains but a max-sized buffer
+          // cannot be afforded. Add nothing: keeping only the max-sized buffers
+          // already granted, with no small buffers mixed in, means every I/O for
+          // this column stays max-sized, which uses I/O devices efficiently.
+        }
+      } else if (bytes_left_in_range > 0
+          && reservation_to_distribute >= min_buffer_size) {
+        // Size the buffer to hold the remainder of the range ...
+        bytes_to_add =
+            max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+        // ... without exceeding the reservation that is still available.
+        bytes_to_add =
+            min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+      }
+      DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
+      reservation_to_distribute -= bytes_to_add;
+      allocated += bytes_to_add;
+      DCHECK_GE(reservation_to_distribute, 0);
+      DCHECK_GT(allocated, 0);
+    }
+  }
+  return allocations;
+}
+
+// Calls InitDictionary() on each reader in 'column_readers', in order. Returns the
+// first non-OK status via RETURN_IF_ERROR without visiting the remaining readers;
+// returns OK if every call succeeds.
+Status HdfsParquetScanner::InitDictionaries(
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  for (auto it = column_readers.begin(); it != column_readers.end(); ++it) {
+    BaseScalarColumnReader* reader = *it;
+    RETURN_IF_ERROR(reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
+// Checks the state of 'column_readers' once row group 'row_group_idx' has been
+// read. Returns an error if the readers are table-level and 'rows_read' differs
+// from the row count in the file metadata, or if any scalar reader still has
+// buffered values. Otherwise returns OK. 'column_readers' must be non-empty.
+Status HdfsParquetScanner::ValidateEndOfRowGroup(
+    const vector<ParquetColumnReader*>& column_readers, int row_group_idx,
+    int64_t rows_read) {
+  DCHECK(!column_readers.empty());
+  DCHECK(parse_status_.ok()) << "Don't overwrite parse_status_"
+                             << parse_status_.GetDetail();
+
+  // A max repetition level of 0 means these readers materialize table-level values
+  // rather than collection values, so the number of rows read can be compared
+  // against the row count recorded in the file metadata.
+  const bool is_table_level = column_readers[0]->max_rep_level() == 0;
+  if (is_table_level) {
+    int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows;
+    if (rows_read != expected_rows_in_group) {
+      return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(),
+          row_group_idx, expected_rows_in_group, rows_read);
+    }
+  }
+
+  // Validate the state of the scalar column readers; collection readers are skipped.
+  // NOTE(review): 'num_values_read' is an int; if num_values_read_ is a wider type
+  // the assignment below narrows it - confirm against the reader's declaration.
+  int num_values_read = -1;
+  for (ParquetColumnReader* column_reader : column_readers) {
+    if (column_reader->IsCollectionReader()) continue;
+    BaseScalarColumnReader* reader =
+        static_cast<BaseScalarColumnReader*>(column_reader);
+    // Every reader should have consumed its final data page. Leftover buffered
+    // values indicate that the column holds more values than the metadata states.
+    if (reader->num_buffered_values_ != 0) {
+      return Status(Substitute("Corrupt Parquet metadata in file '$0': metadata reports "
+          "'$1' more values in data page than actually present", filename(),
+          reader->num_buffered_values_));
+    }
+    // Sanity check that all readers advanced in lockstep, i.e. agree on
+    // num_values_read_. The check above is more likely to trip first when they do
+    // not, because num_values_read_ is only updated at the end of a data page.
+    if (num_values_read == -1) num_values_read = reader->num_values_read_;
+    DCHECK_EQ(reader->num_values_read_, num_values_read);
+    // ReadDataPage() uses metadata_->num_values to decide when the column is done.
+    DCHECK(reader->num_values_read_ == reader->metadata_->num_values ||
+        !state_->abort_on_error());
+  }
+  return Status::OK();
+}
+
+// Builds a ParquetTimestampDecoder for 'element' using state_'s local time zone.
+// The decoder's conversion argument is true only when the
+// convert_legacy_hive_parquet_utc_timestamps flag is set and file_version_ reports
+// the application as "parquet-mr".
+ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
+    const parquet::SchemaElement& element) {
+  bool convert_int96_timestamps = false;
+  if (FLAGS_convert_legacy_hive_parquet_utc_timestamps) {
+    convert_int96_timestamps = file_version_.application == "parquet-mr";
+  }
+  return ParquetTimestampDecoder(
+      element, &state_->local_time_zone(), convert_int96_timestamps);
+}
+}