IMPALA-2885: ScannerContext::Stream objects should be owned by ScannerContext
Each scanner context has at least one stream, which corresponds to a scan range. For the Parquet scanner, there can be multiple streams. These Stream objects are stored in the RuntimeState's object pool even though they have the same lifespan as the scanner threads. This change makes ScannerContext the owner of the Stream objects so that they are freed when the ScannerContext is destroyed. Change-Id: Ic5440d414ecc0ca19676c553275aeb85231d6045 Reviewed-on: http://gerrit.cloudera.org:8080/3590 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59cdec21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59cdec21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59cdec21 Branch: refs/heads/master Commit: 59cdec21bf2a890e6f7fe856a8e1c616e8bebec3 Parents: f4fbd79 Author: Michael Ho <[email protected]> Authored: Wed Jul 6 19:20:34 2016 -0700 Committer: Taras Bobrovytsky <[email protected]> Committed: Thu Jul 14 19:04:44 2016 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-scanner.cc | 2 ++ be/src/exec/hdfs-text-scanner.cc | 5 +++-- be/src/exec/scanner-context.cc | 6 +++--- be/src/exec/scanner-context.h | 6 +++--- 4 files changed, 11 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 275956c..5abd346 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -209,6 +209,8 @@ Status HdfsScanner::CommitRows(int num_rows) { void HdfsScanner::AddFinalRowBatch() { DCHECK(batch_ != NULL); + // Cannot DCHECK(stream_ != NULL) as parquet scanner 
sets it to NULL in ProcessSplit(). + stream_ = NULL; context_->ReleaseCompletedResources(batch_, /* done */ true); scan_node_->AddMaterializedRowBatch(batch_); batch_ = NULL; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 6cc308d..dcd3081 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -180,6 +180,7 @@ void HdfsTextScanner::Close() { decompressor_->Close(); decompressor_.reset(NULL); } + THdfsCompression::type compression = stream_->file_desc()->file_compression; if (batch_ != NULL) { AttachPool(data_buffer_pool_.get(), false); AttachPool(boundary_pool_.get(), false); @@ -189,9 +190,9 @@ void HdfsTextScanner::Close() { DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(context_->num_completed_io_buffers(), 0); + // Must happen after AddFinalRowBatch() is called. 
if (!only_parsing_header_) { - scan_node_->RangeComplete( - THdfsFileFormat::TEXT, stream_->file_desc()->file_compression); + scan_node_->RangeComplete(THdfsFileFormat::TEXT, compression); } HdfsScanner::Close(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index f7f65be..4935f4c 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -63,7 +63,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent) } ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { - Stream* stream = state_->obj_pool()->Add(new Stream(this)); + std::unique_ptr<Stream> stream(new Stream(this)); stream->scan_range_ = range; stream->file_desc_ = scan_node_->GetFileDesc(stream->filename()); stream->file_len_ = stream->file_desc_->file_length; @@ -76,8 +76,8 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { stream->output_buffer_bytes_left_ = const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT); stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData(); - streams_.push_back(stream); - return stream; + streams_.push_back(std::move(stream)); + return streams_.back().get(); } void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index 2ab35e6..b90d512 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -264,7 +264,7 @@ class ScannerContext { Stream* GetStream(int idx = 0) { DCHECK_GE(idx, 0); DCHECK_LT(idx, streams_.size()); - return streams_[idx]; + return streams_[idx].get(); } /// If 
a non-NULL 'batch' is passed, attaches completed io buffers and boundary mem pools @@ -304,8 +304,8 @@ class ScannerContext { HdfsPartitionDescriptor* partition_desc_; - /// Vector of streams. Non-columnar formats will always have one stream per context. - std::vector<Stream*> streams_; + /// Vector of streams. Non-columnar formats will always have one stream per context. + std::vector<std::unique_ptr<Stream>> streams_; /// Always equal to the sum of completed_io_buffers_.size() across all streams. int num_completed_io_buffers_;
