IMPALA-5158: report buffer pool free memory in MemTracker

Clean pages and free buffers appeared as untracked memory in the
MemTracker hierarchy. This was misleading since the memory is tracked
and present in the BufferPool. This change adds two MemTrackers below
the process level that account for this memory.

Updating global counters would be very inefficient and negate most of
the effort put into making the buffer allocator scalable. Instead, the
values of the metrics are computed on demand by summing values across
all of the arenas in BufferAllocator.

The numbers reported are approximate because we do not lock any of the
BufferAllocator state and therefore don't get a consistent view of the
entire BufferAllocator at any moment in time. However, they are accurate
enough to understand the general state of the system.
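To illustrate the on-demand summing described above, here is a minimal
C++ sketch, not Impala's code. `Arena` and `Allocator` are hypothetical
stand-ins for FreeBufferArena and BufferAllocator, and the sketch assumes
one atomic free-buffer counter per power-of-two size class. No lock is
taken and each counter is read with a relaxed atomic load, so the sum is
only approximate while other threads allocate or free buffers.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a per-core arena: one free-buffer counter per
// power-of-two size class. Size class i holds buffers of min_len * 2^i bytes.
struct Arena {
  explicit Arena(int num_sizes) : num_free(num_sizes) {}
  std::vector<std::atomic<int64_t>> num_free;
};

class Allocator {
 public:
  Allocator(int num_arenas, int num_sizes, int64_t min_buffer_len)
    : min_buffer_len_(min_buffer_len) {
    assert(num_arenas > 0 && num_sizes > 0 && min_buffer_len > 0);
    for (int i = 0; i < num_arenas; ++i) {
      arenas_.push_back(std::make_unique<Arena>(num_sizes));
    }
  }

  Arena* arena(int i) { return arenas_[i].get(); }

  // Sums 'compute_fn' over all arenas without taking any lock. Each load is
  // atomic, but different counters are read at different times, so the
  // result is not a consistent snapshot under concurrent allocation.
  int64_t SumOverArenas(const std::function<int64_t(const Arena&)>& compute_fn) const {
    int64_t total = 0;
    for (const std::unique_ptr<Arena>& a : arenas_) total += compute_fn(*a);
    return total;
  }

  int64_t GetNumFreeBuffers() const {
    return SumOverArenas([](const Arena& a) {
      int64_t n = 0;
      for (const std::atomic<int64_t>& c : a.num_free) {
        n += c.load(std::memory_order_relaxed);
      }
      return n;
    });
  }

  int64_t GetFreeBufferBytes() const {
    return SumOverArenas([this](const Arena& a) {
      int64_t bytes = 0;
      for (std::size_t i = 0; i < a.num_free.size(); ++i) {
        int64_t buffer_len = min_buffer_len_ << i;  // min_len * 2^i
        bytes += a.num_free[i].load(std::memory_order_relaxed) * buffer_len;
      }
      return bytes;
    });
  }

 private:
  const int64_t min_buffer_len_;
  std::vector<std::unique_ptr<Arena>> arenas_;
};
```

The writers only touch their own arena's counters, so reporting adds no
shared cache line that every allocation has to update.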

Also switches the ASAN build over to using a metric for process memory,
similar to the regular TCMalloc build, so that behaviour under ASAN
diverges less.
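The compound "total-used" metric set up in memory-metrics.cc can be
pictured as a sum over a list of gauges. The sketch below assumes the
components are simply added; `Gauge`, `FixedGauge`, `SumGauge` and
`UsedComponents` are hypothetical names, not Impala's metric classes. It
mirrors the selection visible in the diff: buffer pool system bytes are
included only when buffers are mmap()ed, plus one malloc gauge (ASAN's
under ADDRESS_SANITIZER, presumably a TCMalloc one otherwise; that part
of the diff is cut off below).

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical minimal gauge interface (not Impala's UIntGauge).
struct Gauge {
  virtual ~Gauge() = default;
  virtual uint64_t value() const = 0;
};

// Gauge with a fixed value, standing in for a real metric source.
struct FixedGauge : Gauge {
  explicit FixedGauge(uint64_t v) : v_(v) {}
  uint64_t value() const override { return v_; }
  uint64_t v_;
};

// Value is the sum of its component gauges, recomputed on every read.
class SumGauge : public Gauge {
 public:
  explicit SumGauge(std::vector<const Gauge*> parts) : parts_(std::move(parts)) {}
  uint64_t value() const override {
    uint64_t total = 0;
    for (const Gauge* g : parts_) total += g->value();
    return total;
  }

 private:
  std::vector<const Gauge*> parts_;
};

// Picks the components of "total used". mmap()ed buffers bypass malloc, so
// they must be added explicitly; malloc-allocated buffers are already counted
// by 'malloc_bytes' (the ASAN or TCMalloc gauge, chosen at build time).
std::vector<const Gauge*> UsedComponents(
    bool mmap_buffers, const Gauge* buffer_pool_system_bytes, const Gauge* malloc_bytes) {
  std::vector<const Gauge*> parts;
  if (mmap_buffers && buffer_pool_system_bytes != nullptr) {
    parts.push_back(buffer_pool_system_bytes);
  }
  parts.push_back(malloc_bytes);
  return parts;
}
```

Because the process tracker then reads the same kind of metric in both
builds, ASAN and non-ASAN builds report process memory the same way.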

Testing:
Add some checks to unit tests to sanity-check that the numbers computed
are valid.

Manually tested by rebasing my buffer pool dev branch onto this change
and running some spilling queries. The /memz page reported:

  Process: Limit=8.35 GB Total=1005.49 MB Peak=1.01 GB
    Buffer Pool: Free Buffers: Total=391.50 MB
    Buffer Pool: Clean Pages: Total=112.00 MB
    Free Disk IO Buffers: Total=247.00 KB Peak=30.23 MB
    RequestPool=fe-eval-exprs: Total=0 Peak=4.00 KB
    RequestPool=default-pool: Total=374.30 MB Peak=416.55 MB
      Query(b9421063d13af70b:ddb9973900000000): Reservation=0 ReservationLimit=6.68 GB OtherMemory=801.09 KB Total=801.09 KB Peak=1.05 MB
      << snip >>
  Untracked Memory: Total=127.45 MB
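The "Untracked Memory" line is the difference between the process
tracker's metric value and the sum of its direct children, so adding the
two buffer pool trackers as children moves that memory out of the
untracked bucket. A minimal sketch of the arithmetic follows; `ChildUsage`
and `UntrackedBytes` are hypothetical names, and the numbers in the usage
note are made up rather than taken from the output above (whose snipped
trackers are unknown).

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical flattened view of one child tracker of the process tracker.
struct ChildUsage {
  std::string label;
  int64_t consumption;
};

// Process consumption (from a process-wide metric) minus everything the
// direct children account for. The inputs are read at slightly different
// times, so the result is approximate and could even come out negative.
int64_t UntrackedBytes(int64_t process_consumption,
                       const std::vector<ChildUsage>& children) {
  int64_t tracked = 0;
  for (const ChildUsage& child : children) tracked += child.consumption;
  return process_consumption - tracked;
}
```

For example, with a 1000-byte process total and a 300-byte request pool,
adding 400 bytes of free buffers and 100 bytes of clean pages as children
drops the untracked figure from 700 to 200 bytes.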

Manually tested the ASAN change by building under ASAN, running some
queries, and inspecting the /memz page. It reported 100-200 MB of
untracked memory, similar to the non-ASAN build.

Change-Id: I007eb258377b33fff9f3246580d80fa551837078
Reviewed-on: http://gerrit.cloudera.org:8080/6993
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/567814b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/567814b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/567814b4

Branch: refs/heads/master
Commit: 567814b4c9cb7f7fe2e621cebec32c8481fc0cad
Parents: d5b6cb9
Author: Tim Armstrong <[email protected]>
Authored: Fri Apr 7 16:43:41 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 15 20:23:48 2017 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-allocator-test.cc |  3 +
 be/src/runtime/bufferpool/buffer-allocator.cc   | 80 ++++++++++++++++++++
 be/src/runtime/bufferpool/buffer-allocator.h    | 15 ++++
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 62 +++++++++++++--
 be/src/runtime/bufferpool/buffer-pool.cc        | 16 ++++
 be/src/runtime/bufferpool/buffer-pool.h         | 12 +++
 be/src/runtime/exec-env.cc                      | 30 +++++---
 be/src/runtime/exec-env.h                       |  4 +-
 be/src/runtime/mem-tracker.cc                   | 27 ++++---
 be/src/runtime/mem-tracker.h                    | 18 +++--
 be/src/scheduling/admission-controller.cc       |  2 +-
 be/src/util/memory-metrics.cc                   | 58 +++++++++++---
 be/src/util/memory-metrics.h                    | 29 ++++++-
 common/thrift/metrics.json                      | 52 +++++++++++++
 14 files changed, 357 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 167298d..fa7ac10 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -117,6 +117,9 @@ TEST_F(BufferAllocatorTest, FreeListSizes) {
       // The low water mark will be the current size, so half the buffers should be freed.
       EXPECT_EQ(prev_size == 1 ? 0 : prev_size - prev_size / 2, new_size);
     }
+    // Check that the allocator reports the correct numbers.
+    EXPECT_EQ(new_size, allocator.GetNumFreeBuffers());
+    EXPECT_EQ(new_size * TEST_BUFFER_LEN, allocator.GetFreeBufferBytes());
     ++maintenance_calls;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
index 0bc4519..0978cca 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -84,6 +84,22 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
   /// on core 'core'.
   int GetFreeListSize(int64_t len);
 
+  /// Return the total number of free buffers in the arena. May be approximate since
+  /// it doesn't acquire the arena lock.
+  int64_t GetNumFreeBuffers();
+
+  /// Return the total bytes of free buffers in the arena. May be approximate since
+  /// it doesn't acquire the arena lock.
+  int64_t GetFreeBufferBytes();
+
+  /// Return the total number of clean pages in the arena. May be approximate since
+  /// it doesn't acquire the arena lock.
+  int64_t GetNumCleanPages();
+
+  /// Return the total bytes of clean pages in the arena. May be approximate since
+  /// it doesn't acquire the arena lock.
+  int64_t GetCleanPageBytes();
+
+
   string DebugString();
 
  private:
@@ -127,6 +143,10 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
     return &buffer_sizes_[idx];
   }
 
+  /// Compute a sum over all the lists in the arena. Does not lock the arena.
+  int64_t SumOverSizes(
+      std::function<int64_t(PerSizeLists* lists, int64_t buffer_size)> compute_fn);
+
   BufferAllocator* const parent_;
 
   /// Protects all data structures in the arena. See buffer-pool-internal.h for lock
@@ -381,6 +401,32 @@ int64_t BufferPool::BufferAllocator::FreeToSystem(vector<BufferHandle>&& buffers
   return bytes_freed;
 }
 
+int64_t BufferPool::BufferAllocator::SumOverArenas(
+    std::function<int64_t(FreeBufferArena* arena)> compute_fn) const {
+  int64_t total = 0;
+  for (const unique_ptr<FreeBufferArena>& arena : per_core_arenas_) {
+    total += compute_fn(arena.get());
+  }
+  return total;
+}
+
+int64_t BufferPool::BufferAllocator::GetNumFreeBuffers() const {
+  return SumOverArenas([](FreeBufferArena* arena) { return arena->GetNumFreeBuffers(); });
+}
+
+int64_t BufferPool::BufferAllocator::GetFreeBufferBytes() const {
+  return SumOverArenas(
+      [](FreeBufferArena* arena) { return arena->GetFreeBufferBytes(); });
+}
+
+int64_t BufferPool::BufferAllocator::GetNumCleanPages() const {
+  return SumOverArenas([](FreeBufferArena* arena) { return arena->GetNumCleanPages(); });
+}
+
+int64_t BufferPool::BufferAllocator::GetCleanPageBytes() const {
+  return SumOverArenas([](FreeBufferArena* arena) { return arena->GetCleanPageBytes(); });
+}
+
 string BufferPool::BufferAllocator::DebugString() {
   stringstream ss;
   ss << "<BufferAllocator> " << this << " min_buffer_len: " << min_buffer_len_
@@ -572,6 +618,40 @@ int BufferPool::FreeBufferArena::GetFreeListSize(int64_t len) {
   return lists->free_buffers.Size();
 }
 
+int64_t BufferPool::FreeBufferArena::SumOverSizes(
+    std::function<int64_t(PerSizeLists* lists, int64_t buffer_size)> compute_fn) {
+  int64_t total = 0;
+  for (int i = 0; i < NumBufferSizes(); ++i) {
+    int64_t buffer_size = (1L << i) * parent_->min_buffer_len_;
+    total += compute_fn(&buffer_sizes_[i], buffer_size);
+  }
+  return total;
+}
+
+int64_t BufferPool::FreeBufferArena::GetNumFreeBuffers() {
+  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
+    return lists->num_free_buffers.Load();
+  });
+}
+
+int64_t BufferPool::FreeBufferArena::GetFreeBufferBytes() {
+  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
+    return lists->num_free_buffers.Load() * buffer_size;
+  });
+}
+
+int64_t BufferPool::FreeBufferArena::GetNumCleanPages() {
+  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
+    return lists->num_clean_pages.Load();
+  });
+}
+
+int64_t BufferPool::FreeBufferArena::GetCleanPageBytes() {
+  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
+    return lists->num_clean_pages.Load() * buffer_size;
+  });
+}
+
 string BufferPool::FreeBufferArena::DebugString() {
   lock_guard<SpinLock> al(lock_);
   stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index 97686f0..b360f38 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -129,6 +129,18 @@ class BufferPool::BufferAllocator {
     return system_bytes_limit_ - system_bytes_remaining_.Load();
   }
 
+  /// Return the total number of free buffers in the allocator.
+  int64_t GetNumFreeBuffers() const;
+
+  /// Return the total bytes of free buffers in the allocator.
+  int64_t GetFreeBufferBytes() const;
+
+  /// Return the total number of clean pages in the allocator.
+  int64_t GetNumCleanPages() const;
+
+  /// Return the total bytes of clean pages in the allocator.
+  int64_t GetCleanPageBytes() const;
+
   std::string DebugString();
 
  protected:
@@ -177,6 +189,9 @@ class BufferPool::BufferAllocator {
   /// Helper to free a list of buffers to the system. Returns the number of bytes freed.
   int64_t FreeToSystem(std::vector<BufferHandle>&& buffers);
 
+  /// Compute a sum over all arenas. Does not lock the arenas.
+  int64_t SumOverArenas(std::function<int64_t(FreeBufferArena* arena)> compute_fn) const;
+
   /// The pool that this allocator is associated with.
   BufferPool* const pool_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 860e45b..6185e36 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -287,11 +287,6 @@ class BufferPoolTest : public ::testing::Test {
     return buffer->mem_range();
   }
 
-  /// Return the total number of bytes allocated from the system currently.
-  int64_t SystemBytesAllocated(BufferPool* pool) {
-    return pool->allocator()->GetSystemBytesAllocated();
-  }
-
   /// Set the maximum number of scavenge attempts that the pool's allocator wil do.
   void SetMaxScavengeAttempts(BufferPool* pool, int max_attempts) {
     pool->allocator()->set_max_scavenge_attempts(max_attempts);
@@ -556,15 +551,22 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   vector<BufferPool::BufferHandle> handles(num_buffers);
 
   // Create buffers of various valid sizes.
+  int64_t total_allocated = 0;
   for (int i = 0; i < num_buffers; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
     int64_t used_before = client.GetUsedReservation();
     ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    total_allocated += buffer_len;
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
     ASSERT_EQ(handles[i].len(), buffer_len);
     ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len);
+
+    // Check that pool-wide values are updated correctly.
+    EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
+    EXPECT_EQ(0, pool.GetNumFreeBuffers());
+    EXPECT_EQ(0, pool.GetFreeBufferBytes());
   }
 
   // Close the handles and check memory consumption.
@@ -579,6 +581,52 @@ TEST_F(BufferPoolTest, BufferAllocation) {
 
   // All the reservations should be released at this point.
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  // But freed memory is not released to the system immediately.
+  EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
+  EXPECT_EQ(num_buffers, pool.GetNumFreeBuffers());
+  EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes());
+  global_reservations_.Close();
+}
+
+// Test that the buffer pool correctly reports the number of clean pages.
+TEST_F(BufferPoolTest, CleanPageStats) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  WriteData(pages, 0);
+
+  // Pages don't start off clean.
+  EXPECT_EQ(0, pool.GetNumCleanPages());
+  EXPECT_EQ(0, pool.GetCleanPageBytes());
+
+  // Unpin pages and wait until they're written out and therefore clean.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages());
+  EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes());
+
+  // Do an allocation to force eviction of one page.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages());
+  EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes());
+
+  // Re-pin all the pages - none will be clean afterwards.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  VerifyData(pages, 0);
+  EXPECT_EQ(0, pool.GetNumCleanPages());
+  EXPECT_EQ(0, pool.GetCleanPageBytes());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
 
@@ -1163,7 +1211,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
 
   // Unpin pages. Writes should be started and memory should not be deallocated.
   EXPECT_EQ(total_mem, cumulative_bytes_alloced->value());
-  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
   UnpinAll(&pool, &client, &pages);
   ASSERT_GT(write_ios->value(), 0);
 
@@ -1186,7 +1234,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   // At least two unpinned pages should have been written out.
   ASSERT_GE(write_ios->value(), prev_write_ios + NUM_EXTRA_BUFFERS);
   // No additional memory should have been allocated - it should have been recycled.
-  EXPECT_EQ(total_mem, SystemBytesAllocated(&pool));
+  EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
   // Check that two pages were evicted.
   int num_evicted = 0;
   for (PageHandle& page : pages) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6d65457..5593a41 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -272,6 +272,22 @@ int64_t BufferPool::GetSystemBytesAllocated() const {
   return allocator_->GetSystemBytesAllocated();
 }
 
+int64_t BufferPool::GetNumCleanPages() const {
+  return allocator_->GetNumCleanPages();
+}
+
+int64_t BufferPool::GetCleanPageBytes() const {
+  return allocator_->GetCleanPageBytes();
+}
+
+int64_t BufferPool::GetNumFreeBuffers() const {
+  return allocator_->GetNumFreeBuffers();
+}
+
+int64_t BufferPool::GetFreeBufferBytes() const {
+  return allocator_->GetFreeBufferBytes();
+}
+
 bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
   return impl_->reservation()->IncreaseReservation(bytes);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index dbf75bc..93c08bd 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -263,6 +263,18 @@ class BufferPool : public CacheLineAligned {
   int64_t GetSystemBytesLimit() const;
   int64_t GetSystemBytesAllocated() const;
 
+  /// Return the total number of clean pages in the pool.
+  int64_t GetNumCleanPages() const;
+
+  /// Return the total bytes of clean pages in the pool.
+  int64_t GetCleanPageBytes() const;
+
+  /// Return the total number of free buffers in the pool.
+  int64_t GetNumFreeBuffers() const;
+
+  /// Return the total bytes of free buffers in the pool.
+  int64_t GetFreeBufferBytes() const;
+
   /// Generous upper bounds on page and buffer size and the number of different
   /// power-of-two buffer sizes.
   static constexpr int LOG_MAX_BUFFER_BYTES = 48;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0354b37..a84034a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -25,6 +25,7 @@
 #include <kudu/client/client.h>
 
 #include "common/logging.h"
+#include "common/object-pool.h"
 #include "exec/kudu-util.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -137,7 +138,8 @@ struct ExecEnv::KuduClientPtr {
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
-  : metrics_(new MetricGroup("impala-metrics")),
+  : obj_pool_(new ObjectPool),
+    metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
         new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
@@ -158,8 +160,8 @@ ExecEnv::ExecEnv()
     tmp_file_mgr_(new TmpFileMgr),
     request_pool_service_(new RequestPoolService(metrics_.get())),
     frontend_(new Frontend()),
-    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool",
-        "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool", "worker",
+        FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
     buffer_reservation_(nullptr),
@@ -203,7 +205,8 @@ ExecEnv::ExecEnv()
 // TODO: Need refactor to get rid of duplicated code.
 ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     int webserver_port, const string& statestore_host, int statestore_port)
-  : metrics_(new MetricGroup("impala-metrics")),
+  : obj_pool_(new ObjectPool),
+    metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
         new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
@@ -223,8 +226,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
     tmp_file_mgr_(new TmpFileMgr),
     frontend_(new Frontend()),
-    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool",
-        "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+    exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool", "worker",
+        FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
     buffer_reservation_(nullptr),
@@ -318,10 +321,18 @@ Status ExecEnv::StartServices() {
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
 
-#ifndef ADDRESS_SANITIZER
   // Limit of -1 means no memory limit.
   mem_tracker_.reset(new MemTracker(
       AggregateMemoryMetric::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
+  if (buffer_pool_ != nullptr) {
+    // Add BufferPool MemTrackers for cached memory that is not tracked against queries
+    // but is included in process memory consumption.
+    obj_pool_->Add(new MemTracker(BufferPoolMetric::FREE_BUFFER_BYTES, -1,
+        "Buffer Pool: Free Buffers", mem_tracker_.get()));
+    obj_pool_->Add(new MemTracker(BufferPoolMetric::CLEAN_PAGE_BYTES, -1,
+        "Buffer Pool: Clean Pages", mem_tracker_.get()));
+  }
+#ifndef ADDRESS_SANITIZER
   // Aggressive decommit is required so that unused pages in the TCMalloc page heap are
   // not backed by physical pages and do not contribute towards memory consumption.
   size_t aggressive_decommit_enabled = 0;
@@ -330,12 +341,7 @@ Status ExecEnv::StartServices() {
   if (!aggressive_decommit_enabled) {
     return Status("TCMalloc aggressive decommit is required but is disabled.");
   }
-#else
-  // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
-  // track process memory usage (sum of all children trackers).
-  mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, "Process"));
 #endif
-
   mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
 
   if (bytes_limit > MemInfo::physical_mem()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index f914a83..8437b41 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -45,8 +45,9 @@ class HdfsFsCache;
 class ImpalaServer;
 class LibCache;
 class MemTracker;
-class PoolMemTrackerRegistry;
 class MetricGroup;
+class PoolMemTrackerRegistry;
+class ObjectPool;
 class QueryResourceMgr;
 class RequestPoolService;
 class ReservationTracker;
@@ -135,6 +136,7 @@ class ExecEnv {
 
  protected:
   /// Leave protected so that subclasses can override
+  boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<DataStreamMgr> stream_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index e328c01..ebf5137 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -55,7 +55,6 @@ MemTracker::MemTracker(
     bytes_freed_by_last_gc_metric_(NULL),
     bytes_over_limit_metric_(NULL),
     limit_metric_(NULL) {
-  if (parent != NULL) parent_->AddChildTracker(this);
   Init();
 }
 
@@ -72,15 +71,14 @@ MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
     bytes_freed_by_last_gc_metric_(NULL),
     bytes_over_limit_metric_(NULL),
     limit_metric_(NULL) {
-  if (parent != NULL) parent_->AddChildTracker(this);
   Init();
 }
 
-MemTracker::MemTracker(
-    UIntGauge* consumption_metric, int64_t byte_limit, const string& label)
+MemTracker::MemTracker(UIntGauge* consumption_metric, int64_t byte_limit,
+    const string& label, MemTracker* parent)
   : limit_(byte_limit),
     label_(label),
-    parent_(NULL),
+    parent_(parent),
     consumption_(&local_counter_),
     local_counter_(TUnit::BYTES),
     consumption_metric_(consumption_metric),
@@ -94,6 +92,7 @@ MemTracker::MemTracker(
 
 void MemTracker::Init() {
   DCHECK_GE(limit_, -1);
+  if (parent_ != NULL) parent_->AddChildTracker(this);
   // populate all_trackers_ and limit_trackers_
   MemTracker* tracker = this;
   while (tracker != NULL) {
@@ -122,7 +121,7 @@ void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& co
   reservation_counters_.Store(new_counters);
 }
 
-int64_t MemTracker::GetPoolMemReserved() const {
+int64_t MemTracker::GetPoolMemReserved() {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
   DCHECK_EQ(limit_, -1) << LogUsage("");
@@ -228,7 +227,9 @@ void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
 //   TrackerName: Limit=5.00 MB Reservation=5.00 MB OtherMemory=1.04 MB
 //                Total=6.04 MB Peak=6.45 MB
 //
-string MemTracker::LogUsage(const string& prefix, int64_t* logged_consumption) const {
+string MemTracker::LogUsage(const string& prefix, int64_t* logged_consumption) {
+  // Make sure the consumption is up to date.
+  if (consumption_metric_ != nullptr) RefreshConsumptionFromMetric();
   int64_t curr_consumption = consumption();
   int64_t peak_consumption = consumption_->value();
   if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
@@ -251,8 +252,14 @@ string MemTracker::LogUsage(const string& prefix, int64_t* logged_consumption) c
     ss << " OtherMemory="
        << PrettyPrinter::Print(curr_consumption - reservation, TUnit::BYTES);
   }
-  ss << " Total=" << PrettyPrinter::Print(curr_consumption, TUnit::BYTES)
-     << " Peak=" << PrettyPrinter::Print(peak_consumption, TUnit::BYTES);
+  ss << " Total=" << PrettyPrinter::Print(curr_consumption, TUnit::BYTES);
+  // Peak consumption is not accurate if the metric is lazily updated (i.e.
+  // this is a non-root tracker that exists only for reporting purposes).
+  // Only report peak consumption if we actually call Consume()/Release() on
+  // this tracker or a descendant.
+  if (consumption_metric_ == nullptr || parent_ == nullptr) {
+    ss << " Peak=" << PrettyPrinter::Print(peak_consumption, TUnit::BYTES);
+  }
 
   string new_prefix = Substitute("  $0", prefix);
   int64_t child_consumption;
@@ -263,7 +270,7 @@ string MemTracker::LogUsage(const string& prefix, int64_t* logged_consumption) c
   }
   if (!child_trackers_usage.empty()) ss << "\n" << child_trackers_usage;
 
-  if (consumption_metric_ != nullptr) {
+  if (parent_ == nullptr) {
     // Log the difference between the metric value and children as "untracked" memory so
     // that the values always add up. This value is not always completely accurate because
     // we did not necessarily get a consistent snapshot of the consumption values for all
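The MemTracker changes combine three rules: Init() now registers every
tracker with its parent, so the metric-based constructor can create
children too; a metric-backed tracker refreshes its value from the metric
when it is logged (which is why LogUsage() and its callers lose `const`);
and Peak is printed only for the root or for trackers maintained by
Consume()/Release(). The class below is a hypothetical, heavily
simplified illustration of those rules, not Impala's MemTracker; it uses
a std::function in place of a UIntGauge.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <string>
#include <utility>

// Hypothetical, simplified tracker illustrating the reporting rules only.
class MiniTracker {
 public:
  // Tracker whose value is maintained by Consume().
  MiniTracker(std::string label, MiniTracker* parent)
    : label_(std::move(label)), parent_(parent) {
    Init();
  }

  // Tracker whose value is read from 'metric' when reported.
  MiniTracker(std::function<int64_t()> metric, std::string label, MiniTracker* parent)
    : label_(std::move(label)), parent_(parent), metric_(std::move(metric)) {
    Init();
  }

  // Adds 'bytes' to this tracker and all of its ancestors, updating peaks.
  void Consume(int64_t bytes) {
    for (MiniTracker* t = this; t != nullptr; t = t->parent_) {
      t->consumption_ += bytes;
      t->peak_ = std::max(t->peak_, t->consumption_);
    }
  }

  // Non-const: a metric-backed tracker refreshes its value before reporting.
  std::string LogUsage() {
    if (metric_) {
      consumption_ = metric_();
      peak_ = std::max(peak_, consumption_);
    }
    std::string s = label_ + ": Total=" + std::to_string(consumption_);
    // Peak is not accurate for a lazily refreshed, reporting-only child, so
    // report it only for the root or for trackers maintained by Consume().
    if (!metric_ || parent_ == nullptr) s += " Peak=" + std::to_string(peak_);
    return s;
  }

  std::size_t num_children() const { return children_.size(); }

 private:
  // Shared by both constructors, so every tracker with a parent is registered.
  void Init() {
    if (parent_ != nullptr) parent_->children_.push_back(this);
  }

  std::string label_;
  MiniTracker* parent_;
  std::function<int64_t()> metric_;
  int64_t consumption_ = 0;
  int64_t peak_ = 0;
  std::list<MiniTracker*> children_;
};
```

With a metric-backed "Process" root, a metric-backed free-buffers child
and a plain query child that consumed 300 bytes, the free-buffers child
logs only a Total, while the query and the root also log a Peak.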

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 2edb658..a9e265a 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -58,7 +58,10 @@ class TQueryOptions;
 /// tally maintained by Consume() and Release(). A tcmalloc metric is used to track
 /// process memory consumption, since the process memory usage may be higher than the
 /// computed total memory (tcmalloc does not release deallocated memory immediately).
-//
+/// Other consumption metrics are used in trackers below the process level to account
+/// for memory (such as free buffer pool buffers) that is not tracked by Consume() and
+/// Release().
+///
 /// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
 /// reached. If LimitExceeded() is called and the limit is exceeded, it will first call
 /// the GcFunctions to try to free memory and recheck the limit. For example, the process
@@ -84,9 +87,11 @@ class MemTracker {
       const std::string& label = std::string(), MemTracker* parent = NULL);
 
   /// C'tor for tracker that uses consumption_metric as the consumption value.
-  /// Consume()/Release() can still be called. This is used for the process tracker.
+  /// Consume()/Release() can still be called. This is used for the root process tracker
+  /// (if 'parent' is NULL). It is also used to report on other categories of memory under
+  /// the process tracker, e.g. buffer pool free buffers (if 'parent' is non-NULL).
   MemTracker(UIntGauge* consumption_metric, int64_t byte_limit = -1,
-      const std::string& label = std::string());
+      const std::string& label = std::string(), MemTracker* parent = NULL);
 
   ~MemTracker();
 
@@ -257,7 +262,6 @@ class MemTracker {
   /// call if this tracker has a consumption metric.
   void RefreshConsumptionFromMetric() {
     DCHECK(consumption_metric_ != nullptr);
-    DCHECK(parent_ == nullptr);
     consumption_->Set(consumption_metric_->value());
   }
 
@@ -281,7 +285,7 @@ class MemTracker {
   /// of the memory reserved by the queries in it (i.e. its child trackers). The mem
   /// reserved for a query is its limit_, if set (which should be the common case with
   /// admission control). Otherwise the current consumption is used.
-  int64_t GetPoolMemReserved() const;
+  int64_t GetPoolMemReserved();
 
   /// Returns the memory consumed in bytes.
   int64_t consumption() const { return consumption_->current_value(); }
@@ -314,7 +318,7 @@ class MemTracker {
   /// TODO: once all memory is accounted in ReservationTracker hierarchy, move
   /// reporting there.
   std::string LogUsage(
-      const std::string& prefix = "", int64_t* logged_consumption = nullptr) const;
+      const std::string& prefix = "", int64_t* logged_consumption = nullptr);
 
   /// Log the memory usage when memory limit is exceeded and return a status object with
   /// details of the allocation which caused the limit to be exceeded.
@@ -385,7 +389,7 @@ class MemTracker {
   /// All the child trackers of this tracker. Used only for computing resource pool mem
   /// reserved and error reporting, i.e., updating a parent tracker does not update its
   /// children.
-  mutable SpinLock child_trackers_lock_;
+  SpinLock child_trackers_lock_;
   std::list<MemTracker*> child_trackers_;
 
   /// Iterator into parent_->child_trackers_ for this object. Stored to have O(1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index efec78a..73169f9 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -703,7 +703,7 @@ void AdmissionController::UpdateClusterAggregates() {
 void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   // May be NULL if no queries have ever executed in this pool on this node but another
   // node sent stats for this pool.
-  const MemTracker* tracker =
+  MemTracker* tracker =
       ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
 
   const int64_t current_reserved =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 9336cf3..b7aa3a6 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -39,9 +39,15 @@ TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = NULL;
 TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = NULL;
 TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = NULL;
 
+AsanMallocMetric* AsanMallocMetric::BYTES_ALLOCATED = nullptr;
+
 BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
 BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
 BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
+BufferPoolMetric* BufferPoolMetric::NUM_FREE_BUFFERS = nullptr;
+BufferPoolMetric* BufferPoolMetric::FREE_BUFFER_BYTES = nullptr;
+BufferPoolMetric* BufferPoolMetric::NUM_CLEAN_PAGES = nullptr;
+BufferPoolMetric* BufferPoolMetric::CLEAN_PAGE_BYTES = nullptr;
 
 TcmallocMetric* TcmallocMetric::CreateAndRegister(
     MetricGroup* metrics, const string& key, const string& tcmalloc_var) {
@@ -55,7 +61,21 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
     RETURN_IF_ERROR(BufferPoolMetric::InitMetrics(
         metrics->GetOrCreateChildGroup("buffer-pool"), global_reservations, buffer_pool));
   }
-#ifndef ADDRESS_SANITIZER
+
+  // Add compound metrics that track totals across malloc and the buffer pool.
+  // total-used should track the total physical memory in use.
+  vector<UIntGauge*> used_metrics;
+  if (FLAGS_mmap_buffers && global_reservations != nullptr) {
+    // If we mmap() buffers, the buffers are not allocated via malloc. Ensure they are
+    // properly tracked.
+    used_metrics.push_back(BufferPoolMetric::SYSTEM_ALLOCATED);
+  }
+
+#ifdef ADDRESS_SANITIZER
+  AsanMallocMetric::BYTES_ALLOCATED = metrics->RegisterMetric(
+      new AsanMallocMetric(MetricDefs::Get("asan-total-bytes-allocated")));
+  used_metrics.push_back(AsanMallocMetric::BYTES_ALLOCATED);
+#else
   // We rely on TCMalloc for our global memory metrics, so skip setting them up
   // if we're not using TCMalloc.
   TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(
@@ -74,18 +94,10 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
       metrics->RegisterMetric(new TcmallocMetric::PhysicalBytesMetric(
           MetricDefs::Get("tcmalloc.physical-bytes-reserved")));
 
-  // Add compound metrics that track totals across TCMalloc and the buffer pool.
-  // total-used should track the total physical memory in use.
-  vector<UIntGauge*> used_metrics{TcmallocMetric::PHYSICAL_BYTES_RESERVED};
-  if (FLAGS_mmap_buffers && global_reservations != nullptr) {
-    // If we mmap() buffers, the buffers are not allocated via TCMalloc. Ensure they are
-    // properly tracked.
-    used_metrics.push_back(BufferPoolMetric::SYSTEM_ALLOCATED);
-  }
-
+  used_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
+#endif
   AggregateMemoryMetric::TOTAL_USED = metrics->RegisterMetric(
       new SumGauge<uint64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
-#endif
   if (register_jvm_metrics) {
     RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
   }
@@ -175,6 +187,18 @@ Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
   RESERVED = metrics->RegisterMetric(
       new BufferPoolMetric(MetricDefs::Get("buffer-pool.reserved"),
           BufferPoolMetricType::RESERVED, global_reservations, buffer_pool));
+  NUM_FREE_BUFFERS = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.free-buffers"),
+          BufferPoolMetricType::NUM_FREE_BUFFERS, global_reservations, buffer_pool));
+  FREE_BUFFER_BYTES = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.free-buffer-bytes"),
+          BufferPoolMetricType::FREE_BUFFER_BYTES, global_reservations, buffer_pool));
+  NUM_CLEAN_PAGES = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-pages"),
+          BufferPoolMetricType::NUM_CLEAN_PAGES, global_reservations, buffer_pool));
+  CLEAN_PAGE_BYTES = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-page-bytes"),
+          BufferPoolMetricType::CLEAN_PAGE_BYTES, global_reservations, buffer_pool));
   return Status::OK();
 }
 
@@ -196,6 +220,18 @@ void BufferPoolMetric::CalculateValue() {
     case BufferPoolMetricType::RESERVED:
       value_ = global_reservations_->GetReservation();
       break;
+    case BufferPoolMetricType::NUM_FREE_BUFFERS:
+      value_ = buffer_pool_->GetNumFreeBuffers();
+      break;
+    case BufferPoolMetricType::FREE_BUFFER_BYTES:
+      value_ = buffer_pool_->GetFreeBufferBytes();
+      break;
+    case BufferPoolMetricType::NUM_CLEAN_PAGES:
+      value_ = buffer_pool_->GetNumCleanPages();
+      break;
+    case BufferPoolMetricType::CLEAN_PAGE_BYTES:
+      value_ = buffer_pool_->GetCleanPageBytes();
+      break;
     default:
       DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
   }
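
The hunks above build `memory.total-used` by pushing gauges into `used_metrics` (the buffer pool's system-allocated bytes only when `FLAGS_mmap_buffers` is set, plus either the ASAN or the TCMalloc figure) and wrapping them in a `SumGauge`, which is assumed here to sum its inputs when the metric is read, much like `BufferPoolMetric::CalculateValue()` recomputes its value on demand. The following is a minimal standalone sketch of that composition pattern; `Gauge`, `CallbackGauge`, `SumGauge`, `TotalUsed()` and the byte counts are hypothetical stand-ins, not Impala's metric classes, and registration, locking and JSON output are omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical minimal gauge: the value is recomputed every time it is read,
// mirroring the CalculateValue() override pattern used in the diff.
class Gauge {
 public:
  virtual ~Gauge() = default;
  uint64_t GetValue() {
    CalculateValue();
    return value_;
  }

 protected:
  virtual void CalculateValue() = 0;
  uint64_t value_ = 0;
};

// Gauge backed by a callback; stands in for TCMalloc, ASAN or buffer pool stats.
class CallbackGauge : public Gauge {
 public:
  explicit CallbackGauge(uint64_t (*fn)()) : fn_(fn) {}

 protected:
  void CalculateValue() override { value_ = fn_(); }

 private:
  uint64_t (*fn_)();
};

// Sums its inputs on demand, in the spirit of memory.total-used.
class SumGauge : public Gauge {
 public:
  explicit SumGauge(std::vector<Gauge*> inputs) : inputs_(std::move(inputs)) {
    for (Gauge* g : inputs_) assert(g != nullptr);
  }

 protected:
  void CalculateValue() override {
    uint64_t sum = 0;
    for (Gauge* g : inputs_) sum += g->GetValue();
    value_ = sum;
  }

 private:
  std::vector<Gauge*> inputs_;
};

// Hypothetical byte counts, used only for illustration.
uint64_t MallocBytes() { return 300; }
uint64_t MmapBufferBytes() { return 100; }

// Mirrors the conditional composition in RegisterMemoryMetrics(): buffer pool
// memory is added as a separate term only when buffers are mmap()ed, because
// otherwise it was allocated via malloc and is already in the malloc figure.
uint64_t TotalUsed(bool mmap_buffers) {
  CallbackGauge malloc_gauge(&MallocBytes);
  CallbackGauge mmap_gauge(&MmapBufferBytes);
  std::vector<Gauge*> used;
  if (mmap_buffers) used.push_back(&mmap_gauge);
  used.push_back(&malloc_gauge);
  SumGauge total(std::move(used));
  return total.GetValue();
}
```

With these stand-in values, `TotalUsed(false)` returns 300 and `TotalUsed(true)` returns 400. Moving the `SumGauge` registration out of the `#ifndef ADDRESS_SANITIZER` block, as the diff does, means the same aggregate exists in both builds and only its malloc input differs.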

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 2527735..5149d9c 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -20,12 +20,15 @@
 
 #include "util/metrics.h"
 
-#include <boost/thread/mutex.hpp>
 #include <boost/bind.hpp>
+#include <boost/thread/mutex.hpp>
 #include <gperftools/malloc_extension.h>
+#ifdef ADDRESS_SANITIZER
+#include <sanitizer/allocator_interface.h>
+#endif
 
-#include "util/debug-util.h"
 #include "gen-cpp/Frontend_types.h"
+#include "util/debug-util.h"
 
 namespace impala {
 
@@ -99,6 +102,20 @@ class TcmallocMetric : public UIntGauge {
   }
 };
 
+/// Alternative to TCMallocMetric if we're running under Address Sanitizer, which
+/// does not provide the same metrics.
+class AsanMallocMetric : public UIntGauge {
+ public:
+  AsanMallocMetric(const TMetricDef& def) : UIntGauge(def, 0) {}
+  static AsanMallocMetric* BYTES_ALLOCATED;
+ private:
+  virtual void CalculateValue() override {
+#ifdef ADDRESS_SANITIZER
+    value_ = __sanitizer_get_current_allocated_bytes();
+#endif
+  }
+};
+
 /// A JvmMetric corresponds to one value drawn from one 'memory pool' in the JVM. A memory
 /// pool is an area of memory assigned for one particular aspect of memory management. For
 /// example Hotspot has pools for the permanent generation, the old generation, survivor
@@ -151,6 +168,10 @@ class BufferPoolMetric : public UIntGauge {
   static BufferPoolMetric* LIMIT;
   static BufferPoolMetric* SYSTEM_ALLOCATED;
   static BufferPoolMetric* RESERVED;
+  static BufferPoolMetric* NUM_FREE_BUFFERS;
+  static BufferPoolMetric* FREE_BUFFER_BYTES;
+  static BufferPoolMetric* NUM_CLEAN_PAGES;
+  static BufferPoolMetric* CLEAN_PAGE_BYTES;
 
  protected:
   virtual void CalculateValue();
@@ -164,6 +185,10 @@ class BufferPoolMetric : public UIntGauge {
     // are fulfilled, or > SYSTEM_ALLOCATED because of additional memory cached by
     // BufferPool. Always <= LIMIT.
     RESERVED,
+    NUM_FREE_BUFFERS, // Total number of free buffers in BufferPool.
+    FREE_BUFFER_BYTES, // Total bytes of free buffers in BufferPool.
+    NUM_CLEAN_PAGES, // Total number of clean pages in BufferPool.
+    CLEAN_PAGE_BYTES, // Total bytes of clean pages in BufferPool.
   };
 
   BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType type,
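
The commit message explains that the new free-buffer and clean-page values are not maintained as global counters but computed on demand by summing across the arenas in `BufferAllocator`, without taking locks, so the totals are approximate. The following is a minimal sketch of that trade-off, assuming each arena keeps its own relaxed atomic counters; `ArenaStats` and `HypotheticalArenaAllocator` are hypothetical and do not reflect how `BufferAllocator` actually stores its free lists or clean pages.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical per-arena statistics. Each arena only updates its own
// counters, so freeing a buffer never touches a shared global counter.
struct ArenaStats {
  std::atomic<int64_t> num_free_buffers{0};
  std::atomic<int64_t> free_buffer_bytes{0};
};

class HypotheticalArenaAllocator {
 public:
  explicit HypotheticalArenaAllocator(int num_arenas) : arenas_(num_arenas) {}

  // Record that a buffer of 'len' bytes was added to arena i's free list.
  void AddFreeBuffer(int i, int64_t len) {
    ArenaStats& a = Arena(i);
    a.num_free_buffers.fetch_add(1, std::memory_order_relaxed);
    a.free_buffer_bytes.fetch_add(len, std::memory_order_relaxed);
  }

  // Record that a buffer of 'len' bytes was taken off arena i's free list.
  void RemoveFreeBuffer(int i, int64_t len) {
    ArenaStats& a = Arena(i);
    a.num_free_buffers.fetch_sub(1, std::memory_order_relaxed);
    a.free_buffer_bytes.fetch_sub(len, std::memory_order_relaxed);
  }

  // Computed on demand by summing across arenas without taking any lock.
  // Each load is atomic, but arenas are read at different moments, so under
  // concurrent updates the result is an approximation, not a snapshot.
  int64_t GetNumFreeBuffers() const {
    int64_t total = 0;
    for (const ArenaStats& a : arenas_) {
      total += a.num_free_buffers.load(std::memory_order_relaxed);
    }
    return total;
  }

  int64_t GetFreeBufferBytes() const {
    int64_t total = 0;
    for (const ArenaStats& a : arenas_) {
      total += a.free_buffer_bytes.load(std::memory_order_relaxed);
    }
    return total;
  }

 private:
  ArenaStats& Arena(int i) {
    assert(i >= 0 && i < static_cast<int>(arenas_.size()));
    return arenas_[i];
  }

  std::vector<ArenaStats> arenas_;
};
```

The design keeps the allocation and free paths arena-local, which is what makes the allocator scale, and moves the cost to the reader: a metrics or /memz request walks every arena. Because the count and byte totals are read separately, they may also disagree slightly with each other while queries are running, which matches the "approximate" caveat in the commit message.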

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/567814b4/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index a3fbd7b..4d9bbf2 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1112,6 +1112,18 @@
     "key": "tcmalloc.total-bytes-reserved"
   },
   {
+    "description": "Bytes allocated from Address Sanitizer's malloc (Address Sanitizer debug builds only)",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Address Sanitizer Malloc Bytes Allocated",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "asan-total-bytes-allocated"
+  },
+  {
     "description": "Maximum allowed bytes allocated by the buffer pool.",
     "contexts": [
       "IMPALAD"
@@ -1142,6 +1154,46 @@
     "key": "buffer-pool.reserved"
   },
   {
+    "description": "Total number of free buffers cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Free Buffers.",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "buffer-pool.free-buffers"
+  },
+  {
+    "description": "Total bytes of free buffer memory cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Free Buffer Bytes.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.free-buffer-bytes"
+  },
+  {
+    "description": "Total number of clean pages cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Clean Pages.",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "buffer-pool.clean-pages"
+  },
+  {
+    "description": "Total bytes of clean page memory cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Clean Page Bytes.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.clean-page-bytes"
+  },
+  {
     "description": "Total memory currently used by TCMalloc and buffer pool.",
     "contexts": [
       "STATESTORE",
