IMPALA-6383: free memory after skipping parquet row groups

Before this patch, resources were only flushed after breaking out of
NextRowGroup(). This is a problem because resources can be allocated
for skipped row groups (e.g. for reading dictionaries).
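For illustration, here is a minimal standalone C++ sketch of the
release-on-skip pattern this patch introduces. All names below
(Context, Scanner, Stream, ShouldSkip) are simplified stand-ins for
the scanner code in the diff further down, not Impala's actual API:

----------------------------------------------------------------------
// Sketch only: a skipped row group releases its resources before the
// loop continues, so the no-leftover-streams invariant holds on every
// iteration. Simplified stand-in types, not the real Impala classes.
#include <cstdio>
#include <cstdlib>
#include <vector>

struct Stream {};  // stands in for a per-column scan-range stream

class Context {
 public:
  void AddStream() { streams_.emplace_back(); }
  // In Impala this frees completed I/O buffers; a no-op in this sketch.
  void ReleaseCompletedResources(bool done) { (void)done; }
  void ClearStreams() { streams_.clear(); }
  int NumStreams() const { return static_cast<int>(streams_.size()); }

 private:
  std::vector<Stream> streams_;
};

class Scanner {
 public:
  // Returns true when positioned on a row group to scan, false at EOF.
  bool NextRowGroup() {
    while (true) {
      // Invariant the patch asserts: no leftover streams from the footer
      // read or from a previously skipped row group.
      if (ctx_.NumStreams() != 0) std::abort();  // stands in for DCHECK_EQ
      ++row_group_idx_;
      if (row_group_idx_ >= num_row_groups_) return false;
      ctx_.AddStream();  // InitColumns() allocates streams per row group
      if (ShouldSkip(row_group_idx_)) {
        // The fix: release before 'continue', not only after leaving the
        // loop, so skipped row groups do not accumulate resources.
        ReleaseSkippedRowGroupResources();
        continue;
      }
      return true;
    }
  }

 private:
  // Placeholder for stats- or dictionary-based row group elimination.
  bool ShouldSkip(int idx) const { return idx % 2 == 0; }

  void ReleaseSkippedRowGroupResources() {
    ctx_.ReleaseCompletedResources(/*done=*/true);
    ctx_.ClearStreams();
  }

  Context ctx_;
  int row_group_idx_ = -1;
  int num_row_groups_ = 4;
};

int main() {
  Scanner scanner;
  while (scanner.NextRowGroup()) std::printf("scanning row group\n");
  return 0;
}
----------------------------------------------------------------------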
Testing:
Tested in conjunction with a prototype buffer pool patch that was
DCHECKing before the change. Added DCHECKs to the current version to
ensure the streams are cleared up as expected.

Change-Id: Ibc2f8f27c9b238be60261539f8d4be2facb57a2b
Reviewed-on: http://gerrit.cloudera.org:8080/9002
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/10fb24af
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/10fb24af
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/10fb24af

Branch: refs/heads/master
Commit: 10fb24afb966c567adcf632a314f6af1826f19fc
Parents: df3a440
Author: Tim Armstrong <[email protected]>
Authored: Wed Jan 10 15:35:41 2018 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Jan 13 02:48:08 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 26 +++++++++++++++++++++-----
 be/src/exec/hdfs-parquet-scanner.h  |  5 +++++
 be/src/exec/scanner-context.h       |  8 ++++----
 3 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f0f280d..6380722 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -228,6 +228,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
   context_->ReleaseCompletedResources(true);
+  context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -263,7 +264,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     }
   } else {
     template_tuple_pool_->FreeAll();
-    dictionary_pool_.get()->FreeAll();
+    dictionary_pool_->FreeAll();
     context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
@@ -478,7 +479,6 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     // Transfer resources and clear streams if there is any leftover from the previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
-    context_->ClearStreams();
     if (!advance_row_group_) {
       Status status = ValidateEndOfRowGroup(column_readers_, row_group_idx_,
           row_group_rows_read_);
@@ -619,6 +619,9 @@ Status HdfsParquetScanner::NextRowGroup() {
   while (true) {
     // Reset the parse status for the next row group.
     parse_status_ = Status::OK();
+    // Make sure that we don't have leftover resources from the file metadata scan range
+    // or previous row groups.
+    DCHECK_EQ(0, context_->NumStreams());
 
     ++row_group_idx_;
     if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -669,6 +672,9 @@ Status HdfsParquetScanner::NextRowGroup() {
     // of the column.
     RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
 
+    // InitColumns() may have allocated resources to scan columns. If we skip this row
+    // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
+
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
     // for all columns visited.
@@ -677,10 +683,12 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
     if (skip_row_group_on_dict_filters) {
      COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+      ReleaseSkippedRowGroupResources();
      continue;
     }
 
@@ -692,6 +700,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -730,9 +739,16 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
   context_->ReleaseCompletedResources(true);
-  for (ParquetColumnReader* col_reader : column_readers_) {
-    col_reader->Close(row_batch);
-  }
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
+  context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+  dictionary_pool_->FreeAll();
+  scratch_batch_->ReleaseResources(nullptr);
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
+  context_->ClearStreams();
 }
 
 bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {


http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 99b5a60..cea06ed 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -642,6 +642,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Should be called after completing a row group and when returning the last batch.
   void FlushRowGroupResources(RowBatch* row_batch);
 
+  /// Releases resources associated with a row group that was skipped and closes all
+  /// column readers. Should be called after skipping a row group from which no rows
+  /// were returned.
+  void ReleaseSkippedRowGroupResources();
+
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);


http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e316063..09a4bdc 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,7 +89,6 @@ class ScannerContext {
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
       io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
 
-  /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
@@ -338,6 +337,8 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
+  int NumStreams() const { return streams_.size(); }
+
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -354,8 +355,8 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
-  /// The stream is created in the runtime state's object pool
+  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+  /// context.
   Stream* AddStream(io::ScanRange* range);
 
   /// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -370,7 +371,6 @@ class ScannerContext {
 
   RuntimeState* state_;
   HdfsScanNodeBase* scan_node_;
-  HdfsPartitionDescriptor* partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.
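
A note on the design choice behind the two release paths above:
FlushRowGroupResources() hands the dictionary memory to the output row
batch, because rows already returned may still point into it, whereas
ReleaseSkippedRowGroupResources() frees it immediately, since a skipped
row group produced no rows. A minimal sketch of that distinction, with
a toy MemPool standing in for Impala's (an assumed simplified interface,
not the real API):

----------------------------------------------------------------------
// Sketch only: transfer-vs-free for pool-backed memory.
#include <cstddef>
#include <memory>
#include <vector>

class MemPool {
 public:
  void* Allocate(std::size_t n) {
    chunks_.push_back(std::make_unique<char[]>(n));
    return chunks_.back().get();
  }
  // Move all chunks into this pool; their lifetime is now tied to it.
  void AcquireData(MemPool* src) {
    for (auto& c : src->chunks_) chunks_.push_back(std::move(c));
    src->chunks_.clear();
  }
  void FreeAll() { chunks_.clear(); }

 private:
  std::vector<std::unique_ptr<char[]>> chunks_;
};

int main() {
  MemPool dictionary_pool;
  MemPool batch_pool;

  // Row group produced rows: returned tuples may reference the decoded
  // dictionary, so transfer ownership to the batch's pool
  // (the FlushRowGroupResources() path).
  dictionary_pool.Allocate(1024);
  batch_pool.AcquireData(&dictionary_pool);

  // Row group skipped: nothing references the memory, free it now
  // (the ReleaseSkippedRowGroupResources() path).
  dictionary_pool.Allocate(1024);
  dictionary_pool.FreeAll();
  return 0;
}
----------------------------------------------------------------------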
