Repository: incubator-impala
Updated Branches:
  refs/heads/master 45d059855 -> 17bf14417


IMPALA-3611: track unused Disk IO buffer memory

Track I/O buffers against separate MemTrackers. This gives us better
visibility into memory consumption from the debug webpage and from
MemTracker consumption dumps. The immediate motivation was to
determine whether the idle memory consumption of an impalad was caused
by a memory leak.

We add two trackers: one for buffers cached in DiskIoMgr's free list,
and another for clients that don't provide a MemTracker (the only
such client is BufferedBlockMgr, which will be removed at some point).

The previous code "tracked" the buffers against the process-wide
tracker, but it was a no-op outside of ASAN builds since the
process-wide tracker took its value from TCMalloc.

The test code required fixing because it assumed that buffers were
always credited against the DiskIoMgr's tracker. That assumption only
holds when the DiskIoMgr's tracker is the root process-wide tracker.

Also fix backend test logging for disk-io-mgr-test by initializing the
test through InitCommonRuntime().

Testing:
Ran exhaustive tests.

Change-Id: I8777cf76f04d34a46f53d53005412e0f1d63b5b7
Reviewed-on: http://gerrit.cloudera.org:8080/3799
Reviewed-by: Dan Hecht <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/17bf1441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/17bf1441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/17bf1441

Branch: refs/heads/master
Commit: 17bf14417e3438d772b19111431453bdd537742a
Parents: 45d0598
Author: Tim Armstrong <[email protected]>
Authored: Mon May 23 14:09:39 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Aug 6 00:31:23 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc |   4 +-
 be/src/runtime/disk-io-mgr-test.cc       |  88 ++++++----
 be/src/runtime/disk-io-mgr.cc            | 241 +++++++++++++++-----------
 be/src/runtime/disk-io-mgr.h             |  77 ++++----
 be/src/runtime/row-batch.cc              |   2 +-
 5 files changed, 243 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc 
b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9a7e39a..0de9622 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -442,8 +442,10 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* 
read_succeeded) {
   DCHECK_EQ(bytes_read, len());
 
   // Create a single buffer desc for the entire scan range and enqueue that.
+  // 'mem_tracker' is NULL because the memory is owned by the HDFS java client,
+  // not the Impala backend.
   BufferDescriptor* desc = io_mgr_->GetBufferDesc(
-      reader_, this, reinterpret_cast<char*>(buffer), 0);
+      reader_, NULL, this, reinterpret_cast<char*>(buffer), 0);
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc 
b/be/src/runtime/disk-io-mgr-test.cc
index 46149b5..243b2e1 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -19,6 +19,7 @@
 
 #include "testutil/gtest-util.h"
 #include "codegen/llvm-codegen.h"
+#include "common/init.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/disk-io-mgr-stress.h"
 #include "runtime/mem-tracker.h"
@@ -39,6 +40,14 @@ namespace impala {
 
 class DiskIoMgrTest : public testing::Test {
  public:
+
+  virtual void SetUp() {
+    pool_.reset(new ObjectPool);
+  }
+
+  virtual void TearDown() {
+    pool_.reset();
+  }
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** 
written_range,
       DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
       Status expected_status, const Status& status) {
@@ -220,6 +229,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   read_io_mgr->UnregisterContext(reader);
   read_io_mgr.reset();
 }
+
 // Perform invalid writes (e.g. non-existent file, negative offset) and 
validate
 // that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
@@ -230,7 +240,6 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
   ASSERT_OK(io_mgr.RegisterContext(&writer));
-  pool_.reset(new ObjectPool);
   int32_t* data = pool_->Add(new int32_t);
   *data = rand();
 
@@ -619,11 +628,11 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     pool_.reset(new ObjectPool);
     if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
-    MemTracker mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
+    MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&mem_tracker));
-    MemTracker reader_mem_tracker;
+    ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+    MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
     DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
@@ -932,54 +941,66 @@ TEST_F(DiskIoMgrTest, Buffers) {
   // Test default min/max buffer size
   int min_buffer_size = 1024;
   int max_buffer_size = 8 * 1024 * 1024; // 8 MB
-  MemTracker mem_tracker(max_buffer_size * 2);
+  MemTracker root_mem_tracker(max_buffer_size * 2);
 
   DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
-  ASSERT_EQ(mem_tracker.consumption(), 0);
+  ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+  ASSERT_EQ(root_mem_tracker.consumption(), 0);
+
+  MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+  DiskIoRequestContext* reader;
+  ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
+
+  DiskIoMgr::ScanRange* dummy_range = InitRange(1, "dummy", 0, 0, 0, 0);
 
   // buffer length should be rounded up to min buffer size
   int64_t buffer_len = 1;
-  char* buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
+  DiskIoMgr::BufferDescriptor* buffer_desc;
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // reuse buffer
   buffer_len = min_buffer_size;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
 
   // bump up to next buffer size
   buffer_len = min_buffer_size + 1;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, min_buffer_size * 2);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 3);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
+  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
+  EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
 
   // gc unused buffer
   io_mgr.GcIoBuffers();
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2);
+  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
+  EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
 
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
+  io_mgr.FreeBufferMemory(buffer_desc);
 
   // max buffer size
   buffer_len = max_buffer_size;
-  buf = io_mgr.GetFreeBuffer(&buffer_len);
-  EXPECT_EQ(buffer_len, max_buffer_size);
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
-  io_mgr.ReturnFreeBuffer(buf, buffer_len);
-  EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2 + max_buffer_size);
+  buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len);
+  EXPECT_TRUE(buffer_desc->buffer() != NULL);
+  EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
+  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
+  io_mgr.FreeBufferMemory(buffer_desc);
+  EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, 
root_mem_tracker.consumption());
 
   // gc buffers
   io_mgr.GcIoBuffers();
   EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
-  EXPECT_EQ(mem_tracker.consumption(), 0);
+  EXPECT_EQ(root_mem_tracker.consumption(), 0);
+  io_mgr.UnregisterContext(reader);
 }
 
 // IMPALA-2366: handle partial read where range goes past end of file.
@@ -995,7 +1016,6 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  pool_.reset(new ObjectPool);
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
@@ -1022,11 +1042,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 }
 
 int main(int argc, char **argv) {
-  google::InitGoogleLogging(argv[0]);
   ::testing::InitGoogleTest(&argc, argv);
-  impala::CpuInfo::Init();
-  impala::DiskInfo::Init();
-  impala::OsInfo::Init();
-  impala::InitThreading();
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 448424f..769c505 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -203,25 +203,50 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
-  io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
+DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : 
io_mgr_(io_mgr) {
+  Reset();
+}
+
+void DiskIoMgr::BufferDescriptor::Reset() {
+  DCHECK(io_mgr_ != NULL);
+  reader_ = NULL;
+  scan_range_ = NULL;
+  mem_tracker_ = NULL;
+  buffer_ = NULL;
+  buffer_len_ = 0;
+  len_ = 0;
+  eosr_ = false;
+  status_ = Status::OK();
+  scan_range_offset_ = 0;
 }
 
 void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
-      ScanRange* range, char* buffer, int64_t buffer_len) {
+    ScanRange* range, char* buffer, int64_t buffer_len, MemTracker* 
mem_tracker) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
   DCHECK_GE(buffer_len, 0);
+  DCHECK_NE(range->cached_buffer_ == NULL, mem_tracker == NULL);
   reader_ = reader;
   scan_range_ = range;
+  mem_tracker_ = mem_tracker;
   buffer_ = buffer;
   buffer_len_ = buffer_len;
   len_ = 0;
   eosr_ = false;
   status_ = Status::OK();
-  mem_tracker_ = NULL;
+  scan_range_offset_ = 0;
+}
+
+void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
+  DCHECK(dst != NULL);
+  // Memory of cached buffers is not tracked against a tracker.
+  if (is_cached()) return;
+  DCHECK(mem_tracker_ != NULL);
+  dst->Consume(buffer_len_);
+  mem_tracker_->Release(buffer_len_);
+  mem_tracker_ = dst;
 }
 
 void DiskIoMgr::BufferDescriptor::Return() {
@@ -229,15 +254,6 @@ void DiskIoMgr::BufferDescriptor::Return() {
   io_mgr_->ReturnBuffer(this);
 }
 
-void DiskIoMgr::BufferDescriptor::SetMemTracker(MemTracker* tracker) {
-  // Cached buffers don't count towards mem usage.
-  if (scan_range_->cached_buffer_ != NULL) return;
-  if (mem_tracker_ == tracker) return;
-  if (mem_tracker_ != NULL) mem_tracker_->Release(buffer_len_);
-  mem_tracker_ = tracker;
-  if (mem_tracker_ != NULL) mem_tracker_->Consume(buffer_len_);
-}
-
 DiskIoMgr::WriteRange::WriteRange(const string& file, int64_t file_offset, int 
disk_id,
     WriteDoneCallback callback) {
   file_ = file;
@@ -340,12 +356,20 @@ DiskIoMgr::~DiskIoMgr() {
     delete disk_queues_[i];
   }
 
+  if (free_buffer_mem_tracker_ != NULL) 
free_buffer_mem_tracker_->UnregisterFromParent();
+  if (unowned_buffer_mem_tracker_ != NULL) {
+    unowned_buffer_mem_tracker_->UnregisterFromParent();
+  }
+
   if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
 }
 
 Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
-  process_mem_tracker_ = process_mem_tracker;
+  free_buffer_mem_tracker_.reset(
+      new MemTracker(-1, -1, "Free Disk IO Buffers", process_mem_tracker, 
false));
+  unowned_buffer_mem_tracker_.reset(
+      new MemTracker(-1, -1, "Untracked Disk IO Buffers", process_mem_tracker, 
false));
   // If we hit the process limit, see if we can reclaim some memory by removing
   // previously allocated (but unused) io buffers.
   process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
@@ -626,11 +650,10 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* 
buffer_desc) {
 
   DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
-    if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
-      // Not a cached buffer. Return the io buffer and update mem tracking.
-      ReturnFreeBuffer(buffer_desc);
+    if (!buffer_desc->is_cached()) {
+      // Cached buffers are not allocated by DiskIoMgr so don't need to be 
freed.
+      FreeBufferMemory(buffer_desc);
     }
-    buffer_desc->buffer_ = NULL;
     num_buffers_in_readers_.Add(-1);
     reader->num_buffers_in_reader_.Add(-1);
   } else {
@@ -649,14 +672,15 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* 
buffer_desc) {
 
 void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
   DCHECK(desc != NULL);
+  desc->Reset();
   unique_lock<mutex> lock(free_buffers_lock_);
   DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
          == free_buffer_descs_.end());
   free_buffer_descs_.push_back(desc);
 }
 
-DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
-    DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t 
buffer_size) {
+DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* 
reader,
+    MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t 
buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -667,43 +691,57 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
       free_buffer_descs_.pop_front();
     }
   }
-  buffer_desc->Reset(reader, range, buffer, buffer_size);
-  buffer_desc->SetMemTracker(reader->mem_tracker_);
+  buffer_desc->Reset(reader, range, buffer, buffer_size, mem_tracker);
   return buffer_desc;
 }
 
-char* DiskIoMgr::GetFreeBuffer(int64_t* buffer_size) {
-  DCHECK_LE(*buffer_size, max_buffer_size_);
-  DCHECK_GT(*buffer_size, 0);
-  *buffer_size = min(static_cast<int64_t>(max_buffer_size_), *buffer_size);
-  int idx = free_buffers_idx(*buffer_size);
+DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* 
reader,
+    ScanRange* range, int64_t buffer_size) {
+  DCHECK_LE(buffer_size, max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
+  int idx = free_buffers_idx(buffer_size);
   // Quantize buffer size to nearest power of 2 greater than the specified 
buffer size and
   // convert to bytes
-  *buffer_size = (1 << idx) * min_buffer_size_;
+  buffer_size = (1LL << idx) * min_buffer_size_;
+
+  // Track memory against the reader. This is checked the next time we start
+  // a read for the next reader in DiskIoMgr::GetNextScanRange().
+  // TODO: IMPALA-3200: BufferedBlockMgr does not expect read buffers to be 
tracked
+  // against its MemTracker. Once BufferedBlockMgr is removed, we can expect 
that
+  // all readers provide a MemTracker and remove this NULL check.
+  MemTracker* buffer_mem_tracker = reader->mem_tracker_ != NULL ? 
reader->mem_tracker_ :
+      unowned_buffer_mem_tracker_.get();
+  buffer_mem_tracker->Consume(buffer_size);
 
-  unique_lock<mutex> lock(free_buffers_lock_);
   char* buffer = NULL;
-  if (free_buffers_[idx].empty()) {
-    num_allocated_buffers_.Add(1);
-    if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
-    }
-    if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
-      ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(*buffer_size);
-    }
-    // Update the process mem usage.  This is checked the next time we start
-    // a read for the next reader (DiskIoMgr::GetNextScanRange)
-    process_mem_tracker_->Consume(*buffer_size);
-    buffer = new char[*buffer_size];
-  } else {
-    if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (free_buffers_[idx].empty()) {
+      num_allocated_buffers_.Add(1);
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
+      }
+      // We already tracked this memory against the reader's MemTracker.
+      buffer = new char[buffer_size];
+    } else {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
+      }
+      buffer = free_buffers_[idx].front();
+      free_buffers_[idx].pop_front();
+      free_buffer_mem_tracker_->Release(buffer_size);
     }
-    buffer = free_buffers_[idx].front();
-    free_buffers_[idx].pop_front();
   }
+
+  // Validate more invariants.
+  DCHECK(range != NULL);
+  DCHECK(reader != NULL);
   DCHECK(buffer != NULL);
-  return buffer;
+  return GetBufferDesc(reader, buffer_mem_tracker, range, buffer, buffer_size);
 }
 
 void DiskIoMgr::GcIoBuffers() {
@@ -711,12 +749,11 @@ void DiskIoMgr::GcIoBuffers() {
   int buffers_freed = 0;
   int bytes_freed = 0;
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    for (list<char*>::iterator iter = free_buffers_[idx].begin();
-         iter != free_buffers_[idx].end(); ++iter) {
-      int64_t buffer_size = (1 << idx) * min_buffer_size_;
-      process_mem_tracker_->Release(buffer_size);
+    for (char* buffer: free_buffers_[idx]) {
+      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
+      delete[] buffer;
+      free_buffer_mem_tracker_->Release(buffer_size);
       num_allocated_buffers_.Add(-1);
-      delete[] *iter;
 
       ++buffers_freed;
       bytes_freed += buffer_size;
@@ -735,35 +772,44 @@ void DiskIoMgr::GcIoBuffers() {
   }
 }
 
-void DiskIoMgr::ReturnFreeBuffer(BufferDescriptor* desc) {
-  ReturnFreeBuffer(desc->buffer_, desc->buffer_len_);
-  desc->SetMemTracker(NULL);
-  desc->buffer_ = NULL;
-}
-
-void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
-  DCHECK(buffer != NULL);
+void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
+  DCHECK(!desc->is_cached());
+  char* buffer = desc->buffer_;
+  int64_t buffer_size = desc->buffer_len_;
   int idx = free_buffers_idx(buffer_size);
-  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1 << idx), 0)
+  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
       << "buffer_size_ / min_buffer_size_ should be power of 2, got 
buffer_size = "
       << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-  unique_lock<mutex> lock(free_buffers_lock_);
-  if (!FLAGS_disable_mem_pools && free_buffers_[idx].size() < 
FLAGS_max_free_io_buffers) {
-    free_buffers_[idx].push_back(buffer);
-    if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
-    }
-  } else {
-    process_mem_tracker_->Release(buffer_size);
-    num_allocated_buffers_.Add(-1);
-    delete[] buffer;
-    if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
-      ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
-    }
-    if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
-      ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+
+  {
+    unique_lock<mutex> lock(free_buffers_lock_);
+    if (!FLAGS_disable_mem_pools &&
+        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
+      free_buffers_[idx].push_back(buffer);
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
+      }
+      // This consume call needs to be protected by 'free_buffers_lock_' to 
avoid a race
+      // with a Release() call for the same buffer that could make consumption 
negative.
+      // Note: we can't use TryConsume(), which can indirectly call 
GcIoBuffers().
+      // TODO: after IMPALA-3200 is completed, we should be able to leverage 
the buffer
+      // pool's free lists, and remove these free lists.
+      free_buffer_mem_tracker_->Consume(buffer_size);
+    } else {
+      num_allocated_buffers_.Add(-1);
+      delete[] buffer;
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
+      }
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
+      }
     }
   }
+
+  // We transferred the buffer ownership from the BufferDescriptor to the 
DiskIoMgr.
+  desc->mem_tracker_->Release(buffer_size);
+  desc->buffer_ = NULL;
 }
 
 // This function gets the next RequestRange to work on for this disk. It 
checks for
@@ -812,14 +858,19 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* 
disk_queue, RequestRange** range,
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
 
-    // We just picked a reader, check the mem limits.
-    // TODO: we can do a lot better here.  The reader can likely make progress
-    // with fewer io buffers.
-    bool process_limit_exceeded = process_mem_tracker_->LimitExceeded();
+    // We just picked a reader, check the mem limits. We need to fail the 
request if
+    // the reader exceeded its memory limit, or if we're over a global memory 
limit.
+    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free 
lists and
+    // move these memory limit checks to GetFreeBuffer().
+    // Note that calling AnyLimitExceeded() can result in a call to 
GcIoBuffers().
+    bool any_io_mgr_limit_exceeded = 
free_buffer_mem_tracker_->AnyLimitExceeded();
+    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
+    // pushing more buffers to it. Most readers can make progress and operate 
within
+    // a fixed memory limit.
     bool reader_limit_exceeded = (*request_context)->mem_tracker_ != NULL
         ? (*request_context)->mem_tracker_->AnyLimitExceeded() : false;
 
-    if (process_limit_exceeded || reader_limit_exceeded) {
+    if (any_io_mgr_limit_exceeded || reader_limit_exceeded) {
       (*request_context)->Cancel(Status::MemLimitExceeded());
     }
 
@@ -923,8 +974,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext*
   if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
-    ReturnFreeBuffer(buffer);
-    buffer->buffer_ = NULL;
+    FreeBufferMemory(buffer);
     buffer->scan_range_->Cancel(reader->status_);
     // Enqueue the buffer to use the scan range's buffer cleanup path.
     buffer->scan_range_->EnqueueBuffer(buffer);
@@ -940,7 +990,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, 
DiskIoRequestContext*
   //  3. Middle of scan range
   if (!buffer->status_.ok()) {
     // Error case
-    ReturnFreeBuffer(buffer);
+    FreeBufferMemory(buffer);
     buffer->eosr_ = true;
     --state.num_remaining_ranges();
     buffer->scan_range_->Cancel(buffer->status_);
@@ -952,11 +1002,12 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* 
disk_queue, DiskIoRequestContext*
   // Store the state we need before calling EnqueueBuffer().
   bool eosr = buffer->eosr_;
   ScanRange* scan_range = buffer->scan_range_;
-  bool queue_full = buffer->scan_range_->EnqueueBuffer(buffer);
+  bool is_cached = buffer->is_cached();
+  bool queue_full = scan_range->EnqueueBuffer(buffer);
   if (eosr) {
     // For cached buffers, we can't close the range until the cached buffer is 
returned.
     // Close() is called from DiskIoMgr::ReturnBuffer().
-    if (scan_range->cached_buffer_ == NULL) scan_range->Close();
+    if (!is_cached) scan_range->Close();
   } else {
     if (queue_full) {
       reader->blocked_ranges_.Enqueue(scan_range);
@@ -1003,7 +1054,6 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 // specified reader context and disk queue.
 void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
-  char* buffer = NULL;
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
   int64_t buffer_size = ::min(bytes_remaining, 
static_cast<int64_t>(max_buffer_size_));
@@ -1044,17 +1094,9 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, 
DiskIoRequestContext* reader,
     }
   }
 
-  buffer = GetFreeBuffer(&buffer_size);
-  reader->num_used_buffers_.Add(1);
-
-  // Validate more invariants.
-  DCHECK_GT(reader->num_used_buffers_.Load(), 0);
-  DCHECK(range != NULL);
-  DCHECK(reader != NULL);
-  DCHECK(buffer != NULL);
-
-  BufferDescriptor* buffer_desc = GetBufferDesc(reader, range, buffer, 
buffer_size);
+  BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
   DCHECK(buffer_desc != NULL);
+  reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to 
hold a
   // lock across the read call.
@@ -1065,13 +1107,14 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, 
DiskIoRequestContext* reader,
       reader->active_read_thread_counter_->Add(1L);
     }
     if (reader->disks_accessed_bitmap_) {
-      int64_t disk_bit = 1 << disk_queue->disk_id;
+      int64_t disk_bit = 1LL << disk_queue->disk_id;
       reader->disks_accessed_bitmap_->BitOr(disk_bit);
     }
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
-    buffer_desc->status_ = range->Read(buffer, &buffer_desc->len_, 
&buffer_desc->eosr_);
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, 
&buffer_desc->len_,
+        &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
     if (reader->bytes_read_counter_ != NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 9eb663c..aadc85b 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -237,9 +237,12 @@ class DiskIoMgr {
     /// Returns the offset within the scan range that this buffer starts at
     int64_t scan_range_offset() const { return scan_range_offset_; }
 
-    /// Updates this buffer buffer to be owned by the new tracker. Consumption 
is
-    /// release from the current tracker and added to the new one.
-    void SetMemTracker(MemTracker* tracker);
+    /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and
+    /// set 'mem_tracker_' to 'dst'.  'mem_tracker_' and 'dst' must be 
non-NULL.
+    /// Does not check memory limits on 'dst': the caller should check the 
memory limit
+    /// if a different memory limit may apply to 'dst'.
+    /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
+    void TransferOwnership(MemTracker* dst);
 
     /// Returns the buffer to the IoMgr. This must be called for every buffer
     /// returned by GetNext()/Read() that did not return an error. This is 
non-blocking.
@@ -251,19 +254,26 @@ class DiskIoMgr {
     friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);
 
+    bool is_cached() { return scan_range_->cached_buffer_ != NULL; }
+
+    /// Reset the buffer descriptor to an uninitialized state.
+    void Reset();
+
     /// Resets the buffer descriptor state for a new reader, range and data 
buffer.
+    /// The buffer memory should already be accounted against MemTracker
     void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
-        int64_t buffer_len);
+        int64_t buffer_len, MemTracker* mem_tracker);
 
-    DiskIoMgr* io_mgr_;
+    DiskIoMgr* const io_mgr_;
 
-    /// Reader that this buffer is for
+    /// Reader that this buffer is for.
     DiskIoRequestContext* reader_;
 
-    /// The current tracker this buffer is associated with.
+    /// The current tracker this buffer is associated with. After 
initialisation,
+    /// NULL for cached buffers and non-NULL for all other buffers.
     MemTracker* mem_tracker_;
 
-    /// Scan range that this buffer is for.
+    /// Scan range that this buffer is for. Non-NULL when initialised.
     ScanRange* scan_range_;
 
     /// buffer with the read contents
@@ -684,8 +694,13 @@ class DiskIoMgr {
   /// Pool to allocate BufferDescriptors.
   ObjectPool pool_;
 
-  /// Process memory tracker; needed to account for io buffers.
-  MemTracker* process_mem_tracker_;
+  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
+  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
+
+  /// Memory tracker for I/O buffers where the DiskIoRequestContext has no 
MemTracker.
+  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where 
readers don't
+  /// provide a MemTracker.
+  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
 
   /// Number of worker(read) threads per disk. Also the max depth of queued
   /// work to the disk.
@@ -757,25 +772,27 @@ class DiskIoMgr {
   /// Returns the index into free_buffers_ for a given buffer size
   int free_buffers_idx(int64_t buffer_size);
 
-  /// Gets a buffer description object, initialized for this reader, 
allocating one as
-  /// necessary. buffer_size / min_buffer_size_ should be a power of 2, and 
buffer_size
-  /// should be <= max_buffer_size_. These constraints will be met if buffer 
was acquired
-  /// via GetFreeBuffer() (which it should have been).
-  BufferDescriptor* GetBufferDesc(
-      DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t 
buffer_size);
-
-  /// Returns a buffer desc object which can now be used for another reader.
-  void ReturnBufferDesc(BufferDescriptor* desc);
+  /// Returns a buffer to read into with size between 'buffer_size' and
+  /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
+  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
+  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
+  /// The buffer memory is tracked against reader's mem tracker, or
+  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
+  BufferDescriptor* GetFreeBuffer(DiskIoRequestContext* reader, ScanRange* 
range,
+      int64_t buffer_size);
+
+  /// Gets a BufferDescriptor initialized with the provided parameters. The 
object may be
+  /// recycled or newly allocated. Does not do anything aside from initialize 
the
+  /// descriptor's fields.
+  BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader,
+      MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t 
buffer_size);
 
   /// Returns the buffer desc and underlying buffer to the disk IoMgr. This 
also updates
   /// the reader and disk queue state.
   void ReturnBuffer(BufferDescriptor* buffer);
 
-  /// Returns a buffer to read into with size between *buffer_size and 
max_buffer_size_,
-  /// and *buffer_size is set to the size of the buffer. If there is an
-  /// appropriately-sized free buffer in the 'free_buffers_', that is 
returned, otherwise
-  /// a new one is allocated. *buffer_size must be between 0 and 
max_buffer_size_.
-  char* GetFreeBuffer(int64_t* buffer_size);
+  /// Returns a buffer desc object which can now be used for another reader.
+  void ReturnBufferDesc(BufferDescriptor* desc);
 
   /// Garbage collect all unused io buffers. This is currently only triggered 
when the
   /// process wide limit is hit. This is not good enough. While it is 
sufficient for
@@ -783,14 +800,10 @@ class DiskIoMgr {
   /// TODO: make this run periodically?
   void GcIoBuffers();
 
-  /// Returns a buffer to the free list. buffer_size / min_buffer_size_ should 
be a power
-  /// of 2, and buffer_size should be <= max_buffer_size_. These constraints 
will be met
-  /// if buffer was acquired via GetFreeBuffer() (which it should have been).
-  void ReturnFreeBuffer(char* buffer, int64_t buffer_size);
-
-  /// Returns the buffer in desc (cannot be NULL), sets buffer to NULL and 
clears the
-  /// mem tracker.
-  void ReturnFreeBuffer(BufferDescriptor* desc);
+  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be 
NULL), either
+  /// freeing it or returning it to 'free_buffers_'. Memory tracking is 
updated to
+  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk 
I/O mgr.
+  void FreeBufferMemory(BufferDescriptor* desc);
 
   /// Disk worker thread loop. This function retrieves the next range to 
process on
   /// the disk queue and invokes ReadRange() or Write() depending on the type 
of Range().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/17bf1441/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 7e75736..458e137 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -290,7 +290,7 @@ void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* 
buffer) {
   DCHECK(buffer != NULL);
   io_buffers_.push_back(buffer);
   auxiliary_mem_usage_ += buffer->buffer_len();
-  buffer->SetMemTracker(mem_tracker_);
+  buffer->TransferOwnership(mem_tracker_);
 }
 
 void RowBatch::AddTupleStream(BufferedTupleStream* stream) {

Reply via email to