IMPALA-5389: simplify BufferDescriptor lifetime

This is cleanup to make the code easier to understand in anticipation of
some trickier changes to I/O buffer management for IMPALA-4835. I do
not expect any changes in behaviour as a result of this patch.

* Use unique_ptr to make BufferDescriptor ownership transfer more
  explicit and to enforce, via a DCHECK in ~BufferDescriptor, that
  buffers are not leaked.
* Remove 'free_buffer_descs_' cache. TCMalloc is good at caching small
  objects, and relying on it will likely cause a lot less lock
  contention than the cache's single global lock did. The cache did
  not avoid calls to malloc() anyway because appending to std::list<>
  requires allocating a list node.
* Use std::deque instead of std::list in a couple of places - it offers
  O(1) push_*()/pop_*() at both ends and requires fewer calls into
  malloc()/free().

Testing:
Ran ASAN and exhaustive builds.

Change-Id: I007d098e9a1abb1f684be37b7f1ee6c03d3879b2
Reviewed-on: http://gerrit.cloudera.org:8080/7182
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3480c892
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3480c892
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3480c892

Branch: refs/heads/master
Commit: 3480c892c86c41d0873c93e91f4ad5807a9daaee
Parents: 1fc7e65
Author: Tim Armstrong <[email protected]>
Authored: Tue May 30 13:20:40 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jun 16 09:38:23 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc      |   4 +-
 be/src/exec/scanner-context.cc           |  16 ++--
 be/src/exec/scanner-context.h            |   6 +-
 be/src/runtime/disk-io-mgr-scan-range.cc |  28 +++---
 be/src/runtime/disk-io-mgr-stress.cc     |   5 +-
 be/src/runtime/disk-io-mgr-test.cc       |  48 ++++++-----
 be/src/runtime/disk-io-mgr.cc            | 120 ++++++++------------------
 be/src/runtime/disk-io-mgr.h             |  93 ++++++++------------
 be/src/runtime/row-batch.cc              |  11 +--
 be/src/runtime/row-batch.h               |   4 +-
 be/src/runtime/tmp-file-mgr.cc           |   4 +-
 11 files changed, 137 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 634d4ec..8de7c63 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1357,13 +1357,13 @@ Status HdfsParquetScanner::ProcessFooter() {
         metadata_range_->disk_id(), metadata_range_->expected_local(),
         DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), 
metadata_size));
 
-    DiskIoMgr::BufferDescriptor* io_buffer;
+    unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
     RETURN_IF_ERROR(
         io_mgr->Read(scan_node_->reader_context(), metadata_range, 
&io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
     DCHECK(io_buffer->eosr());
-    io_buffer->Return();
+    io_mgr->ReturnBuffer(move(io_buffer));
   }
 
   // Deserialize file header

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 335c921..7a998b3 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -21,8 +21,9 @@
 
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/hdfs-scan-node.h"
-#include "runtime/row-batch.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
+#include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
@@ -80,7 +81,6 @@ ScannerContext::Stream* 
ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   stream->file_len_ = stream->file_desc_->file_length;
   stream->total_bytes_returned_ = 0;
   stream->io_buffer_pos_ = NULL;
-  stream->io_buffer_ = NULL;
   stream->io_buffer_bytes_left_ = 0;
   stream->boundary_buffer_bytes_left_ = 0;
   stream->output_buffer_pos_ = NULL;
@@ -97,24 +97,23 @@ void 
ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
     // Mark any pending resources as completed
     if (io_buffer_ != nullptr) {
       ++parent_->num_completed_io_buffers_;
-      completed_io_buffers_.push_back(io_buffer_);
+      completed_io_buffers_.push_back(move(io_buffer_));
     }
     // Set variables to nullptr to make sure streams are not used again
-    io_buffer_ = nullptr;
     io_buffer_pos_ = nullptr;
     io_buffer_bytes_left_ = 0;
     // Cancel the underlying scan range to clean up any queued buffers there
     scan_range_->Cancel(Status::CANCELLED);
   }
 
-  for (DiskIoMgr::BufferDescriptor* buffer: completed_io_buffers_) {
+  for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : 
completed_io_buffers_) {
     if (contains_tuple_data_ && batch != nullptr) {
-      batch->AddIoBuffer(buffer);
+      batch->AddIoBuffer(move(buffer));
       // TODO: We can do row batch compaction here.  This is the only place io 
buffers are
       // queued.  A good heuristic is to check the number of io buffers queued 
and if
       // there are too many, we should compact.
     } else {
-      buffer->Return();
+      ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
       parent_->scan_node_->num_owned_io_buffers_.Add(-1);
     }
   }
@@ -147,8 +146,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t 
read_past_size) {
   if (io_buffer_ != NULL) {
     eosr = io_buffer_->eosr();
     ++parent_->num_completed_io_buffers_;
-    completed_io_buffers_.push_back(io_buffer_);
-    io_buffer_ = NULL;
+    completed_io_buffers_.push_back(move(io_buffer_));
   }
 
   if (!eosr) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index ff3cfa8..470ec01 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -19,6 +19,8 @@
 #ifndef IMPALA_EXEC_SCANNER_CONTEXT_H
 #define IMPALA_EXEC_SCANNER_CONTEXT_H
 
+#include <deque>
+
 #include <boost/cstdint.hpp>
 #include <boost/scoped_ptr.hpp>
 
@@ -198,7 +200,7 @@ class ScannerContext {
     int64_t next_read_past_size_bytes_;
 
     /// The current io buffer. This starts as NULL before we've read any bytes.
-    DiskIoMgr::BufferDescriptor* io_buffer_;
+    std::unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer_;
 
     /// Next byte to read in io_buffer_
     uint8_t* io_buffer_pos_;
@@ -230,7 +232,7 @@ class ScannerContext {
     /// On the next GetBytes() call, these buffers are released (the caller by 
calling
     /// GetBytes() signals it is done with its previous bytes).  At this point 
the
     /// buffers are either returned to the io mgr or attached to the current 
row batch.
-    std::list<DiskIoMgr::BufferDescriptor*> completed_io_buffers_;
+    std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> 
completed_io_buffers_;
 
     Stream(ScannerContext* parent);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc 
b/be/src/runtime/disk-io-mgr-scan-range.cc
index 1528cd6..d909b94 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -47,7 +47,7 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum 
read chunk size to u
 // that buffers are queued and read in file order.
 
 // This must be called with the reader lock taken.
-bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) {
+bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
@@ -60,12 +60,12 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* 
buffer) {
         reader_->num_buffers_in_reader_.Add(1);
       }
       reader_->num_used_buffers_.Add(-1);
-      buffer->Return();
+      io_mgr_->ReturnBuffer(move(buffer));
       return false;
     }
     reader_->num_ready_buffers_.Add(1);
-    ready_buffers_.push_back(buffer);
     eosr_queued_ = buffer->eosr();
+    ready_buffers_.emplace_back(move(buffer));
 
     blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
     if (blocked_on_queue_ && ready_buffers_capacity_ > MIN_QUEUE_CAPACITY) {
@@ -81,9 +81,8 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* 
buffer) {
   return blocked_on_queue_;
 }
 
-Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
-  *buffer = nullptr;
-
+Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
+  DCHECK(*buffer == nullptr);
   {
     unique_lock<mutex> scan_range_lock(lock_);
     if (eosr_returned_) return Status::OK();
@@ -107,7 +106,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** 
buffer) {
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());
-    *buffer = ready_buffers_.front();
+    *buffer = move(ready_buffers_.front());
     ready_buffers_.pop_front();
     eosr_returned_ = (*buffer)->eosr();
   }
@@ -121,8 +120,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** 
buffer) {
 
   Status status = (*buffer)->status_;
   if (!status.ok()) {
-    (*buffer)->Return();
-    *buffer = nullptr;
+    io_mgr_->ReturnBuffer(move(*buffer));
     return status;
   }
 
@@ -138,8 +136,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** 
buffer) {
   if (reader_->state_ == DiskIoRequestContext::Cancelled) {
     reader_->blocked_ranges_.Remove(this);
     Cancel(reader_->status_);
-    (*buffer)->Return();
-    *buffer = nullptr;
+    io_mgr_->ReturnBuffer(move(*buffer));
     return status_;
   }
 
@@ -184,8 +181,7 @@ void DiskIoMgr::ScanRange::CleanupQueuedBuffers() {
   reader_->num_ready_buffers_.Add(-ready_buffers_.size());
 
   while (!ready_buffers_.empty()) {
-    BufferDescriptor* buffer = ready_buffers_.front();
-    buffer->Return();
+    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
     ready_buffers_.pop_front();
   }
 }
@@ -605,13 +601,13 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* 
read_succeeded) {
   // Create a single buffer desc for the entire scan range and enqueue that.
   // 'mem_tracker' is nullptr because the memory is owned by the HDFS java 
client,
   // not the Impala backend.
-  BufferDescriptor* desc = io_mgr_->GetBufferDesc(reader_, nullptr, this,
-      reinterpret_cast<uint8_t*>(buffer), 0);
+  unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new 
BufferDescriptor(
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueBuffer(desc);
+  EnqueueBuffer(move(desc));
   if (reader_->bytes_read_counter_ != nullptr) {
     COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc 
b/be/src/runtime/disk-io-mgr-stress.cc
index c25d6ef..af0c841 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -111,7 +111,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
       if (range == NULL) break;
 
       while (true) {
-        DiskIoMgr::BufferDescriptor* buffer;
+        unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
         status = range->GetNext(&buffer);
         CHECK(status.ok() || status.IsCancelled());
         if (buffer == NULL) break;
@@ -133,8 +133,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        buffer->Return();
-        buffer = NULL;
+        io_mgr_->ReturnBuffer(move(buffer));
         bytes_read += len;
 
         CHECK_GE(bytes_read, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc 
b/be/src/runtime/disk-io-mgr-test.cc
index 9167a6c..38c0dd5 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -113,33 +113,33 @@ class DiskIoMgrTest : public testing::Test {
 
   static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       DiskIoMgr::ScanRange* range, const char* expected, int expected_len = 
-1) {
-    DiskIoMgr::BufferDescriptor* buffer;
+    unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
     ASSERT_OK(io_mgr->Read(reader, range, &buffer));
     ASSERT_TRUE(buffer != NULL);
     EXPECT_EQ(buffer->len(), range->len());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
     EXPECT_TRUE(cmp == 0);
-    buffer->Return();
+    io_mgr->ReturnBuffer(move(buffer));
   }
 
-  static void ValidateScanRange(DiskIoMgr::ScanRange* range, const char* 
expected,
-      int expected_len, const Status& expected_status) {
+  static void ValidateScanRange(DiskIoMgr* io_mgr, DiskIoMgr::ScanRange* range,
+      const char* expected, int expected_len, const Status& expected_status) {
     char result[expected_len + 1];
     memset(result, 0, expected_len + 1);
 
     while (true) {
-      DiskIoMgr::BufferDescriptor* buffer = NULL;
+      unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
       Status status = range->GetNext(&buffer);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
       if (buffer == NULL || !status.ok()) {
-        if (buffer != NULL) buffer->Return();
+        if (buffer != NULL) io_mgr->ReturnBuffer(move(buffer));
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
       memcpy(result + range->offset() + buffer->scan_range_offset(),
           buffer->buffer(), buffer->len());
-      buffer->Return();
+      io_mgr->ReturnBuffer(move(buffer));
     }
     ValidateEmptyOrCorrect(expected, result, expected_len);
   }
@@ -155,7 +155,7 @@ class DiskIoMgrTest : public testing::Test {
       Status status = io_mgr->GetNextRange(reader, &range);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
       if (range == NULL) break;
-      ValidateScanRange(range, expected_result, expected_len, expected_status);
+      ValidateScanRange(io_mgr, range, expected_result, expected_len, 
expected_status);
       num_ranges_processed->Add(1);
       ++num_ranges;
     }
@@ -653,7 +653,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
     // Don't return buffers to force memory pressure
-    vector<DiskIoMgr::BufferDescriptor*> buffers;
+    vector<unique_ptr<DiskIoMgr::BufferDescriptor>> buffers;
 
     AtomicInt32 num_ranges_processed;
     ScanRangeThread(&io_mgr, reader, data, strlen(data), 
Status::MemLimitExceeded(),
@@ -670,19 +670,19 @@ TEST_F(DiskIoMgrTest, MemLimits) {
       if (range == NULL) break;
 
       while (true) {
-        DiskIoMgr::BufferDescriptor* buffer = NULL;
+        unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
         Status status = range->GetNext(&buffer);
         ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
         if (buffer == NULL) break;
         memcpy(result + range->offset() + buffer->scan_range_offset(),
             buffer->buffer(), buffer->len());
-        buffers.push_back(buffer);
+        buffers.push_back(move(buffer));
       }
       ValidateEmptyOrCorrect(data, result, strlen(data));
     }
 
     for (int i = 0; i < buffers.size(); ++i) {
-      buffers[i]->Return();
+      io_mgr.ReturnBuffer(move(buffers[i]));
     }
 
     EXPECT_TRUE(io_mgr.context_status(reader).IsMemLimitExceeded());
@@ -964,12 +964,13 @@ TEST_F(DiskIoMgrTest, Buffers) {
 
   // buffer length should be rounded up to min buffer size
   int64_t buffer_len = 1;
-  DiskIoMgr::BufferDescriptor* buffer_desc;
+  unique_ptr<DiskIoMgr::BufferDescriptor> buffer_desc;
   buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
   EXPECT_TRUE(buffer_desc->buffer() != NULL);
   EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc);
+  io_mgr.FreeBufferMemory(buffer_desc.get());
+  io_mgr.ReturnBuffer(move(buffer_desc));
   EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // reuse buffer
@@ -978,7 +979,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
   EXPECT_TRUE(buffer_desc->buffer() != NULL);
   EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc);
+  io_mgr.FreeBufferMemory(buffer_desc.get());
+  io_mgr.ReturnBuffer(move(buffer_desc));
   EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // bump up to next buffer size
@@ -994,7 +996,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
   EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
   EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
 
-  io_mgr.FreeBufferMemory(buffer_desc);
+  io_mgr.FreeBufferMemory(buffer_desc.get());
+  io_mgr.ReturnBuffer(move(buffer_desc));
 
   // max buffer size
   buffer_len = max_buffer_size;
@@ -1002,7 +1005,8 @@ TEST_F(DiskIoMgrTest, Buffers) {
   EXPECT_TRUE(buffer_desc->buffer() != NULL);
   EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
   EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc);
+  io_mgr.FreeBufferMemory(buffer_desc.get());
+  io_mgr.ReturnBuffer(move(buffer_desc));
   EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, 
root_mem_tracker.consumption());
 
   // gc buffers
@@ -1034,12 +1038,12 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   // We should not read past the end of file.
   DiskIoMgr::ScanRange* range = InitRange(1, tmp_file, 0, read_len, 0, 
stat_val.st_mtime);
-  DiskIoMgr::BufferDescriptor* buffer;
+  unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
   ASSERT_OK(io_mgr->Read(reader, range, &buffer));
   ASSERT_TRUE(buffer->eosr());
   ASSERT_EQ(len, buffer->len());
   ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
-  buffer->Return();
+  io_mgr->ReturnBuffer(move(buffer));
 
   io_mgr->UnregisterContext(reader);
   pool_.reset();
@@ -1073,7 +1077,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
         DiskIoMgr::BufferOpts::ReadInto(&client_buffer[0], buffer_len));
     ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
 
-    DiskIoMgr::BufferDescriptor* io_buffer;
+    unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
     ASSERT_OK(range->GetNext(&io_buffer));
     ASSERT_TRUE(io_buffer->eosr());
     ASSERT_EQ(scan_len, io_buffer->len());
@@ -1082,7 +1086,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
     // DiskIoMgr should not have allocated memory.
     EXPECT_EQ(mem_tracker.consumption(), 0);
-    io_buffer->Return();
+    io_mgr->ReturnBuffer(move(io_buffer));
   }
 
   io_mgr->UnregisterContext(reader);
@@ -1115,7 +1119,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     /// the read fails before the cancellation.
     if (i >= 1) io_mgr->CancelContext(reader);
 
-    DiskIoMgr::BufferDescriptor* io_buffer;
+    unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer;
     ASSERT_FALSE(range->GetNext(&io_buffer).ok());
 
     // DiskIoMgr should not have allocated memory.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 89a8863..bfb6d60 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -196,41 +196,21 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : 
io_mgr_(io_mgr) {
-  Reset();
-}
-
-void DiskIoMgr::BufferDescriptor::Reset() {
-  DCHECK(io_mgr_ != nullptr);
-  reader_ = nullptr;
-  scan_range_ = nullptr;
-  mem_tracker_ = nullptr;
-  buffer_ = nullptr;
-  buffer_len_ = 0;
-  len_ = 0;
-  eosr_ = false;
-  status_ = Status::OK();
-  scan_range_offset_ = 0;
-}
-
-void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, 
ScanRange* range,
-    uint8_t* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
-  DCHECK(io_mgr_ != nullptr);
-  DCHECK(buffer_ == nullptr);
-  DCHECK(range != nullptr);
+DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    DiskIoRequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len, MemTracker* mem_tracker)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    mem_tracker_(mem_tracker),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
   DCHECK(buffer != nullptr);
   DCHECK_GE(buffer_len, 0);
-  DCHECK_NE(range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER,
+  DCHECK_NE(scan_range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::NO_BUFFER,
       mem_tracker == nullptr);
-  reader_ = reader;
-  scan_range_ = range;
-  mem_tracker_ = mem_tracker;
-  buffer_ = buffer;
-  buffer_len_ = buffer_len;
-  len_ = 0;
-  eosr_ = false;
-  status_ = Status::OK();
-  scan_range_offset_ = 0;
 }
 
 void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
@@ -244,11 +224,6 @@ void 
DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
   mem_tracker_ = dst;
 }
 
-void DiskIoMgr::BufferDescriptor::Return() {
-  DCHECK(io_mgr_ != nullptr);
-  io_mgr_->ReturnBuffer(this);
-}
-
 DiskIoMgr::WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback 
callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -630,7 +605,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* 
reader, ScanRange** range)
 }
 
 Status DiskIoMgr::Read(DiskIoRequestContext* reader,
-    ScanRange* range, BufferDescriptor** buffer) {
+    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
   DCHECK(range != nullptr);
   DCHECK(buffer != nullptr);
   *buffer = nullptr;
@@ -651,7 +626,7 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader,
   return Status::OK();
 }
 
-void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
+void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
   DCHECK(buffer_desc != nullptr);
   if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
 
@@ -659,8 +634,9 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) 
{
   if (buffer_desc->buffer_ != nullptr) {
     if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
       // Buffers the were not allocated by DiskIoMgr don't need to be freed.
-      FreeBufferMemory(buffer_desc);
+      FreeBufferMemory(buffer_desc.get());
     }
+    buffer_desc->buffer_ = nullptr;
     num_buffers_in_readers_.Add(-1);
     reader->num_buffers_in_reader_.Add(-1);
   } else {
@@ -674,36 +650,10 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* 
buffer_desc) {
     // Close() is idempotent so multiple cancelled buffers is okay.
     buffer_desc->scan_range_->Close();
   }
-  ReturnBufferDesc(buffer_desc);
-}
-
-void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
-  DCHECK(desc != nullptr);
-  desc->Reset();
-  unique_lock<mutex> lock(free_buffers_lock_);
-  DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
-         == free_buffer_descs_.end());
-  free_buffer_descs_.push_back(desc);
-}
-
-DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* 
reader,
-    MemTracker* mem_tracker, ScanRange* range, uint8_t* buffer, int64_t 
buffer_size) {
-  BufferDescriptor* buffer_desc;
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (free_buffer_descs_.empty()) {
-      buffer_desc = pool_.Add(new BufferDescriptor(this));
-    } else {
-      buffer_desc = free_buffer_descs_.front();
-      free_buffer_descs_.pop_front();
-    }
-  }
-  buffer_desc->Reset(reader, range, buffer, buffer_size, mem_tracker);
-  return buffer_desc;
 }
 
-DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* 
reader,
-    ScanRange* range, int64_t buffer_size) {
+unique_ptr<DiskIoMgr::BufferDescriptor> DiskIoMgr::GetFreeBuffer(
+    DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size) {
   DCHECK_LE(buffer_size, max_buffer_size_);
   DCHECK_GT(buffer_size, 0);
   buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
@@ -744,7 +694,8 @@ DiskIoMgr::BufferDescriptor* 
DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
   DCHECK(range != nullptr);
   DCHECK(reader != nullptr);
   DCHECK(buffer != nullptr);
-  return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, 
buffer_size);
+  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
+      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
 }
 
 void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
@@ -753,7 +704,7 @@ void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
   int bytes_freed = 0;
   // Free small-to-large to avoid retaining many small buffers and fragmenting 
memory.
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    std::list<uint8_t*>* free_buffers = &free_buffers_[idx];
+    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
     while (
         !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= 
bytes_to_free)) {
       uint8_t* buffer = free_buffers->front();
@@ -972,7 +923,7 @@ void DiskIoMgr::HandleWriteFinished(
 }
 
 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext* reader,
-    BufferDescriptor* buffer) {
+    unique_ptr<BufferDescriptor> buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
   DiskIoRequestContext::PerDiskState& state = 
reader->disk_states_[disk_queue->disk_id];
@@ -983,11 +934,12 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext*
   if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
     buffer->buffer_ = nullptr;
-    buffer->scan_range_->Cancel(reader->status_);
+    ScanRange* scan_range = buffer->scan_range_;
+    scan_range->Cancel(reader->status_);
     // Enqueue the buffer to use the scan range's buffer cleanup path.
-    buffer->scan_range_->EnqueueBuffer(buffer);
+    scan_range->EnqueueBuffer(move(buffer));
     return;
   }
 
@@ -1000,7 +952,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext*
   //  3. Middle of scan range
   if (!buffer->status_.ok()) {
     // Error case
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
+    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
     buffer->buffer_ = nullptr;
     buffer->eosr_ = true;
     --state.num_remaining_ranges();
@@ -1014,7 +966,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext*
   bool eosr = buffer->eosr_;
   ScanRange* scan_range = buffer->scan_range_;
   bool is_cached = buffer->is_cached();
-  bool queue_full = scan_range->EnqueueBuffer(buffer);
+  bool queue_full = scan_range->EnqueueBuffer(move(buffer));
   if (eosr) {
     // For cached buffers, we can't close the range until the cached buffer is 
returned.
     // Close() is called from DiskIoMgr::ReturnBuffer().
@@ -1063,14 +1015,14 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 
 // This function reads the specified scan range associated with the
 // specified reader context and disk queue.
-void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
-    ScanRange* range) {
+void DiskIoMgr::ReadRange(
+    DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
-  BufferDescriptor* buffer_desc = nullptr;
+  unique_ptr<BufferDescriptor> buffer_desc;
   if (range->external_buffer_tag_ == 
ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    buffer_desc = GetBufferDesc(
-        reader, nullptr, range, range->client_buffer_.data, 
range->client_buffer_.len);
+    buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, 
reader, range,
+        range->client_buffer_.data, range->client_buffer_.len, nullptr));
   } else {
     // Need to allocate a buffer to read into.
     int64_t buffer_size = ::min(bytes_remaining, 
static_cast<int64_t>(max_buffer_size_));
@@ -1109,10 +1061,10 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, 
DiskIoRequestContext* reader,
   }
 
   // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, buffer_desc);
+  HandleReadFinished(disk_queue, reader, move(buffer_desc));
 }
 
-DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
+unique_ptr<DiskIoMgr::BufferDescriptor> 
DiskIoMgr::TryAllocateNextBufferForRange(
     DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range,
     int64_t buffer_size) {
   DCHECK(reader->mem_tracker_ != nullptr);
@@ -1149,7 +1101,7 @@ DiskIoMgr::BufferDescriptor* 
DiskIoMgr::TryAllocateNextBufferForRange(
       // now.
     }
   }
-  BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
+  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, 
buffer_size);
   DCHECK(buffer_desc != nullptr);
   return buffer_desc;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index aae19ee..9621c92 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -18,8 +18,8 @@
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_H
 
+#include <deque>
 #include <functional>
-#include <list>
 #include <vector>
 
 #include <boost/scoped_ptr.hpp>
@@ -146,9 +146,9 @@ class MemTracker;
 /// caller when constructing the scan range.
 ///
 /// As a caller reads from a scan range, these buffers are wrapped in 
BufferDescriptors
-/// and returned to the caller. The caller must always call Return() on the 
buffer
-/// descriptor when it when it is done to allow recycling of the buffer 
descriptor and
-/// the associated buffer (if there is an IoMgr-allocated or HDFS cached 
buffer).
+/// and returned to the caller. The caller must always call ReturnBuffer() on 
the buffer
+/// descriptor to allow recycling of the associated buffer (if there is an
+/// IoMgr-allocated or HDFS cached buffer).
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In 
that
@@ -169,7 +169,7 @@ class MemTracker;
 /// the number of scanner threads properly controls the amount of files we 
mlock.
 /// With cached scan ranges, we cannot close the scan range until the cached 
buffer
 /// is returned (HDFS does not allow this). We therefore need to defer the 
close until
-/// the cached buffer is returned (BufferDescriptor::Return()).
+/// the cached buffer is returned (ReturnBuffer()).
 //
 /// Remote filesystem support (e.g. S3):
 /// Remote filesystems are modeled as "remote disks". That is, there is a 
separate disk
@@ -208,6 +208,10 @@ class DiskIoMgr : public CacheLineAligned {
   /// time.
   class BufferDescriptor {
    public:
+    ~BufferDescriptor() {
+      DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer.
+    }
+
     ScanRange* scan_range() { return scan_range_; }
     uint8_t* buffer() { return buffer_; }
     int64_t buffer_len() { return buffer_len_; }
@@ -225,15 +229,16 @@ class DiskIoMgr : public CacheLineAligned {
     /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
     void TransferOwnership(MemTracker* dst);
 
-    /// Returns the buffer to the IoMgr. This must be called for every buffer
-    /// returned by GetNext()/Read() that did not return an error. This is 
non-blocking.
-    /// After calling this, the buffer descriptor is invalid and cannot be 
accessed.
-    void Return();
-
    private:
     friend class DiskIoMgr;
+    friend class DiskIoMgr::ScanRange;
     friend class DiskIoRequestContext;
-    BufferDescriptor(DiskIoMgr* io_mgr);
+
+    /// Create a buffer descriptor for a new reader, range and data buffer. 
The buffer
+    /// memory should already be accounted against 'mem_tracker'.
+    BufferDescriptor(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
+        ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
+        MemTracker* mem_tracker);
 
     /// Return true if this is a cached buffer owned by HDFS.
     bool is_cached() const {
@@ -248,42 +253,34 @@ class DiskIoMgr : public CacheLineAligned {
           == ScanRange::ExternalBufferTag::CLIENT_BUFFER;
     }
 
-    /// Reset the buffer descriptor to an uninitialized state.
-    void Reset();
-
-    /// Resets the buffer descriptor state for a new reader, range and data 
buffer.
-    /// The buffer memory should already be accounted against MemTracker
-    void Reset(DiskIoRequestContext* reader, ScanRange* range, uint8_t* buffer,
-        int64_t buffer_len, MemTracker* mem_tracker);
-
     DiskIoMgr* const io_mgr_;
 
     /// Reader that this buffer is for.
-    DiskIoRequestContext* reader_;
+    DiskIoRequestContext* const reader_;
 
     /// The current tracker this buffer is associated with. After 
initialisation,
     /// NULL for cached buffers and non-NULL for all other buffers.
     MemTracker* mem_tracker_;
 
     /// Scan range that this buffer is for. Non-NULL when initialised.
-    ScanRange* scan_range_;
+    ScanRange* const scan_range_;
 
     /// buffer with the read contents
     uint8_t* buffer_;
 
     /// length of buffer_. For buffers from cached reads, the length is 0.
-    int64_t buffer_len_;
+    const int64_t buffer_len_;
 
     /// length of read contents
-    int64_t len_;
+    int64_t len_ = 0;
 
     /// true if the current scan range is complete
-    bool eosr_;
+    bool eosr_ = false;
 
     /// Status of the read to this buffer. if status is not ok, 'buffer' is 
nullptr
     Status status_;
 
-    int64_t scan_range_offset_;
+    int64_t scan_range_offset_ = 0;
   };
 
   /// The request type, read or write associated with a request range.
@@ -411,7 +408,7 @@ class DiskIoMgr : public CacheLineAligned {
     /// called when all buffers have been returned, *buffer is set to nullptr 
and Status::OK
     /// is returned.
     /// Only one thread can be in GetNext() at any time.
-    Status GetNext(BufferDescriptor** buffer) WARN_UNUSED_RESULT;
+    Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) 
WARN_UNUSED_RESULT;
 
     /// Cancel this scan range. This cleans up all queued buffers and
     /// wakes up any threads blocked on GetNext().
@@ -435,7 +432,7 @@ class DiskIoMgr : public CacheLineAligned {
     /// Returns true if this scan range has hit the queue capacity, false 
otherwise.
     /// The caller passes ownership of buffer to the scan range and it is not
     /// valid to access buffer after this call.
-    bool EnqueueBuffer(BufferDescriptor* buffer);
+    bool EnqueueBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
     /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
     /// be called with any locks taken.
@@ -566,7 +563,7 @@ class DiskIoMgr : public CacheLineAligned {
     /// IO buffers that are queued for this scan range.
     /// Condition variable for GetNext
     boost::condition_variable buffer_ready_cv_;
-    std::list<BufferDescriptor*> ready_buffers_;
+    std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
     /// The soft capacity limit for ready_buffers_. ready_buffers_ can exceed
     /// the limit temporarily as the capacity is adjusted dynamically.
@@ -711,7 +708,12 @@ class DiskIoMgr : public CacheLineAligned {
   /// This can only be used if the scan range fits in a single IO buffer (i.e. 
is smaller
   /// than max_read_buffer_size()) or if reading into a client-provided buffer.
   Status Read(DiskIoRequestContext* reader, ScanRange* range,
-      BufferDescriptor** buffer) WARN_UNUSED_RESULT;
+      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
+
+  /// Returns the buffer to the IoMgr. This must be called for every buffer
+  /// returned by GetNext()/Read() that did not return an error. This is 
non-blocking.
+  /// After calling this, the buffer descriptor is invalid and cannot be 
accessed.
+  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
   /// Determine which disk queue this file should be assigned to.  Returns an 
index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that 
holds the
@@ -810,9 +812,6 @@ class DiskIoMgr : public CacheLineAligned {
 
   friend class DiskIoMgrTest_Buffers_Test;
 
-  /// Pool to allocate BufferDescriptors.
-  ObjectPool pool_;
-
   /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
   boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
 
@@ -853,7 +852,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// contention.
   boost::scoped_ptr<RequestContextCache> request_context_cache_;
 
-  /// Protects free_buffers_ and free_buffer_descs_
+  /// Protects free_buffers_
   boost::mutex free_buffers_lock_;
 
   /// Free buffers that can be handed out to clients. There is one list for 
each buffer
@@ -867,10 +866,7 @@ class DiskIoMgr : public CacheLineAligned {
   ///  free_buffers_[10] => list of free buffers with size 1 MB
   ///  free_buffers_[13] => list of free buffers with size 8 MB
   ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::list<uint8_t*>> free_buffers_;
-
-  /// List of free buffer desc objects that can be handed out to clients
-  std::list<BufferDescriptor*> free_buffer_descs_;
+  std::vector<std::deque<uint8_t*>> free_buffers_;
 
   /// Total number of allocated buffers, used for debugging.
   AtomicInt32 num_allocated_buffers_;
@@ -905,21 +901,8 @@ class DiskIoMgr : public CacheLineAligned {
   /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
   /// The buffer memory is tracked against reader's mem tracker, or
   /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
-  BufferDescriptor* GetFreeBuffer(DiskIoRequestContext* reader, ScanRange* 
range,
-      int64_t buffer_size);
-
-  /// Gets a BufferDescriptor initialized with the provided parameters. The 
object may be
-  /// recycled or newly allocated. Does not do anything aside from initialize 
the
-  /// descriptor's fields.
-  BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader, MemTracker* 
mem_tracker,
-      ScanRange* range, uint8_t* buffer, int64_t buffer_size);
-
-  /// Returns the buffer desc and underlying buffer to the disk IoMgr. This 
also updates
-  /// the reader and disk queue state.
-  void ReturnBuffer(BufferDescriptor* buffer);
-
-  /// Returns a buffer desc object which can now be used for another reader.
-  void ReturnBufferDesc(BufferDescriptor* desc);
+  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
+      DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
 
   /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be 
nullptr), either
   /// freeing it or returning it to 'free_buffers_'. Memory tracking is 
updated to
@@ -941,7 +924,8 @@ class DiskIoMgr : public CacheLineAligned {
 
   /// Updates disk queue and reader state after a read is complete. The read 
result
   /// is captured in the buffer descriptor.
-  void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, 
BufferDescriptor*);
+  void HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
+      std::unique_ptr<BufferDescriptor> buffer);
 
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write 
OK/RUNTIME_ERROR
@@ -971,10 +955,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// if successful. If 'reader' is cancelled, cancels the range and returns 
nullptr.
   /// If there is memory pressure and buffers are already queued, adds the 
range
   /// to the blocked ranges and returns nullptr.
-  BufferDescriptor* TryAllocateNextBufferForRange(DiskQueue* disk_queue,
+  std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* 
disk_queue,
       DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 7f200dc..7668063 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -18,6 +18,7 @@
 #include "runtime/row-batch.h"
 
 #include <stdint.h> // for intptr_t
+#include <memory>
 #include <boost/scoped_ptr.hpp>
 
 #include "gen-cpp/Results_types.h"
@@ -153,7 +154,7 @@ RowBatch::RowBatch(
 RowBatch::~RowBatch() {
   tuple_data_pool_.FreeAll();
   for (int i = 0; i < io_buffers_.size(); ++i) {
-    io_buffers_[i]->Return();
+    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
   }
   for (int i = 0; i < blocks_.size(); ++i) {
     blocks_[i]->Delete();
@@ -295,11 +296,11 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* 
distinct_tuples,
   DCHECK_EQ(offset, size);
 }
 
-void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer) {
+void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) {
   DCHECK(buffer != NULL);
-  io_buffers_.push_back(buffer);
   auxiliary_mem_usage_ += buffer->buffer_len();
   buffer->TransferOwnership(mem_tracker_);
+  io_buffers_.emplace_back(move(buffer));
 }
 
 void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
@@ -326,7 +327,7 @@ void RowBatch::Reset() {
   // TODO: Change this to Clear() and investigate the repercussions.
   tuple_data_pool_.FreeAll();
   for (int i = 0; i < io_buffers_.size(); ++i) {
-    io_buffers_[i]->Return();
+    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
   }
   io_buffers_.clear();
   for (int i = 0; i < blocks_.size(); ++i) {
@@ -349,7 +350,7 @@ void RowBatch::Reset() {
 void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false);
   for (int i = 0; i < io_buffers_.size(); ++i) {
-    dest->AddIoBuffer(io_buffers_[i]);
+    dest->AddIoBuffer(move(io_buffers_[i]));
   }
   io_buffers_.clear();
   for (int i = 0; i < blocks_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index e4b7191..0068478 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -214,7 +214,7 @@ class RowBatch {
   void Reset();
 
   /// Add io buffer to this row batch.
-  void AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer);
+  void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer);
 
   /// Adds a block to this row batch. The block must be pinned. The blocks 
must be
   /// deleted when freeing resources. The block's memory remains accounted 
against
@@ -432,7 +432,7 @@ class RowBatch {
   /// IO buffers currently owned by this row batch. Ownership of IO buffers 
transfers
   /// between row batches. Any IO buffer will be owned by at most one row batch
   /// (i.e. they are not ref counted) so most row batches don't own any.
-  std::vector<DiskIoMgr::BufferDescriptor*> io_buffers_;
+  std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_;
 
   /// Blocks attached to this row batch. The underlying memory and block 
manager client
   /// are owned by the BufferedBlockMgr.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index bc09f2e..c99077f 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -402,7 +402,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* 
handle, MemRange buf
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's 
state
   // since the write is not in flight.
   SCOPED_TIMER(disk_read_timer_);
-  DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
+  unique_ptr<DiskIoMgr::BufferDescriptor> io_mgr_buffer;
   Status status = handle->read_range_->GetNext(&io_mgr_buffer);
   if (!status.ok()) goto exit;
   DCHECK(io_mgr_buffer != NULL);
@@ -423,7 +423,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* 
handle, MemRange buf
   }
 exit:
   // Always return the buffer before exiting to avoid leaking it.
-  if (io_mgr_buffer != nullptr) io_mgr_buffer->Return();
+  if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
   return status;
 }


Reply via email to