Repository: incubator-impala Updated Branches: refs/heads/master ac1215fd3 -> 834365618
IMPALA-3905: Add HdfsScanner::GetNext() interface and implementation for Parquet. This is a first step towards making our scan node single threaded since we are moving to an execution model where multi-threading is done at the fragment level. This patch adds a new synchronous HdfsScanner::GetNext() interface and implements it for the Parquet scanner. The async execution via HdfsScanner::ProcessSplit() is still supported and is implemented by repeatedly calling GetNext() for code sharing purposes. I did not yet add a single-threaded scan node that uses GetNext(). The new code will be exercised by the existing scan node and tests. Testing: I ran an exhaustive private build which passed. I also ran a microbenchmark on a big TPCH lineitem table and there was no significant difference in scan performance. Change-Id: Iab50770bac05afcda4d3404fb4f53a0104931eb0 Reviewed-on: http://gerrit.cloudera.org:8080/3801 Reviewed-by: Alex Behm <[email protected]> Tested-by: Alex Behm <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/83436561 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/83436561 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/83436561 Branch: refs/heads/master Commit: 8343656189d631491b7372a60e228c851b61165c Parents: ac1215f Author: Alex Behm <[email protected]> Authored: Mon Jul 11 21:54:21 2016 -0700 Committer: Alex Behm <[email protected]> Committed: Sat Jul 30 23:52:27 2016 +0000 ---------------------------------------------------------------------- be/src/exec/base-sequence-scanner.cc | 23 +- be/src/exec/base-sequence-scanner.h | 6 +- be/src/exec/hdfs-avro-scanner.cc | 9 +- be/src/exec/hdfs-avro-scanner.h | 4 +- be/src/exec/hdfs-parquet-scanner.cc | 491 +++++++++++++++++------------- be/src/exec/hdfs-parquet-scanner.h | 82 +++-- be/src/exec/hdfs-rcfile-scanner.cc | 9 +- be/src/exec/hdfs-rcfile-scanner.h | 5 +- 
be/src/exec/hdfs-scan-node.cc | 31 +- be/src/exec/hdfs-scan-node.h | 24 +- be/src/exec/hdfs-scanner-ir.cc | 1 + be/src/exec/hdfs-scanner.cc | 22 +- be/src/exec/hdfs-scanner.h | 78 +++-- be/src/exec/hdfs-sequence-scanner.cc | 9 +- be/src/exec/hdfs-sequence-scanner.h | 17 +- be/src/exec/hdfs-text-scanner.cc | 27 +- be/src/exec/hdfs-text-scanner.h | 7 +- be/src/exec/parquet-column-readers.h | 16 +- 18 files changed, 501 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 268fdae..2af3ee7 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -63,8 +63,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* scan_node, return Status::OK(); } -BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state) - : HdfsScanner(node, state), +BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state, + bool add_batches_to_queue) + : HdfsScanner(node, state, add_batches_to_queue), header_(NULL), block_start_(0), total_block_size_(0), @@ -83,15 +84,17 @@ BaseSequenceScanner::BaseSequenceScanner() BaseSequenceScanner::~BaseSequenceScanner() { } -Status BaseSequenceScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(HdfsScanner::Prepare(context)); +Status BaseSequenceScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(HdfsScanner::Open(context)); stream_->set_read_past_size_cb(bind(&BaseSequenceScanner::ReadPastSize, this, _1)); bytes_skipped_counter_ = ADD_COUNTER( scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES); + // Allocate a new row batch. May fail if mem limit is exceeded. 
+ RETURN_IF_ERROR(StartNewRowBatch()); return Status::OK(); } -void BaseSequenceScanner::Close() { +void BaseSequenceScanner::Close(RowBatch* row_batch) { VLOG_FILE << "Bytes read past scan range: " << -stream_->bytes_left(); VLOG_FILE << "Average block size: " << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0); @@ -101,9 +104,10 @@ void BaseSequenceScanner::Close() { decompressor_->Close(); decompressor_.reset(NULL); } - if (batch_ != NULL) { - AttachPool(data_buffer_pool_.get(), false); - AddFinalRowBatch(); + if (row_batch != NULL) { + row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); + context_->ReleaseCompletedResources(row_batch, true); + if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch); } // Verify all resources (if any) have been transferred. DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); @@ -112,10 +116,11 @@ void BaseSequenceScanner::Close() { if (!only_parsing_header_ && header_ != NULL) { scan_node_->RangeComplete(file_format(), header_->compression_type); } - HdfsScanner::Close(); + HdfsScanner::Close(row_batch); } Status BaseSequenceScanner::ProcessSplit() { + DCHECK(add_batches_to_queue_); header_ = reinterpret_cast<FileHeader*>( scan_node_->GetFileMetadata(stream_->filename())); if (header_ == NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/base-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h index ea7ad78..7c02aef 100644 --- a/be/src/exec/base-sequence-scanner.h +++ b/be/src/exec/base-sequence-scanner.h @@ -38,8 +38,8 @@ class BaseSequenceScanner : public HdfsScanner { static Status IssueInitialRanges(HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files); - virtual Status Prepare(ScannerContext* context); - virtual void Close(); + virtual Status Open(ScannerContext* context); + virtual void 
Close(RowBatch* row_batch); virtual Status ProcessSplit(); virtual ~BaseSequenceScanner(); @@ -99,7 +99,7 @@ class BaseSequenceScanner : public HdfsScanner { /// Returns type of scanner: e.g. rcfile, seqfile virtual THdfsFileFormat::type file_format() const = 0; - BaseSequenceScanner(HdfsScanNode*, RuntimeState*); + BaseSequenceScanner(HdfsScanNode*, RuntimeState*, bool); /// Read and validate sync marker against header_->sync. Returns non-ok if the sync /// marker did not match. Scanners should always use this function to read sync markers, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index 09d3955..39a204c 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -52,8 +52,9 @@ const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ -HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state), +HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) + : BaseSequenceScanner(scan_node, state, add_batches_to_queue), avro_header_(NULL), codegend_decode_avro_data_(NULL) { } @@ -65,8 +66,8 @@ HdfsAvroScanner::HdfsAvroScanner() DCHECK(TestInfo::is_test()); } -Status HdfsAvroScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context)); +Status HdfsAvroScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(BaseSequenceScanner::Open(context)); if (scan_node_->avro_schema().schema == NULL) { return Status("Missing Avro schema in scan node. This could be due to stale " "metadata. 
Running 'invalidate metadata <tablename>' may resolve the problem."); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-avro-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index ac2ba40..1cd218c 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -86,9 +86,9 @@ class HdfsAvroScanner : public BaseSequenceScanner { /// Avro file: {'O', 'b', 'j', 1} static const uint8_t AVRO_VERSION_HEADER[4]; - HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state); + HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue); - virtual Status Prepare(ScannerContext* context); + virtual Status Open(ScannerContext* context); /// Codegen parsing records, writing tuples and evaluating predicates. static llvm::Function* Codegen(HdfsScanNode*, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 6855fef..02f8a3e 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -132,8 +132,12 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) { namespace impala { -HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state) - : HdfsScanner(scan_node, state), +HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) + : HdfsScanner(scan_node, state, add_batches_to_queue), + row_group_idx_(-1), + row_group_rows_read_(0), + advance_row_group_(true), scratch_batch_(new ScratchTupleBatch( scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())), metadata_range_(NULL), @@ -144,8 +148,9 @@ 
HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* st assemble_rows_timer_.Stop(); } -Status HdfsParquetScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(HdfsScanner::Prepare(context)); +Status HdfsParquetScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(HdfsScanner::Open(context)); + stream_->set_contains_tuple_data(false); metadata_range_ = stream_->scan_range(); num_cols_counter_ = ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT); @@ -162,56 +167,74 @@ Status HdfsParquetScanner::Prepare(ScannerContext* context) { if (!ctx->filter->AlwaysTrue()) filter_ctxs_.push_back(ctx); } filter_stats_.resize(filter_ctxs_.size()); + + DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail(); + + // First process the file metadata in the footer. + Status status = ProcessFooter(); + // Release I/O buffers immediately to make sure they are cleaned up + // in case we return a non-OK status anywhere below. + context_->ReleaseCompletedResources(NULL, true); + RETURN_IF_ERROR(status); + + // Parse the file schema into an internal representation for schema resolution. + schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(), + state_->query_options().parquet_fallback_schema_resolution)); + RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename())); + + // We've processed the metadata and there are columns that need to be materialized. + RETURN_IF_ERROR(CreateColumnReaders( + *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_)); + COUNTER_SET(num_cols_counter_, + static_cast<int64_t>(CountScalarColumns(column_readers_))); + // Set top-level template tuple. + template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; + + // The scanner-wide stream was used only to read the file footer. Each column has added + // its own stream. 
+ stream_ = NULL; return Status::OK(); } -void HdfsParquetScanner::Close() { - vector<THdfsCompression::type> compression_types; +void HdfsParquetScanner::Close(RowBatch* row_batch) { + if (row_batch != NULL) { + FlushRowGroupResources(row_batch); + if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch); + } + // Verify all resources (if any) have been transferred. + DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0); + DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0); + DCHECK_EQ(context_->num_completed_io_buffers(), 0); - // Visit each column reader, including collection reader children. + // Collect compression types for reporting completed ranges. + vector<THdfsCompression::type> compression_types; stack<ParquetColumnReader*> readers; for (ParquetColumnReader* r: column_readers_) readers.push(r); while (!readers.empty()) { - ParquetColumnReader* col_reader = readers.top(); + ParquetColumnReader* reader = readers.top(); readers.pop(); - - if (col_reader->IsCollectionReader()) { - CollectionColumnReader* collection_reader = - static_cast<CollectionColumnReader*>(col_reader); - for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r); + if (reader->IsCollectionReader()) { + CollectionColumnReader* coll_reader = static_cast<CollectionColumnReader*>(reader); + for (ParquetColumnReader* r: *coll_reader->children()) readers.push(r); continue; } - - BaseScalarColumnReader* scalar_reader = - static_cast<BaseScalarColumnReader*>(col_reader); - if (scalar_reader->decompressed_data_pool() != NULL) { - // No need to commit the row batches with the AttachPool() calls - // since AddFinalRowBatch() already does below. 
- AttachPool(scalar_reader->decompressed_data_pool(), false); - } - scalar_reader->Close(); + BaseScalarColumnReader* scalar_reader = static_cast<BaseScalarColumnReader*>(reader); compression_types.push_back(scalar_reader->codec()); } - if (batch_ != NULL) { - AttachPool(dictionary_pool_.get(), false); - AttachPool(scratch_batch_->mem_pool(), false); - AddFinalRowBatch(); - } - // Verify all resources (if any) have been transferred. - DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0); - DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0); - DCHECK_EQ(context_->num_completed_io_buffers(), 0); + assemble_rows_timer_.Stop(); + assemble_rows_timer_.ReleaseCounter(); + // If this was a metadata only read (i.e. count(*)), there are no columns. if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE); scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types); - assemble_rows_timer_.Stop(); - assemble_rows_timer_.ReleaseCounter(); if (level_cache_pool_.get() != NULL) { level_cache_pool_->FreeAll(); - level_cache_pool_.reset(NULL); + level_cache_pool_.reset(); } + if (schema_resolver_.get() != NULL) schema_resolver_.reset(); + for (int i = 0; i < filter_ctxs_.size(); ++i) { const FilterStats* stats = filter_ctxs_[i]->stats; const LocalFilterStats& local = filter_stats_[i]; @@ -219,7 +242,7 @@ void HdfsParquetScanner::Close() { local.considered, local.rejected); } - HdfsScanner::Close(); + HdfsScanner::Close(row_batch); } // Get the start of the column. 
@@ -263,33 +286,102 @@ int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& c } Status HdfsParquetScanner::ProcessSplit() { - DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail(); - // First process the file metadata in the footer - bool eosr; - RETURN_IF_ERROR(ProcessFooter(&eosr)); - if (eosr) return Status::OK(); + DCHECK(add_batches_to_queue_); + bool scanner_eos = false; + do { + RETURN_IF_ERROR(StartNewRowBatch()); + RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos)); + scan_node_->AddMaterializedRowBatch(batch_); + } while (!scanner_eos && !scan_node_->ReachedLimit()); + + // Transfer the remaining resources to this new batch in Close(). + RETURN_IF_ERROR(StartNewRowBatch()); + return Status::OK(); +} - // Parse the file schema into an internal representation for schema resolution. - ParquetSchemaResolver schema_resolver(*scan_node_->hdfs_table(), - state_->query_options().parquet_fallback_schema_resolution); - RETURN_IF_ERROR(schema_resolver.Init(&file_metadata_, filename())); +Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) { + if (scan_node_->IsZeroSlotTableScan()) { + // There are no materialized slots, e.g. count(*) over the table. We can serve + // this query from just the file metadata. We don't need to read the column data. 
+ if (row_group_rows_read_ == file_metadata_.num_rows) { + *eos = true; + return Status::OK(); + } + assemble_rows_timer_.Start(); + int rows_remaining = file_metadata_.num_rows - row_group_rows_read_; + int max_tuples = min(row_batch->capacity(), rows_remaining); + TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); + int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples); + Status status = CommitRows(row_batch, num_to_commit); + assemble_rows_timer_.Stop(); + RETURN_IF_ERROR(status); + row_group_rows_read_ += num_to_commit; + COUNTER_ADD(scan_node_->rows_read_counter(), row_group_rows_read_); + return Status::OK(); + } - // We've processed the metadata and there are columns that need to be materialized. - RETURN_IF_ERROR( - CreateColumnReaders(*scan_node_->tuple_desc(), schema_resolver, &column_readers_)); - COUNTER_SET(num_cols_counter_, - static_cast<int64_t>(CountScalarColumns(column_readers_))); - // Set top-level template tuple. - template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; + // Transfer remaining tuples from the scratch batch. + if (!scratch_batch_->AtEnd()) { + assemble_rows_timer_.Start(); + int num_row_to_commit = TransferScratchTuples(row_batch); + assemble_rows_timer_.Stop(); + RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit)); + if (row_batch->AtCapacity()) return Status::OK(); + } - // The scanner-wide stream was used only to read the file footer. Each column has added - // its own stream. - stream_ = NULL; + while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) { + if (!advance_row_group_) { + // End of the previous row group. Transfer resources and clear streams because + // we will create new streams for the next row group. 
+ FlushRowGroupResources(row_batch); + context_->ClearStreams(); + Status status = + ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_); + if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); + } + RETURN_IF_ERROR(NextRowGroup()); + if (row_group_idx_ >= file_metadata_.row_groups.size()) { + *eos = true; + DCHECK(parse_status_.ok()); + return Status::OK(); + } + } + + // Apply any runtime filters to static tuples containing the partition keys for this + // partition. If any filter fails, we return immediately and stop processing this + // scan range. + if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(), + FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) { + *eos = true; + DCHECK(parse_status_.ok()); + return Status::OK(); + } + assemble_rows_timer_.Start(); + Status status = AssembleRows(column_readers_, row_batch, &advance_row_group_); + assemble_rows_timer_.Stop(); + RETURN_IF_ERROR(status); + if (!parse_status_.ok()) { + RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg())); + parse_status_ = Status::OK(); + } + + return Status::OK(); +} + +Status HdfsParquetScanner::NextRowGroup() { + advance_row_group_ = false; + row_group_rows_read_ = 0; - // Iterate through each row group in the file and process any row groups that fall - // within this split. - for (int i = 0; i < file_metadata_.row_groups.size(); ++i) { - const parquet::RowGroup& row_group = file_metadata_.row_groups[i]; + // Loop until we have found a non-empty row group, and successfully initialized and + // seeded the column readers. Return a non-OK status from within loop only if the error + // is non-recoverable, otherwise log the error and continue with the next row group. + while (true) { + // Reset the parse status for the next row group. 
+ parse_status_ = Status::OK(); + + ++row_group_idx_; + if (row_group_idx_ >= file_metadata_.row_groups.size()) break; + const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; if (row_group.num_rows == 0) continue; const DiskIoMgr::ScanRange* split_range = reinterpret_cast<ScanRangeMetadata*>( @@ -302,23 +394,16 @@ Status HdfsParquetScanner::ProcessSplit() { int64_t split_offset = split_range->offset(); int64_t split_length = split_range->len(); if (!(row_group_mid_pos >= split_offset && - row_group_mid_pos < split_offset + split_length)) continue; + row_group_mid_pos < split_offset + split_length)) { + // A row group is processed by the scanner whose split overlaps with the row + // group's mid point. This row group will be handled by a different scanner. + continue; + } COUNTER_ADD(num_row_groups_counter_, 1); - // Attach any resources and clear the streams before starting a new row group. These - // streams could either be just the footer stream or streams for the previous row - // group. - context_->ReleaseCompletedResources(batch_, /* done */ true); - context_->ClearStreams(); - // Commit the rows to flush the row batch from the previous row group - CommitRows(0); - - RETURN_IF_ERROR(InitColumns(i, column_readers_)); - - assemble_rows_timer_.Start(); - // Prepare column readers for first read - bool continue_execution = true; + RETURN_IF_ERROR(InitColumns(row_group_idx_, column_readers_)); + bool seeding_ok = true; for (ParquetColumnReader* col_reader: column_readers_) { // Seed collection and boolean column readers with NextLevel(). // The ScalarColumnReaders use an optimized ReadValueBatch() that @@ -328,48 +413,127 @@ Status HdfsParquetScanner::ProcessSplit() { // will allow better sharing of code between the row-wise and column-wise // materialization strategies. 
if (col_reader->NeedsSeedingForBatchedReading()) { - continue_execution = col_reader->NextLevels(); + if (!col_reader->NextLevels()) { + seeding_ok = false; + break; + } } - if (!continue_execution) break; DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail(); } - bool filters_pass = true; - if (continue_execution) { - continue_execution = AssembleRows(column_readers_, i, &filters_pass); - assemble_rows_timer_.Stop(); + if (!parse_status_.ok()) { + RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg())); + } else if (seeding_ok) { + // Found a non-empty row group and successfully initialized the column readers. + break; } + } - // Check the query_status_ before logging the parse_status_. query_status_ is merged - // with parse_status_ in AssembleRows(). It's misleading to log query_status_ as parse - // error because it is shared by all threads in the same fragment instance and it's - // unclear which threads caused the error. - // - // TODO: It's a really bad idea to propagate UDF error via the global RuntimeState. - // Store UDF error in thread local storage or make UDF return status so it can merge - // with parse_status_. - RETURN_IF_ERROR(state_->GetQueryStatus()); - if (UNLIKELY(!parse_status_.ok())) { - RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg())); + DCHECK(parse_status_.ok()); + return Status::OK(); +} + +void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) { + DCHECK(row_batch != NULL); + row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false); + row_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false); + context_->ReleaseCompletedResources(row_batch, true); + for (ParquetColumnReader* col_reader: column_readers_) { + col_reader->Close(row_batch); + } +} + +/// High-level steps of this function: +/// 1. Allocate 'scratch' memory for tuples able to hold a full batch +/// 2. 
Populate the slots of all scratch tuples one column reader at a time, +/// using the ColumnReader::Read*ValueBatch() functions. +/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and +/// set the surviving tuples in the output batch. Transfer the ownership of +/// scratch memory to the output batch once the scratch memory is exhausted. +/// 4. Repeat steps above until we are done with the row group or an error +/// occurred. +/// TODO: Since the scratch batch is populated in a column-wise fashion, it is +/// difficult to maintain a maximum memory footprint without throwing away at least +/// some work. This point needs further experimentation and thought. +Status HdfsParquetScanner::AssembleRows( + const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch, + bool* skip_row_group) { + DCHECK(!column_readers.empty()); + DCHECK(row_batch != NULL); + DCHECK_EQ(*skip_row_group, false); + DCHECK(scratch_batch_ != NULL); + + while (!column_readers[0]->RowGroupAtEnd()) { + // Start a new scratch batch. + RETURN_IF_ERROR(scratch_batch_->Reset(state_)); + int scratch_capacity = scratch_batch_->capacity(); + + // Initialize tuple memory. + for (int i = 0; i < scratch_capacity; ++i) { + InitTuple(template_tuple_, scratch_batch_->GetTuple(i)); } - if (scan_node_->ReachedLimit()) return Status::OK(); - if (context_->cancelled()) return Status::OK(); - if (!filters_pass) return Status::OK(); - - DCHECK(continue_execution || !state_->abort_on_error()); - // We should be at the end of the row group if we get this far with no parse error - if (parse_status_.ok()) DCHECK(column_readers_[0]->RowGroupAtEnd()); - // Reset parse_status_ for the next row group. - parse_status_ = Status::OK(); + + // Materialize the top-level slots into the scratch batch column-by-column. 
+ int last_num_tuples = -1; + int num_col_readers = column_readers.size(); + bool continue_execution = true; + for (int c = 0; c < num_col_readers; ++c) { + ParquetColumnReader* col_reader = column_readers[c]; + if (col_reader->max_rep_level() > 0) { + continue_execution = col_reader->ReadValueBatch( + scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, + scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); + } else { + continue_execution = col_reader->ReadNonRepeatedValueBatch( + scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, + scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); + } + if (UNLIKELY(!continue_execution)) { + *skip_row_group = true; + return Status::OK(); + } + // Check that all column readers populated the same number of values. + if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples); + last_num_tuples = scratch_batch_->num_tuples; + } + row_group_rows_read_ += scratch_batch_->num_tuples; + COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples); + + int num_row_to_commit = TransferScratchTuples(row_batch); + RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit)); + if (row_batch->AtCapacity()) return Status::OK(); + } + + return Status::OK(); +} + +Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { + DCHECK(dst_batch != NULL); + dst_batch->CommitRows(num_rows); + + // We need to pass the row batch to the scan node if there is too much memory attached, + // which can happen if the query is very selective. We need to release memory even + // if no rows passed predicates. + if (dst_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) { + context_->ReleaseCompletedResources(dst_batch, /* done */ false); + } + if (context_->cancelled()) return Status::CANCELLED; + // TODO: It's a really bad idea to propagate UDF error via the global RuntimeState. 
+ // Store UDF error in thread local storage or make UDF return status so it can merge + // with parse_status_. + RETURN_IF_ERROR(state_->GetQueryStatus()); + // Free local expr allocations for this thread + for (const auto& kv: scanner_conjuncts_map_) { + ExprContext::FreeLocalAllocations(kv.second); } return Status::OK(); } -int HdfsParquetScanner::TransferScratchTuples() { +int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { // This function must not be called when the output batch is already full. As long as // we always call CommitRows() after TransferScratchTuples(), the output batch can // never be empty. - DCHECK_LT(batch_->num_rows(), batch_->capacity()); + DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity()); const bool has_filters = !filter_ctxs_.empty(); const bool has_conjuncts = !scanner_conjunct_ctxs_->empty(); @@ -378,10 +542,11 @@ int HdfsParquetScanner::TransferScratchTuples() { // Start/end/current iterators over the output rows. DCHECK_EQ(scan_node_->tuple_idx(), 0); - DCHECK_EQ(batch_->row_desc().tuple_descriptors().size(), 1); + DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1); Tuple** output_row_start = - reinterpret_cast<Tuple**>(batch_->GetRow(batch_->num_rows())); - Tuple** output_row_end = output_row_start + (batch_->capacity() - batch_->num_rows()); + reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); + Tuple** output_row_end = + output_row_start + (dst_batch->capacity() - dst_batch->num_rows()); Tuple** output_row = output_row_start; // Start/end/current iterators over the scratch tuples. 
@@ -397,7 +562,7 @@ int HdfsParquetScanner::TransferScratchTuples() { DCHECK(!has_filters); DCHECK(!has_conjuncts); DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0); - int num_tuples = min(batch_->capacity() - batch_->num_rows(), + int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(), scratch_batch_->num_tuples - scratch_batch_->tuple_idx); memset(output_row, 0, num_tuples * sizeof(Tuple*)); scratch_batch_->tuple_idx += num_tuples; @@ -430,7 +595,7 @@ int HdfsParquetScanner::TransferScratchTuples() { // quickly accumulate memory in the output batch, hit the memory capacity limit, // and return an output batch with relatively few rows. if (scratch_batch_->AtEnd()) { - batch_->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false); + dst_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false); } return output_row - output_row_start; } @@ -465,90 +630,6 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) { return true; } -/// High-level steps of this function: -/// 1. Allocate 'scratch' memory for tuples able to hold a full batch -/// 2. Populate the slots of all scratch tuples one column reader at a time, -/// using the ColumnReader::Read*ValueBatch() functions. -/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and -/// set the surviving tuples in the output batch. Transfer the ownership of -/// scratch memory to the output batch once the scratch memory is exhausted. -/// 4. Repeat steps above until we are done with the row group or an error -/// occurred. -/// TODO: Since the scratch batch is populated in a column-wise fashion, it is -/// difficult to maintain a maximum memory footprint without throwing away at least -/// some work. This point needs further experimentation and thought. 
-bool HdfsParquetScanner::AssembleRows( - const vector<ParquetColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) { - DCHECK(!column_readers.empty()); - DCHECK(scratch_batch_ != NULL); - - int64_t rows_read = 0; - bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled(); - while (!column_readers[0]->RowGroupAtEnd()) { - if (UNLIKELY(!continue_execution)) break; - - // Apply any runtime filters to static tuples containing the partition keys for this - // partition. If any filter fails, we return immediately and stop processing this - // row group. - if (!scan_node_->PartitionPassesFilterPredicates( - context_->partition_descriptor()->id(), - FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) { - *filters_pass = false; - return false; - } - - // Start a new scratch batch. - parse_status_.MergeStatus(scratch_batch_->Reset(state_)); - if (UNLIKELY(!parse_status_.ok())) return false; - int scratch_capacity = scratch_batch_->capacity(); - - // Initialize tuple memory. - for (int i = 0; i < scratch_capacity; ++i) { - InitTuple(template_tuple_, scratch_batch_->GetTuple(i)); - } - - // Materialize the top-level slots into the scratch batch column-by-column. - int last_num_tuples = -1; - int num_col_readers = column_readers.size(); - for (int c = 0; c < num_col_readers; ++c) { - ParquetColumnReader* col_reader = column_readers[c]; - if (col_reader->max_rep_level() > 0) { - continue_execution = col_reader->ReadValueBatch( - scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, - scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); - } else { - continue_execution = col_reader->ReadNonRepeatedValueBatch( - scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, - scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); - } - if (UNLIKELY(!continue_execution)) return false; - // Check that all column readers populated the same number of values. 
- if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples); - last_num_tuples = scratch_batch_->num_tuples; - } - - // Keep transferring scratch tuples to output batches until the scratch batch - // is empty. CommitRows() creates new output batches as necessary. - do { - int num_row_to_commit = TransferScratchTuples(); - parse_status_.MergeStatus(CommitRows(num_row_to_commit)); - if (UNLIKELY(!parse_status_.ok())) return false; - } while (!scratch_batch_->AtEnd()); - - rows_read += scratch_batch_->num_tuples; - COUNTER_ADD(scan_node_->rows_read_counter(), scratch_batch_->num_tuples); - continue_execution &= parse_status_.ok(); - continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled(); - } - - if (column_readers[0]->RowGroupAtEnd() && parse_status_.ok()) { - parse_status_ = ValidateEndOfRowGroup(column_readers, row_group_idx, rows_read); - continue_execution &= parse_status_.ok(); - } - - return continue_execution; -} - bool HdfsParquetScanner::AssembleCollection( const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level, CollectionValueBuilder* coll_value_builder) { @@ -654,8 +735,7 @@ inline bool HdfsParquetScanner::ReadCollectionItem( return continue_execution; } -Status HdfsParquetScanner::ProcessFooter(bool* eosr) { - *eosr = false; +Status HdfsParquetScanner::ProcessFooter() { int64_t len = stream_->scan_range()->len(); // We're processing the scan range issued in IssueInitialRanges(). The scan range should @@ -769,33 +849,6 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) { if (file_metadata_.__isset.created_by) { file_version_ = ParquetFileVersion(file_metadata_.created_by); } - - if (scan_node_->IsZeroSlotTableScan()) { - // There are no materialized slots, e.g. count(*) over the table. We can serve - // this query from just the file metadata. We don't need to read the column data. 
- int64_t num_tuples = file_metadata_.num_rows; - COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples); - - while (num_tuples > 0) { - MemPool* pool; - Tuple* tuple; - TupleRow* current_row; - int max_tuples = GetMemory(&pool, &tuple, ¤t_row); - max_tuples = min<int64_t>(max_tuples, num_tuples); - num_tuples -= max_tuples; - - int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples); - RETURN_IF_ERROR(CommitRows(num_to_commit)); - } - - *eosr = true; - return Status::OK(); - } else if (file_metadata_.num_rows == 0) { - // Empty file - *eosr = true; - return Status::OK(); - } - if (file_metadata_.row_groups.empty()) { return Status( Substitute("Invalid file. This file: $0 has no row groups", filename())); @@ -1075,7 +1128,7 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup( // column has more values than stated in the metadata, meaning the final data page // will still have unread values. if (reader->num_buffered_values_ != 0) { - return Status(Substitute("Corrupt parquet metadata in file '$0': metadata reports " + return Status(Substitute("Corrupt Parquet metadata in file '$0': metadata reports " "'$1' more values in data page than actually present", filename(), reader->num_buffered_values_)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 3791ae1..830701d 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -317,18 +317,19 @@ class BoolColumnReader; /// the ScannerContext. 
class HdfsParquetScanner : public HdfsScanner { public: - HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state); - + HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue); virtual ~HdfsParquetScanner() {}; - virtual Status Prepare(ScannerContext* context); - virtual void Close(); - virtual Status ProcessSplit(); /// Issue just the footer range for each file. We'll then parse the footer and pick /// out the columns we want. static Status IssueInitialRanges(HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files); + virtual Status Open(ScannerContext* context); + virtual Status ProcessSplit(); + virtual void Close(RowBatch* row_batch); + /// The repetition level is set to this value to indicate the end of a row group. static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min(); /// Indicates an invalid definition or repetition level. @@ -347,6 +348,20 @@ class HdfsParquetScanner : public HdfsScanner { /// need to issue another read. static const int64_t FOOTER_SIZE = 1024 * 100; + /// Index of the current row group being processed. Initialized to -1 which indicates + /// that we have not started processing the first row group yet (GetNext() has not yet + /// been called). + int32_t row_group_idx_; + + /// Counts the number of rows processed for the current row group. + int64_t row_group_rows_read_; + + /// Indicates whether we should advance to the next row group in the next GetNext(). + /// Starts out as true to move to the very first row group. + bool advance_row_group_; + + boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_; + /// Cached runtime filter contexts, one for each filter that applies to this column. vector<const FilterContext*> filter_ctxs_; @@ -411,32 +426,35 @@ class HdfsParquetScanner : public HdfsScanner { const char* filename() const { return metadata_range_->file(); } - /// Reads data using 'column_readers' to materialize top-level tuples. 
- /// - /// Returns true when the row group is complete and execution can be safely resumed. - /// Returns false if execution should be aborted due to: - /// - parse_error_ is set - /// - query is cancelled - /// - scan node limit was reached - /// - the scanned file can be skipped based on runtime filters - /// When false is returned the column_readers are left in an undefined state and - /// execution should be aborted immediately by the caller. - /// - /// 'row_group_idx' is used for calling into ValidateEndOfRowGroup() when the end - /// of the row group is reached. - /// - /// If 'filters_pass' is set to false by this method, the partition columns associated - /// with this row group did not pass all the runtime filters (and therefore only filter - /// contexts that apply only to partition columns are checked). - bool AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, - int row_group_idx, bool* filters_pass); + virtual Status GetNextInternal(RowBatch* row_batch, bool* eos); + + /// Advances 'row_group_idx_' to the next non-empty row group and initializes + /// the column readers to scan it. Recoverable errors are logged to the runtime + /// state. Only returns a non-OK status if a non-recoverable error is encountered + /// (or abort_on_error is true). If OK is returned, 'parse_status_' is guaranteed + /// to be OK as well. + Status NextRowGroup(); + + /// Reads data using 'column_readers' to materialize top-level tuples into 'row_batch'. + /// Returns a non-OK status if a non-recoverable error was encountered and execution + /// of this query should be terminated immediately. + /// May set *skip_row_group to indicate that the current row group should be skipped, + /// e.g., due to a parse error, but execution should continue. + Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, + RowBatch* row_batch, bool* skip_row_group); + + /// Commit num_rows to the given row batch. 
+ /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits. + /// Scanner can call this with 0 rows to flush any pending resources (attached pools + /// and io buffers) to minimize memory consumption. + Status CommitRows(RowBatch* dst_batch, int num_rows); /// Evaluates runtime filters and conjuncts (if any) against the tuples in - /// 'scratch_batch_', and adds the surviving tuples to the output batch. - /// Transfers the ownership of tuple memory to the output batch when the + /// 'scratch_batch_', and adds the surviving tuples to the given batch. + /// Transfers the ownership of tuple memory to the target batch when the /// scratch batch is exhausted. - /// Returns the number of rows that should be committed to the output batch. - int TransferScratchTuples(); + /// Returns the number of rows that should be committed to the given batch. + int TransferScratchTuples(RowBatch* dst_batch); /// Evaluates runtime filters (if any) against the given row. Returns true if /// they passed, false otherwise. Maintains the runtime filter stats, determines @@ -474,8 +492,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Process the file footer and parse file_metadata_. This should be called with the /// last FOOTER_SIZE bytes in context_. - /// *eosr is a return value. If true, the scan range is complete (e.g. select count(*)) - Status ProcessFooter(bool* eosr); + Status ProcessFooter(); /// Populates 'column_readers' for the slots in 'tuple_desc', including creating child /// readers for any collections. Schema resolution is handled in this function as @@ -516,6 +533,11 @@ class HdfsParquetScanner : public HdfsScanner { /// Part of the HdfsScanner interface, not used in Parquet. Status InitNewRange() { return Status::OK(); }; + + /// Transfers the remaining resources backing tuples such as IO buffers and memory + /// from mem pools to the given row batch. Closes all column readers. 
+ /// Should be called after completing a row group and when returning the last batch. + void FlushRowGroupResources(RowBatch* row_batch); }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-rcfile-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc index 3914845..65fe502 100644 --- a/be/src/exec/hdfs-rcfile-scanner.cc +++ b/be/src/exec/hdfs-rcfile-scanner.cc @@ -50,15 +50,16 @@ const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1}; // Macro to convert between SerdeUtil errors to Status returns. #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ -HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state) { +HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) + : BaseSequenceScanner(scan_node, state, add_batches_to_queue) { } HdfsRCFileScanner::~HdfsRCFileScanner() { } -Status HdfsRCFileScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context)); +Status HdfsRCFileScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(BaseSequenceScanner::Open(context)); text_converter_.reset( new TextConverter(0, scan_node_->hdfs_table()->null_column_value())); scan_node_->IncNumScannersCodegenDisabled(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-rcfile-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h index b86f797..314c870 100644 --- a/be/src/exec/hdfs-rcfile-scanner.h +++ b/be/src/exec/hdfs-rcfile-scanner.h @@ -230,10 +230,11 @@ class Tuple; /// A scanner for reading RCFiles into tuples. 
class HdfsRCFileScanner : public BaseSequenceScanner { public: - HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state); + HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue); virtual ~HdfsRCFileScanner(); - virtual Status Prepare(ScannerContext* context); + virtual Status Open(ScannerContext* context); void DebugString(int indentation_level, std::stringstream* out) const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 9a89ac3..7904071 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -64,8 +64,6 @@ DEFINE_bool(suppress_unknown_disk_id_warnings, false, " provide volume/disk information."); DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, " "that a scan node will wait for expected runtime filters to arrive."); -DECLARE_string(cgroup_hierarchy_path); -DECLARE_bool(enable_rm); #ifndef NDEBUG DECLARE_bool(skip_file_runtime_filtering); @@ -194,7 +192,7 @@ bool HdfsScanNode::FilePassesFilterPredicates(const vector<FilterContext>& filte if (filter_ctxs_.size() == 0) return true; ScanRangeMetadata* metadata = reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data()); - if (!PartitionPassesFilterPredicates(metadata->partition_id, FilterStats::FILES_KEY, + if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY, filter_ctxs)) { for (int j = 0; j < file->splits.size(); ++j) { // Mark range as complete to ensure progress. 
@@ -372,7 +370,7 @@ void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type type) { return it->second; } -Status HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition, +Status HdfsScanNode::CreateAndOpenScanner(HdfsPartitionDescriptor* partition, ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) { DCHECK(context != NULL); THdfsCompression::type compression = @@ -386,20 +384,20 @@ Status HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition, if (compression == THdfsCompression::LZO) { scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_)); } else { - scanner->reset(new HdfsTextScanner(this, runtime_state_)); + scanner->reset(new HdfsTextScanner(this, runtime_state_, true)); } break; case THdfsFileFormat::SEQUENCE_FILE: - scanner->reset(new HdfsSequenceScanner(this, runtime_state_)); + scanner->reset(new HdfsSequenceScanner(this, runtime_state_, true)); break; case THdfsFileFormat::RC_FILE: - scanner->reset(new HdfsRCFileScanner(this, runtime_state_)); + scanner->reset(new HdfsRCFileScanner(this, runtime_state_, true)); break; case THdfsFileFormat::AVRO: - scanner->reset(new HdfsAvroScanner(this, runtime_state_)); + scanner->reset(new HdfsAvroScanner(this, runtime_state_, true)); break; case THdfsFileFormat::PARQUET: - scanner->reset(new HdfsParquetScanner(this, runtime_state_)); + scanner->reset(new HdfsParquetScanner(this, runtime_state_, true)); break; default: return Status(Substitute("Unknown Hdfs file format type: $0", @@ -408,8 +406,8 @@ Status HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition, DCHECK(scanner->get() != NULL); Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_); if (status.ok()) { - status = scanner->get()->Prepare(context); - if (!status.ok()) scanner->get()->Close(); + status = scanner->get()->Open(context); + if (!status.ok()) scanner->get()->Close(scanner->get()->batch()); } else { context->ClearStreams(); } @@ -1140,7 
+1138,7 @@ void HdfsScanNode::ScannerThread() { runtime_state_->resource_pool()->ReleaseThreadToken(false); } -bool HdfsScanNode::PartitionPassesFilterPredicates(int32_t partition_id, +bool HdfsScanNode::PartitionPassesFilters(int32_t partition_id, const string& stats_name, const vector<FilterContext>& filter_ctxs) { if (filter_ctxs.size() == 0) return true; DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size()) @@ -1198,8 +1196,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, // done. See FilePassesFilterPredicates() for the correct logic to mark all splits in a // file as done; the correct fix here is to do that for every file in a thread-safe way. if (!FileFormatIsSequenceBased(partition->file_format())) { - if (!PartitionPassesFilterPredicates(partition_id, FilterStats::SPLITS_KEY, - filter_ctxs)) { + if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) { // Avoid leaking unread buffers in scan_range. scan_range->Cancel(Status::CANCELLED); // Mark scan range as done. @@ -1211,7 +1208,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, ScannerContext context(runtime_state_, this, partition, scan_range, filter_ctxs); scoped_ptr<HdfsScanner> scanner; - Status status = CreateAndPrepareScanner(partition, &context, &scanner); + Status status = CreateAndOpenScanner(partition, &context, &scanner); if (!status.ok()) { // If preparation fails, avoid leaking unread buffers in the scan_range. scan_range->Cancel(status); @@ -1243,7 +1240,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, VLOG_QUERY << ss.str(); } - scanner->Close(); + // Transfer the remaining resources to the final row batch (if any) and add it to + // the row batch queue. 
+ scanner->Close(scanner->batch()); return status; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index c611a45..13b07bd 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -152,7 +152,7 @@ class HdfsScanNode : public ScanNode { /// Returns number of partition key slots. int num_materialized_partition_keys() const { return partition_key_slots_.size(); } - const TupleDescriptor* tuple_desc() { return tuple_desc_; } + const TupleDescriptor* tuple_desc() const { return tuple_desc_; } const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; } @@ -262,7 +262,9 @@ class HdfsScanNode : public ScanNode { /// Called by the scanner when a range is complete. Used to trigger done_ and /// to log progress. This *must* only be called after the scanner has completely - /// finished the scan range (i.e. context->Flush()). + /// finished the scan range (i.e. context->Flush()), and has added the final + /// row batch to the row batch queue. Otherwise the last batch may be + /// lost due to racing with shutting down the row batch queue. void RangeComplete(const THdfsFileFormat::type& file_type, const THdfsCompression::type& compression_type); /// Same as above except for when multiple compression codecs were used @@ -279,7 +281,7 @@ class HdfsScanNode : public ScanNode { void ComputeSlotMaterializationOrder(std::vector<int>* order) const; /// Returns true if there are no materialized slots, such as a count(*) over the table. 
- inline bool IsZeroSlotTableScan() { + inline bool IsZeroSlotTableScan() const { return materialized_slots().empty() && tuple_desc()->tuple_path().empty(); } @@ -305,12 +307,12 @@ class HdfsScanNode : public ScanNode { /// /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the /// function returns true, or a set of filter contexts to evaluate. - bool PartitionPassesFilterPredicates(int32_t partition_id, - const std::string& stats_name, const std::vector<FilterContext>& filter_ctxs); + bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name, + const std::vector<FilterContext>& filter_ctxs); const std::vector<FilterContext> filter_ctxs() const { return filter_ctxs_; } - private: + protected: friend class ScannerContext; RuntimeState* runtime_state_; @@ -503,15 +505,13 @@ class HdfsScanNode : public ScanNode { /// -1 if no callback is registered. int32_t rm_callback_id_; - /// Called when scanner threads are available for this scan node. This will - /// try to spin up as many scanner threads as the quota allows. - /// This is also called whenever a new range is added to the IoMgr to 'pull' - /// thread tokens if they are available. + /// Tries to spin up as many scanner threads as the quota allows. Called explicitly + /// (e.g., when adding new ranges) or when threads are available for this scan node. void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool); - /// Create and prepare new scanner for this partition type. + /// Create and open new scanner for this partition type. /// If the scanner is successfully created, it is returned in 'scanner'. - Status CreateAndPrepareScanner(HdfsPartitionDescriptor* partition, + Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition, ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner); /// Main function for scanner thread. 
This thread pulls the next range to be http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc index 26763e9..0c5d6f8 100644 --- a/be/src/exec/hdfs-scanner-ir.cc +++ b/be/src/exec/hdfs-scanner-ir.cc @@ -34,6 +34,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_ FieldLocation* fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start) { + DCHECK(add_batches_to_queue_); DCHECK(tuple_ != NULL); uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row); uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index ea101c0..c02ad18 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -53,9 +53,11 @@ using namespace strings; const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation"; const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner"; -HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state) +HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) : scan_node_(scan_node), state_(state), + add_batches_to_queue_(add_batches_to_queue), context_(NULL), stream_(NULL), scanner_conjunct_ctxs_(NULL), @@ -75,6 +77,7 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state) HdfsScanner::HdfsScanner() : scan_node_(NULL), state_(NULL), + add_batches_to_queue_(true), context_(NULL), stream_(NULL), scanner_conjunct_ctxs_(NULL), @@ -93,10 +96,9 @@ HdfsScanner::HdfsScanner() } HdfsScanner::~HdfsScanner() { - DCHECK(batch_ == NULL); } -Status 
HdfsScanner::Prepare(ScannerContext* context) { +Status HdfsScanner::Open(ScannerContext* context) { context_ = context; stream_ = context->GetStream(); @@ -116,13 +118,11 @@ Status HdfsScanner::Prepare(ScannerContext* context) { state_, context_->partition_descriptor()->partition_key_value_ctxs()); template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_; - // Allocate a new row batch. May fail if mem limit is exceeded. - RETURN_IF_ERROR(StartNewRowBatch()); decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime"); return Status::OK(); } -void HdfsScanner::Close() { +void HdfsScanner::Close(RowBatch* row_batch) { if (decompressor_.get() != NULL) decompressor_->Close(); HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin(); for (; iter != scanner_conjuncts_map_.end(); ++iter) { @@ -155,6 +155,7 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, } Status HdfsScanner::StartNewRowBatch() { + DCHECK(add_batches_to_queue_); batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), scan_node_->mem_tracker()); int64_t tuple_buffer_size; @@ -164,6 +165,7 @@ Status HdfsScanner::StartNewRowBatch() { } int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) { + DCHECK(add_batches_to_queue_); DCHECK(batch_ != NULL); DCHECK_GT(batch_->capacity(), batch_->num_rows()); *pool = batch_->tuple_data_pool(); @@ -185,6 +187,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool // TODO(skye): have this check scan_node_->ReachedLimit() and get rid of manual check? 
Status HdfsScanner::CommitRows(int num_rows) { + DCHECK(add_batches_to_queue_); DCHECK(batch_ != NULL); DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows()); batch_->CommitRows(num_rows); @@ -209,13 +212,6 @@ Status HdfsScanner::CommitRows(int num_rows) { return Status::OK(); } -void HdfsScanner::AddFinalRowBatch() { - DCHECK(batch_ != NULL); - context_->ReleaseCompletedResources(batch_, /* done */ true); - scan_node_->AddMaterializedRowBatch(batch_); - batch_ = NULL; -} - // In this code path, no slots were materialized from the input files. The only // slots are from partition keys. This lets us simplify writing out the batches. // 1. template_tuple_ is the complete tuple. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 7a723b8..de39d6b 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -69,10 +69,16 @@ struct FieldLocation { /// HdfsScanner is the superclass for different hdfs file format parsers. There is /// an instance of the scanner object created for each split, each driven by a different /// thread created by the scan node. The scan node calls: -/// 1. Prepare -/// 2. ProcessSplit -/// 3. Close -/// ProcessSplit does not return until the split is complete (or an error) occurred. +/// 1. Open() +/// 2. ProcessSplit() or GetNext()* +/// 3. Close() +/// The scanner can be used in either of two modes, indicated via the add_batches_to_queue +/// c'tor parameter. +/// ProcessSplit() scans the split and adds materialized row batches to the scan node's +/// row batch queue until the split is complete or an error occurred. +/// GetNext() provides an iterator-like interface where the caller can request +/// the next materialized row batch until the split has been fully processed (eos). 
+/// /// The HdfsScanner works in tandem with the ScannerContext to interleave IO /// and parsing. // @@ -84,9 +90,9 @@ struct FieldLocation { /// 1. During the Prepare() phase of the ScanNode, the scanner subclass's static /// Codegen() function will be called to perform codegen for that scanner type /// for the specific tuple desc. This codegen'd function is cached in the HdfsScanNode. -/// 2. During the GetNext() phase (where we create one Scanner for each scan range), -/// the created scanner subclass can retrieve, from the scan node, the codegen'd -/// function to use. +/// 2. During the GetNext() phase of the scan node (where we create one Scanner for each +/// scan range), the created scanner subclass can retrieve, from the scan node, +/// the codegen'd function to use. /// This way, we only codegen once per scanner type, rather than once per scanner object. // /// This class also encapsulates row batch management. Subclasses should call CommitRows() @@ -100,21 +106,45 @@ class HdfsScanner { /// This probably ought to be a derived number from the environment. const static int FILE_BLOCK_SIZE = 4096; - HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state); + /// If 'add_batches_to_queue' is true the caller must call ProcessSplit() and not + /// GetNext(). Row batches will be added to the scan node's row batch queue, including + /// the final one in Close(). + HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue); virtual ~HdfsScanner(); /// One-time initialisation of state that is constant across scan ranges. - virtual Status Prepare(ScannerContext* context); + virtual Status Open(ScannerContext* context); + + /// Returns the next row batch from this scanner's split. + /// Recoverable errors are logged to the runtime state. Only returns a non-OK status + /// if a non-recoverable error is encountered (or abort_on_error is true). If OK is + /// returned, 'parse_status_' is guaranteed to be OK as well. 
+ /// The memory referenced by the tuples is valid until this or any subsequently + returned batch is reset or destroyed. + /// Only valid to call if 'add_batches_to_queue_' is false. + Status GetNext(RowBatch* row_batch, bool* eos) { + DCHECK(!add_batches_to_queue_); + return GetNextInternal(row_batch, eos); + } /// Process an entire split, reading bytes from the context's streams. Context is /// initialized with the split data (e.g. template tuple, partition descriptor, etc). /// This function should only return on error or end of scan range. + /// Only valid to call if 'add_batches_to_queue_' is true. virtual Status ProcessSplit() = 0; - /// Release all resources the scanner has allocated. This is the last chance for the - /// scanner to attach any resources to the ScannerContext object. - virtual void Close(); + /// Transfers the ownership of memory backing returned tuples such as IO buffers + /// and memory in mem pools to the given row batch. If the row batch is NULL, + /// those resources are released instead. In any case, releases all other resources + /// that are not backing returned rows (e.g. temporary decompression buffers). + virtual void Close(RowBatch* row_batch); + + /// Only valid to call if 'add_batches_to_queue_' is true. + RowBatch* batch() const { + DCHECK(add_batches_to_queue_); + return batch_; + } /// Scanner subclasses must implement these static functions as well. Unfortunately, /// c++ does not allow static virtual functions. @@ -149,6 +179,11 @@ class HdfsScanner { /// RuntimeState for error reporting RuntimeState* state_; + /// True if the creator of this scanner intends to use ProcessSplit() and not GetNext(). + /// Row batches will be added to the scan node's row batch queue, including the final + /// one in Close(). + const bool add_batches_to_queue_; + /// Context for this scanner ScannerContext* context_; @@ -230,6 +265,13 @@ class HdfsScanner { /// Jitted write tuples function pointer. Null if codegen is disabled. 
WriteTuplesFn write_tuples_fn_; + /// Implements GetNext(). Should be overridden by subclasses. + /// May be called even if 'add_batches_to_queue_' is true. + virtual Status GetNextInternal(RowBatch* row_batch, bool* eos) { + DCHECK(false) << "GetNextInternal() not implemented for this scanner type."; + return Status::OK(); + } + /// Initializes write_tuples_fn_ to the jitted function if codegen is possible. /// - partition - partition descriptor for this scanner/scan range /// - type - type for this scanner @@ -238,6 +280,7 @@ class HdfsScanner { THdfsFileFormat::type type, const std::string& scanner_name); /// Set batch_ to a new row batch and update tuple_mem_ accordingly. + /// Only valid to call if 'add_batches_to_queue_' is true. Status StartNewRowBatch(); /// Reset internal state for a new scan range. @@ -251,6 +294,7 @@ class HdfsScanner { /// current row batch is complete and a new one is allocated). /// Memory returned from this call is invalidated after calling CommitRows(). /// Callers must call GetMemory() again after calling this function. + /// Only valid to call if 'add_batches_to_queue_' is true. int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem); /// Gets memory for outputting tuples into the CollectionValue being constructed via @@ -269,18 +313,15 @@ class HdfsScanner { /// Returns Status::OK if the query is not cancelled and hasn't exceeded any mem limits. /// Scanner can call this with 0 rows to flush any pending resources (attached pools /// and io buffers) to minimize memory consumption. + /// Only valid to call if 'add_batches_to_queue_' is true. Status CommitRows(int num_rows); - /// Attach all remaining resources from context_ to batch_ and send batch_ to the scan - /// node. This must be called after all rows have been committed and no further - /// resources are needed from context_ (in practice this will happen in each scanner - /// subclass's Close() implementation). 
- void AddFinalRowBatch(); - /// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch /// will be committed. commit_batch should be true if the attached pool is expected /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage. + /// Only valid to call if 'add_batches_to_queue_' is true. void AttachPool(MemPool* pool, bool commit_batch) { + DCHECK(add_batches_to_queue_); DCHECK(batch_ != NULL); DCHECK(pool != NULL); batch_->tuple_data_pool()->AcquireData(pool, false); @@ -315,6 +356,7 @@ class HdfsScanner { /// Returns the number of tuples added to the row batch. This can be less than /// num_tuples/tuples_till_limit because of failed conjuncts. /// Returns -1 if parsing should be aborted due to parse errors. + /// Only valid to call if 'add_batches_to_queue_' is true. int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size, FieldLocation* fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 0cd000f..025d4a4 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -38,8 +38,9 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ -HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state), +HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) + : BaseSequenceScanner(scan_node, state, add_batches_to_queue), unparsed_data_buffer_(NULL), num_buffered_records_in_compressed_block_(0) { } @@ -86,8 +87,8 @@ Status 
HdfsSequenceScanner::InitNewRange() { return Status::OK(); } -Status HdfsSequenceScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(BaseSequenceScanner::Prepare(context)); +Status HdfsSequenceScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(BaseSequenceScanner::Open(context)); // Allocate the scratch space for two pass parsing. The most fields we can go // through in one parse pass is the batch size (tuples) * the number of fields per tuple http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h index ae7f3bb..5b3da92 100644 --- a/be/src/exec/hdfs-sequence-scanner.h +++ b/be/src/exec/hdfs-sequence-scanner.h @@ -159,12 +159,13 @@ class HdfsSequenceScanner : public BaseSequenceScanner { /// SeqFile file: {'S', 'E', 'Q', 6} static const uint8_t SEQFILE_VERSION_HEADER[4]; - HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state); + HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue); virtual ~HdfsSequenceScanner(); - + /// Implementation of HdfsScanner interface. - virtual Status Prepare(ScannerContext* context); + virtual Status Open(ScannerContext* context); /// Codegen writing tuples and evaluating predicates. static llvm::Function* Codegen(HdfsScanNode*, @@ -176,9 +177,9 @@ class HdfsSequenceScanner : public BaseSequenceScanner { virtual Status ReadFileHeader(); virtual Status InitNewRange(); virtual Status ProcessRange(); - - virtual THdfsFileFormat::type file_format() const { - return THdfsFileFormat::SEQUENCE_FILE; + + virtual THdfsFileFormat::type file_format() const { + return THdfsFileFormat::SEQUENCE_FILE; } private: @@ -213,11 +214,11 @@ class HdfsSequenceScanner : public BaseSequenceScanner { /// record_ptr: ponter to the record. 
/// record_len: length of the record Status GetRecord(uint8_t** record_ptr, int64_t *record_len); - + /// Appends the current file and line to the RuntimeState's error log. /// row_idx is 0-based (in current batch) where the parse error occurred. virtual void LogRowParseError(int row_idx, std::stringstream*); - + /// Helper class for picking fields and rows from delimited text. boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_; std::vector<FieldLocation> field_locations_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 5d80f06..ec7af87 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -47,8 +47,9 @@ const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index"; // progress. const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024; -HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state) - : HdfsScanner(scan_node, state), +HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue) + : HdfsScanner(scan_node, state, add_batches_to_queue), byte_buffer_ptr_(NULL), byte_buffer_end_(NULL), byte_buffer_read_size_(0), @@ -145,6 +146,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node, } Status HdfsTextScanner::ProcessSplit() { + DCHECK(add_batches_to_queue_); + // Reset state for new scan range RETURN_IF_ERROR(InitNewRange()); @@ -173,28 +176,28 @@ Status HdfsTextScanner::ProcessSplit() { return Status::OK(); } -void HdfsTextScanner::Close() { +void HdfsTextScanner::Close(RowBatch* row_batch) { // Need to close the decompressor before releasing the resources at AddFinalRowBatch(), // because in some cases there is memory allocated in decompressor_'s temp_memory_pool_. 
if (decompressor_.get() != NULL) { decompressor_->Close(); decompressor_.reset(NULL); } - if (batch_ != NULL) { - AttachPool(data_buffer_pool_.get(), false); - AttachPool(boundary_pool_.get(), false); - AddFinalRowBatch(); + if (row_batch != NULL) { + row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); + row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false); + context_->ReleaseCompletedResources(row_batch, true); + if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch); } // Verify all resources (if any) have been transferred. DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(context_->num_completed_io_buffers(), 0); - // Must happen after AddFinalRowBatch() is called. if (!only_parsing_header_) { scan_node_->RangeComplete(THdfsFileFormat::TEXT, stream_->file_desc()->file_compression); } - HdfsScanner::Close(); + HdfsScanner::Close(row_batch); } Status HdfsTextScanner::InitNewRange() { @@ -692,8 +695,8 @@ Function* HdfsTextScanner::Codegen(HdfsScanNode* node, return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn); } -Status HdfsTextScanner::Prepare(ScannerContext* context) { - RETURN_IF_ERROR(HdfsScanner::Prepare(context)); +Status HdfsTextScanner::Open(ScannerContext* context) { + RETURN_IF_ERROR(HdfsScanner::Open(context)); parse_delimiter_timer_ = ADD_CHILD_TIMER(scan_node_->runtime_profile(), "DelimiterParseTime", ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME); @@ -704,6 +707,8 @@ Status HdfsTextScanner::Prepare(ScannerContext* context) { field_locations_.resize(state_->batch_size() * scan_node_->materialized_slots().size()); row_end_locations_.resize(state_->batch_size()); + // Allocate a new row batch. May fail if mem limit is exceeded. 
+ RETURN_IF_ERROR(StartNewRowBatch()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index e8dbe08..74784b8 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -44,13 +44,14 @@ struct HdfsFileDesc; /// scanner for the tuple directly after it. class HdfsTextScanner : public HdfsScanner { public: - HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state); + HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state, + bool add_batches_to_queue); virtual ~HdfsTextScanner(); /// Implementation of HdfsScanner interface. - virtual Status Prepare(ScannerContext* context); + virtual Status Open(ScannerContext* context); virtual Status ProcessSplit(); - virtual void Close(); + virtual void Close(RowBatch* row_batch); /// Issue io manager byte ranges for 'files'. static Status IssueInitialRanges(HdfsScanNode* scan_node, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/83436561/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 3c26084..d308091 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -224,6 +224,10 @@ class ParquetColumnReader { /// Returns true if this column reader has reached the end of the row group. inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; } + /// Transfers the remaining resources backing tuples to the given row batch, + /// and frees up other resources. 
+ virtual void Close(RowBatch* row_batch) = 0; + protected: HdfsParquetScanner* parent_; const SchemaNode& node_; @@ -330,8 +334,10 @@ class BaseScalarColumnReader : public ParquetColumnReader { return Status::OK(); } - /// Called once when the scanner is complete for final cleanup. - void Close() { + virtual void Close(RowBatch* row_batch) { + if (decompressed_data_pool_.get() != NULL) { + row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), false); + } if (decompressor_.get() != NULL) decompressor_->Close(); } @@ -475,6 +481,12 @@ class CollectionColumnReader : public ParquetColumnReader { pos_current_value_ = -1; } + virtual void Close(RowBatch* row_batch) { + for (ParquetColumnReader* child_reader: children_) { + child_reader->Close(row_batch); + } + } + private: /// Column readers of fields contained within this collection. There is at least one /// child reader per collection reader. Child readers either materialize slots in the
