IMPALA-2885: ScannerContext::Stream objects should be owned by ScannerContext

Each scanner context has at least one stream, each of which corresponds
to a scan range. For the Parquet scanner, there can be multiple streams.
These Stream objects were stored in the RuntimeState's object pool
even though they have the same lifespan as the scanner threads.
This change makes ScannerContext the owner of its Stream objects
so they are freed when the ScannerContext is destroyed.
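
For illustration, here is a minimal sketch of the new ownership pattern.
The types below are simplified, hypothetical stand-ins, not the actual
Impala classes:

  #include <memory>
  #include <vector>

  struct ScanRange {};  // stand-in for DiskIoMgr::ScanRange

  class Stream {
   public:
    explicit Stream(ScanRange* range) : range_(range) {}
    ScanRange* scan_range() const { return range_; }
   private:
    ScanRange* range_;
  };

  class ScannerContext {
   public:
    // The context owns each Stream via unique_ptr, so all streams are
    // destroyed with the context instead of accumulating in a
    // query-lifetime object pool.
    Stream* AddStream(ScanRange* range) {
      streams_.push_back(std::unique_ptr<Stream>(new Stream(range)));
      return streams_.back().get();
    }
   private:
    std::vector<std::unique_ptr<Stream>> streams_;
  };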

Change-Id: Ic5440d414ecc0ca19676c553275aeb85231d6045
Reviewed-on: http://gerrit.cloudera.org:8080/3590
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59cdec21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59cdec21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59cdec21

Branch: refs/heads/master
Commit: 59cdec21bf2a890e6f7fe856a8e1c616e8bebec3
Parents: f4fbd79
Author: Michael Ho <[email protected]>
Authored: Wed Jul 6 19:20:34 2016 -0700
Committer: Taras Bobrovytsky <[email protected]>
Committed: Thu Jul 14 19:04:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scanner.cc      | 2 ++
 be/src/exec/hdfs-text-scanner.cc | 5 +++--
 be/src/exec/scanner-context.cc   | 6 +++---
 be/src/exec/scanner-context.h    | 6 +++---
 4 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 275956c..5abd346 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -209,6 +209,8 @@ Status HdfsScanner::CommitRows(int num_rows) {
 
 void HdfsScanner::AddFinalRowBatch() {
   DCHECK(batch_ != NULL);
+  // Cannot DCHECK(stream_ != NULL) as parquet scanner sets it to NULL in ProcessSplit().
+  stream_ = NULL;
   context_->ReleaseCompletedResources(batch_, /* done */ true);
   scan_node_->AddMaterializedRowBatch(batch_);
   batch_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6cc308d..dcd3081 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -180,6 +180,7 @@ void HdfsTextScanner::Close() {
     decompressor_->Close();
     decompressor_.reset(NULL);
   }
+  THdfsCompression::type compression = stream_->file_desc()->file_compression;
   if (batch_ != NULL) {
     AttachPool(data_buffer_pool_.get(), false);
     AttachPool(boundary_pool_.get(), false);
@@ -189,9 +190,9 @@ void HdfsTextScanner::Close() {
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
+  // Must happen after AddFinalRowBatch() is called.
   if (!only_parsing_header_) {
-    scan_node_->RangeComplete(
-        THdfsFileFormat::TEXT, stream_->file_desc()->file_compression);
+    scan_node_->RangeComplete(THdfsFileFormat::TEXT, compression);
   }
   HdfsScanner::Close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index f7f65be..4935f4c 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -63,7 +63,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent)
 }
 
 ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
-  Stream* stream = state_->obj_pool()->Add(new Stream(this));
+  std::unique_ptr<Stream> stream(new Stream(this));
   stream->scan_range_ = range;
   stream->file_desc_ = scan_node_->GetFileDesc(stream->filename());
   stream->file_len_ = stream->file_desc_->file_length;
@@ -76,8 +76,8 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   stream->output_buffer_bytes_left_ =
       const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
   stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData();
-  streams_.push_back(stream);
-  return stream;
+  streams_.push_back(std::move(stream));
+  return streams_.back().get();
 }
 
 void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 2ab35e6..b90d512 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -264,7 +264,7 @@ class ScannerContext {
   Stream* GetStream(int idx = 0) {
     DCHECK_GE(idx, 0);
     DCHECK_LT(idx, streams_.size());
-    return streams_[idx];
+    return streams_[idx].get();
   }
 
   /// If a non-NULL 'batch' is passed, attaches completed io buffers and boundary mem pools
@@ -304,8 +304,8 @@ class ScannerContext {
 
   HdfsPartitionDescriptor* partition_desc_;
 
-  /// Vector of streams.  Non-columnar formats will always have one stream per context.
-  std::vector<Stream*> streams_;
+  /// Vector of streams. Non-columnar formats will always have one stream per context.
+  std::vector<std::unique_ptr<Stream>> streams_;
 
   /// Always equal to the sum of completed_io_buffers_.size() across all streams.
   int num_completed_io_buffers_;
